Feature 10981: add OpenStack metrics collector and scale-out/in DAGs for autoscaling
[osm/NG-SA.git] / src / osm_ngsa / dags / multivim_vm_metrics.py
diff --git a/src/osm_ngsa/dags/multivim_vm_metrics.py b/src/osm_ngsa/dags/multivim_vm_metrics.py
new file mode 100644 (file)
index 0000000..6f03292
--- /dev/null
@@ -0,0 +1,324 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+from math import ceil
+from typing import Dict, List
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.vim_connectors.openstack import OpenStackCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SCHEDULE_INTERVAL = 5  # collection period in minutes (used in the cron expression below)
+COLLECTOR_MAX_METRICS_PER_TASK = 100  # maximum number of metrics handled by a single mapped collect_metrics task
+SUPPORTED_VIM_TYPES = ["openstack", "vio"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"  # Pushgateway service address as deployed with OSM
+PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
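+# Mapping of OSM monitoring-parameter names to the Prometheus gauge name and description used when pushing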
+PROMETHEUS_METRICS = {
+    "cpu_utilization": {
+        "metric_name": "cpu_utilization",
+        "metric_descr": "CPU usage percentage",
+    },
+    "average_memory_utilization": {
+        "metric_name": "average_memory_utilization",
+        "metric_descr": "Volume of RAM in MB used by the VM",
+    },
+    "disk_read_ops": {
+        "metric_name": "disk_read_ops",
+        "metric_descr": "Number of read requests",
+    },
+    "disk_write_ops": {
+        "metric_name": "disk_write_ops",
+        "metric_descr": "Number of write requests",
+    },
+    "disk_read_bytes": {
+        "metric_name": "disk_read_bytes",
+        "metric_descr": "Volume of reads in bytes",
+    },
+    "disk_write_bytes": {
+        "metric_name": "disk_write_bytes",
+        "metric_descr": "Volume of writes in bytes",
+    },
+    "packets_received": {
+        "metric_name": "packets_received",
+        "metric_descr": "Number of incoming packets",
+    },
+    "packets_sent": {
+        "metric_name": "packets_sent",
+        "metric_descr": "Number of outgoing packets",
+    },
+    "packets_in_dropped": {
+        "metric_name": "packets_in_dropped",
+        "metric_descr": "Number of incoming dropped packets",
+    },
+    "packets_out_dropped": {
+        "metric_name": "packets_out_dropped",
+        "metric_descr": "Number of outgoing dropped packets",
+    },
+}
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
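+# Runs at DAG-file parse time: list the VIM accounts registered in the OSM common database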
+def get_all_vim():
+    """Get VIMs from MongoDB"""
+    logger.info("Getting VIM list")
+
+    cfg = Config()
+    logger.info(cfg.conf)
+    common_db = CommonDbClient(cfg)
+    vim_accounts = common_db.get_vim_accounts()
+    vim_list = []
+    for vim in vim_accounts:
+        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
+        vim_list.append(
+            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
+        )
+
+    logger.info(vim_list)
+    logger.info("Getting VIM list OK")
+    return vim_list
+
+
+def create_dag(dag_id, dag_number, dag_description, vim_id):
+    dag = DAG(
+        dag_id,
+        catchup=False,
+        default_args={
+            "depends_on_past": False,
+            "retries": 1,
+            # "retry_delay": timedelta(minutes=1),
+            "retry_delay": timedelta(seconds=10),
+        },
+        description=dag_description,
+        is_paused_upon_creation=False,
+        # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+        start_date=datetime(2022, 1, 1),
+        tags=["osm", "vdu"],
+    )
+
+    with dag:
+
+        @task(task_id="extract_metrics_from_vnfrs")
+        def extract_metrics_from_vnfrs(vim_id: str):
+            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
+
+            # Get VNFDs that include "monitoring-parameter" from MongoDB
+            cfg = Config()
+            common_db = CommonDbClient(cfg)
+            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
+            vnfd_list = common_db.get_monitoring_vnfds()
+            # Get VNFR list from MongoDB
+            logger.info("Getting VNFR list from MongoDB")
+            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
+            # Only read metrics if ns state is one of the nsAllowedStatesSet
+            nsAllowedStatesSet = {"INSTANTIATED"}
+            metric_list = []
+            for vnfr in vnfr_list:
+                if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
+                    continue
+                # Check if VNFR is in "monitoring-parameter" VNFDs list
+                vnfd_id = vnfr["vnfd-id"]
+                vnfd = next(
+                    (item for item in vnfd_list if item["_id"] == vnfd_id), None
+                )
+                if not vnfd:
+                    continue
+                ns_id = vnfr["nsr-id-ref"]
+                vnf_index = vnfr["member-vnf-index-ref"]
+                logger.info(
+                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
+                )
+                project_list = vnfr.get("_admin", {}).get("projects_read", [])
+                project_id = "None"
+                if project_list:
+                    project_id = project_list[0]
+                for vdur in vnfr.get("vdur", []):
+                    vim_info = vdur.get("vim_info")
+                    if not vim_info:
+                        logger.error("Error: vim_info not available in vdur")
+                        continue
+                    if len(vim_info) != 1:
+                        logger.error("Error: more than one vim_info in vdur")
+                        continue
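+                    # vim_info keys are expected to be of the form "vim:<vim_account_id>"; drop the 4-char "vim:" prefix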
+                    vim_id = next(iter(vim_info))[4:]
+                    vm_id = vdur.get("vim-id")
+                    if not vm_id:
+                        logger.error("Error: vim-id not available in vdur")
+                        continue
+                    vdu_name = vdur.get("name", "UNKNOWN")
+                    vdu = next(
+                        (v for v in vnfd["vdu"] if v["id"] == vdur["vdu-id-ref"]), None
+                    )
+                    if not vdu or "monitoring-parameter" not in vdu:
+                        logger.error("Error: no monitoring-parameter in descriptor")
+                        continue
+                    for param in vdu["monitoring-parameter"]:
+                        metric_name = param["performance-metric"]
+                        metric_id = param["id"]
+                        metric = {
+                            "metric": metric_name,
+                            "metric_id": metric_id,
+                            "vm_id": vm_id,
+                            "ns_id": ns_id,
+                            "project_id": project_id,
+                            "vdu_name": vdu_name,
+                            "vnf_member_index": vnf_index,
+                            "vdu_id": vdu["id"],
+                        }
+                        metric_list.append(metric)
+
+            logger.info(f"Metrics to collect: {len(metric_list)}")
+            return metric_list
+
+        @task(task_id="split_metrics")
+        def split_metrics(metric_list: List[Dict]):
+            """Split the metric list into chunks of at most COLLECTOR_MAX_METRICS_PER_TASK metrics"""
+            n_metrics = len(metric_list)
+            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
+                return [metric_list]
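+            # Spread metrics evenly over the minimum number of chunks, keeping each chunk at or below the per-task limit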
+            metrics_per_chunk = ceil(
+                n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
+            )
+            logger.info(
+                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
+            )
+            chunks = []
+            for i in range(0, n_metrics, metrics_per_chunk):
+                chunks.append(metric_list[i : i + metrics_per_chunk])
+            return chunks
+
+        @task(task_id="collect_metrics")
+        def collect_metrics(vim_id: str, metric_list: List[Dict]):
+            """Collect server metrics from the VIM"""
+            if not metric_list:
+                return []
+
+            # Get VIM account info from MongoDB
+            logger.info(f"Reading VIM info, id: {vim_id}")
+            cfg = Config()
+            common_db = CommonDbClient(cfg)
+            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
+            # Create VIM metrics collector
+            vim_type = vim_account["vim_type"]
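+            # A "vim_type" entry in the account config overrides the base type; VIO without a vrops_site entry is collected as plain OpenStack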
+            if "config" in vim_account and "vim_type" in vim_account["config"]:
+                vim_type = vim_account["config"]["vim_type"].lower()
+                if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
+                    vim_type = "openstack"
+            if vim_type == "openstack":
+                collector = OpenStackCollector(vim_account)
+            else:
+                logger.error(f"VIM type '{vim_type}' not supported")
+                # Return an empty list so the downstream send_prometheus task can still iterate over the mapped results
+                return []
+            # Get metrics
+            results = []
+            if collector:
+                results = collector.collect_metrics(metric_list)
+            logger.info(results)
+            return results
+
+        @task(task_id="send_prometheus")
+        def send_prometheus(metric_lists: List[List[Dict]]):
+            """Send server metrics to the Prometheus Pushgateway"""
+            logger.info(metric_lists)
+
+            # Define Prometheus metrics
+            registry = CollectorRegistry()
+            prom_metrics = {}
+            prom_metrics_keys = PROMETHEUS_METRICS.keys()
+            for key in prom_metrics_keys:
+                prom_metrics[key] = Gauge(
+                    PROMETHEUS_METRICS[key]["metric_name"],
+                    PROMETHEUS_METRICS[key]["metric_descr"],
+                    labelnames=[
+                        "metric_id",
+                        "ns_id",
+                        "project_id",
+                        "vnf_member_index",
+                        "vm_id",
+                        "vim_id",
+                        "vdu_name",
+                        "vdu_id",
+                    ],
+                    registry=registry,
+                )
+
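+            # Fill the gauges with the samples returned by every mapped collect_metrics task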
+            for metric_list in metric_lists:
+                for metric in metric_list:
+                    metric_name = metric["metric"]
+                    metric_id = metric["metric_id"]
+                    value = metric["value"]
+                    vm_id = metric["vm_id"]
+                    vm_name = metric.get("vdu_name", "")
+                    ns_id = metric["ns_id"]
+                    project_id = metric["project_id"]
+                    vnf_index = metric["vnf_member_index"]
+                    vdu_id = metric["vdu_id"]
+                    logger.info(
+                        f"  metric: {metric_name}, metric_id: {metric_id}, value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id}"
+                    )
+                    if metric_name in prom_metrics_keys:
+                        prom_metrics[metric_name].labels(
+                            metric_id,
+                            ns_id,
+                            project_id,
+                            vnf_index,
+                            vm_id,
+                            vim_id,
+                            vm_name,
+                            vdu_id,
+                        ).set(value)
+
+            # Push to Prometheus
+            push_to_gateway(
+                gateway=PROMETHEUS_PUSHGW,
+                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+                registry=registry,
+            )
+            return
+
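+        # Task wiring: extract the metric list, split it into chunks, fan out collection over the chunks with dynamic task mapping, then push all results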
+        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
+        send_prometheus(
+            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
+        )
+
+    return dag
+
+
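+# One DAG per supported VIM account is generated every time the scheduler parses this file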
+vim_list = get_all_vim()
+for index, vim in enumerate(vim_list):
+    vim_type = vim["vim_type"]
+    if vim_type in SUPPORTED_VIM_TYPES:
+        vim_id = vim["_id"]
+        vim_name = vim["name"]
+        dag_description = f"DAG for collecting VM metrics from VIM {vim_name}"
+        dag_id = f"vm_metrics_vim_{vim_id}"
+        logger.info(f"Creating DAG {dag_id}")
+        globals()[dag_id] = create_dag(
+            dag_id=dag_id,
+            dag_number=index,
+            dag_description=dag_description,
+            vim_id=vim_id,
+        )
+    else:
+        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")