X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=src%2Fosm_ngsa%2Fdags%2Fmultivim_vm_metrics.py;fp=src%2Fosm_ngsa%2Fdags%2Fmultivim_vm_metrics.py;h=6f03292da65a3f8ed80bc69a855182f4234d973c;hb=9377117ccd1b86e62f1a3e8b63aee2880a5b9c55;hp=0000000000000000000000000000000000000000;hpb=64a4473f399192c615d08ff06ebf8326c4c6c4b1;p=osm%2FNG-SA.git diff --git a/src/osm_ngsa/dags/multivim_vm_metrics.py b/src/osm_ngsa/dags/multivim_vm_metrics.py new file mode 100644 index 0000000..6f03292 --- /dev/null +++ b/src/osm_ngsa/dags/multivim_vm_metrics.py @@ -0,0 +1,324 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +from datetime import datetime, timedelta +import logging +from math import ceil +from typing import Dict, List + +from airflow import DAG +from airflow.decorators import task +from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config +from osm_mon.vim_connectors.openstack import OpenStackCollector +from prometheus_client import CollectorRegistry, Gauge, push_to_gateway + + +SCHEDULE_INTERVAL = 5 +COLLECTOR_MAX_METRICS_PER_TASK = 100 +SUPPORTED_VIM_TYPES = ["openstack", "vio"] +PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091" +PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_" +PROMETHEUS_METRICS = { + "cpu_utilization": { + "metric_name": "cpu_utilization", + "metric_descr": "CPU usage percentage", + }, + "average_memory_utilization": { + "metric_name": "average_memory_utilization", + "metric_descr": "Volume of RAM in MB used by the VM", + }, + "disk_read_ops": { + "metric_name": "disk_read_ops", + "metric_descr": "Number of read requests", + }, + "disk_write_ops": { + "metric_name": "disk_write_ops", + "metric_descr": "Number of write requests", + }, + "disk_read_bytes": { + "metric_name": "disk_read_bytes", + "metric_descr": "Volume of reads in bytes", + }, + "disk_write_bytes": { + "metric_name": "disk_write_bytes", + "metric_descr": "Volume of writes in bytes", + }, + "packets_received": { + "metric_name": "packets_received", + "metric_descr": "Number of incoming packets", + }, + "packets_sent": { + "metric_name": "packets_sent", + "metric_descr": "Number of outgoing packets", + }, + "packets_in_dropped": { + "metric_name": "packets_in_dropped", + "metric_descr": "Number of incoming dropped packets", + }, + "packets_out_dropped": { + "metric_name": "packets_out_dropped", + "metric_descr": "Number of outgoing dropped packets", + }, +} + +# Logging +logger = logging.getLogger("airflow.task") + + +def get_all_vim(): + """Get VIMs from MongoDB""" + logger.info("Getting VIM list") + + cfg = Config() + logger.info(cfg.conf) + common_db = CommonDbClient(cfg) + vim_accounts = common_db.get_vim_accounts() + vim_list = [] + for vim in vim_accounts: + logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})') + vim_list.append( + {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]} + ) + + logger.info(vim_list) + logger.info("Getting VIM list OK") + return vim_list + + +def create_dag(dag_id, dag_number, dag_description, vim_id): + dag = DAG( + dag_id, + catchup=False, + default_args={ + "depends_on_past": False, + "retries": 1, + # "retry_delay": timedelta(minutes=1), + "retry_delay": timedelta(seconds=10), + }, + description=dag_description, + is_paused_upon_creation=False, + # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL), + schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *", + start_date=datetime(2022, 1, 1), + tags=["osm", "vdu"], + ) + + with dag: + + @task(task_id="extract_metrics_from_vnfrs") + def extract_metrics_from_vnfrs(vim_id: str): + """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB""" + + # Get VNFDs that include "monitoring-parameter" from MongoDB + cfg = Config() + common_db = CommonDbClient(cfg) + logger.info("Getting VNFDs with monitoring parameters from MongoDB") + vnfd_list = common_db.get_monitoring_vnfds() + # Get VNFR list from MongoDB + logger.info("Getting VNFR list from MongoDB") + vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id) + # Only read metrics if ns state is one of the nsAllowedStatesSet + nsAllowedStatesSet = {"INSTANTIATED"} + metric_list = [] + for vnfr in vnfr_list: + if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet: + continue + # Check if VNFR is in "monitoring-parameter" VNFDs list + vnfd_id = vnfr["vnfd-id"] + vnfd = next( + (item for item in vnfd_list if item["_id"] == vnfd_id), None + ) + if not vnfd: + continue + ns_id = vnfr["nsr-id-ref"] + vnf_index = vnfr["member-vnf-index-ref"] + logger.info( + f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter" + ) + project_list = vnfr.get("_admin", {}).get("projects_read", []) + project_id = "None" + if project_list: + project_id = project_list[0] + for vdur in vnfr.get("vdur", []): + vim_info = vdur.get("vim_info") + if not vim_info: + logger.error("Error: vim_info not available in vdur") + continue + if len(vim_info) != 1: + logger.error("Error: more than one vim_info in vdur") + continue + vim_id = next(iter(vim_info))[4:] + vm_id = vdur.get("vim-id") + if not vm_id: + logger.error("Error: vim-id not available in vdur") + continue + vdu_name = vdur.get("name", "UNKNOWN") + vdu = next( + filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"]) + ) + if "monitoring-parameter" not in vdu: + logger.error("Error: no monitoring-parameter in descriptor") + continue + for param in vdu["monitoring-parameter"]: + metric_name = param["performance-metric"] + metric_id = param["id"] + metric = { + "metric": metric_name, + "metric_id": metric_id, + "vm_id": vm_id, + "ns_id": ns_id, + "project_id": project_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_index, + "vdu_id": vdu["id"], + } + metric_list.append(metric) + + logger.info(f"Metrics to collect: {len(metric_list)}") + return metric_list + + @task(task_id="split_metrics") + def split_metrics(metric_list: List[Dict]): + """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK""" + n_metrics = len(metric_list) + if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK: + return [metric_list] + metrics_per_chunk = ceil( + n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK) + ) + logger.info( + f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items" + ) + chunks = [] + for i in range(0, n_metrics, metrics_per_chunk): + chunks.append(metric_list[i : i + metrics_per_chunk]) + return chunks + + @task(task_id="collect_metrics") + def collect_metrics(vim_id: str, metric_list: List[Dict]): + """Collect servers metrics""" + if not metric_list: + return [] + + # Get VIM account info from MongoDB + logger.info(f"Reading VIM info, id: {vim_id}") + cfg = Config() + common_db = CommonDbClient(cfg) + vim_account = common_db.get_vim_account(vim_account_id=vim_id) + # Create VIM metrics collector + vim_type = vim_account["vim_type"] + if "config" in vim_account and "vim_type" in vim_account["config"]: + vim_type = vim_account["config"]["vim_type"].lower() + if vim_type == "vio" and "vrops_site" not in vim_account["config"]: + vim_type = "openstack" + if vim_type == "openstack": + collector = OpenStackCollector(vim_account) + else: + logger.error(f"VIM type '{vim_type}' not supported") + return None + # Get metrics + results = [] + if collector: + results = collector.collect_metrics(metric_list) + logger.info(results) + return results + + @task(task_id="send_prometheus") + def send_prometheus(metric_lists: List[List[Dict]]): + """Send servers metrics to Prometheus Push Gateway""" + logger.info(metric_lists) + + # Define Prometheus metrics + registry = CollectorRegistry() + prom_metrics = {} + prom_metrics_keys = PROMETHEUS_METRICS.keys() + for key in prom_metrics_keys: + prom_metrics[key] = Gauge( + PROMETHEUS_METRICS[key]["metric_name"], + PROMETHEUS_METRICS[key]["metric_descr"], + labelnames=[ + "metric_id", + "ns_id", + "project_id", + "vnf_member_index", + "vm_id", + "vim_id", + "vdu_name", + "vdu_id", + ], + registry=registry, + ) + + for metric_list in metric_lists: + for metric in metric_list: + metric_name = metric["metric"] + metric_id = metric["metric_id"] + value = metric["value"] + vm_id = metric["vm_id"] + vm_name = metric.get("vdu_name", "") + ns_id = metric["ns_id"] + project_id = metric["project_id"] + vnf_index = metric["vnf_member_index"] + vdu_id = metric["vdu_id"] + logger.info( + f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} " + ) + if metric_name in prom_metrics_keys: + prom_metrics[metric_name].labels( + metric_id, + ns_id, + project_id, + vnf_index, + vm_id, + vim_id, + vm_name, + vdu_id, + ).set(value) + + # Push to Prometheus + push_to_gateway( + gateway=PROMETHEUS_PUSHGW, + job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}", + registry=registry, + ) + return + + chunks = split_metrics(extract_metrics_from_vnfrs(vim_id)) + send_prometheus( + collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks) + ) + + return dag + + +vim_list = get_all_vim() +for index, vim in enumerate(vim_list): + vim_type = vim["vim_type"] + if vim_type in SUPPORTED_VIM_TYPES: + vim_id = vim["_id"] + vim_name = vim["name"] + dag_description = f"Dag for collecting VM metrics from VIM {vim_name}" + dag_id = f"vm_metrics_vim_{vim_id}" + logger.info(f"Creating DAG {dag_id}") + globals()[dag_id] = create_dag( + dag_id=dag_id, + dag_number=index, + dag_description=dag_description, + vim_id=vim_id, + ) + else: + logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")