From: aguilard Date: Thu, 13 Apr 2023 10:43:07 +0000 (+0000) Subject: Feature 10981: add Openstack metrics collector and scale-out/in DAGs for autoscaling X-Git-Tag: release-v14.0-start~10 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9377117ccd1b86e62f1a3e8b63aee2880a5b9c55;p=osm%2FNG-SA.git Feature 10981: add Openstack metrics collector and scale-out/in DAGs for autoscaling Change-Id: Idff1974545d28208a853787d748f1839dffc69e5 Signed-off-by: aguilard --- diff --git a/osm_webhook_translator/requirements-dist.txt b/osm_webhook_translator/requirements-dist.txt index 6ddded6..64ae136 100644 --- a/osm_webhook_translator/requirements-dist.txt +++ b/osm_webhook_translator/requirements-dist.txt @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ####################################################################################### -packaging==23.0 +packaging==23.1 # via setuptools-scm setuptools-scm==7.1.0 # via -r osm_webhook_translator/requirements-dist.in diff --git a/osm_webhook_translator/requirements.txt b/osm_webhook_translator/requirements.txt index 6cf7f42..c8d78fa 100644 --- a/osm_webhook_translator/requirements.txt +++ b/osm_webhook_translator/requirements.txt @@ -22,7 +22,7 @@ charset-normalizer==3.1.0 # via requests click==8.1.3 # via uvicorn -fastapi==0.95.0 +fastapi==0.95.1 # via -r osm_webhook_translator/requirements.in h11==0.14.0 # via uvicorn @@ -32,7 +32,7 @@ idna==3.4 # requests pydantic==1.10.7 # via fastapi -requests==2.28.2 +requests==2.29.0 # via -r osm_webhook_translator/requirements.in sniffio==1.3.0 # via anyio diff --git a/requirements-dev.in b/requirements-dev.in index ed3f514..8434f0e 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -17,4 +17,4 @@ git+https://osm.etsi.org/gerrit/osm/common.git@master#egg=osm-common -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master -apache-airflow==2.4.* 
+apache-airflow==2.5.* diff --git a/requirements-dev.txt b/requirements-dev.txt index 973bdcb..a82e656 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -14,42 +14,50 @@ # See the License for the specific language governing permissions and # limitations under the License. ####################################################################################### +aiohttp==3.8.4 + # via apache-airflow-providers-http aiokafka==0.8.0 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master -alembic==1.9.2 +aiosignal==1.3.1 + # via aiohttp +alembic==1.10.4 # via apache-airflow anyio==3.6.2 # via httpcore -apache-airflow==2.4.3 +apache-airflow==2.5.3 # via -r requirements-dev.in -apache-airflow-providers-common-sql==1.3.3 +apache-airflow-providers-common-sql==1.4.0 # via # apache-airflow # apache-airflow-providers-sqlite -apache-airflow-providers-ftp==3.3.0 +apache-airflow-providers-ftp==3.3.1 # via apache-airflow -apache-airflow-providers-http==4.1.1 +apache-airflow-providers-http==4.3.0 # via apache-airflow apache-airflow-providers-imap==3.1.1 # via apache-airflow -apache-airflow-providers-sqlite==3.3.1 +apache-airflow-providers-sqlite==3.3.2 # via apache-airflow apispec[yaml]==3.3.2 # via flask-appbuilder -argcomplete==2.0.0 +argcomplete==3.0.8 # via apache-airflow +asgiref==3.6.0 + # via apache-airflow-providers-http async-timeout==4.0.2 # via # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master + # aiohttp # aiokafka -attrs==22.2.0 +attrs==23.1.0 # via + # aiohttp # apache-airflow # cattrs # jsonschema -babel==2.11.0 +babel==2.12.1 # via flask-babel -blinker==1.5 +blinker==1.6.2 # via apache-airflow cachelib==0.9.0 # via @@ -64,8 +72,10 @@ certifi==2022.12.7 # requests cffi==1.15.1 # via cryptography -charset-normalizer==3.0.1 - # via requests +charset-normalizer==3.1.0 + # via + # aiohttp + # requests click==8.1.3 # via # clickclick @@ -79,13 +89,13 @@ colorlog==4.8.0 # via 
apache-airflow configupdater==3.1.1 # via apache-airflow -connexion[flask,swagger-ui]==2.14.2 +connexion[flask]==2.14.2 # via apache-airflow cron-descriptor==1.2.35 # via apache-airflow -croniter==1.3.8 +croniter==1.3.14 # via apache-airflow -cryptography==39.0.0 +cryptography==40.0.2 # via apache-airflow dataclasses==0.6 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master @@ -99,9 +109,9 @@ docutils==0.19 # via python-daemon email-validator==1.3.1 # via flask-appbuilder -exceptiongroup==1.1.0 +exceptiongroup==1.1.1 # via cattrs -flask==2.2.2 +flask==2.2.4 # via # apache-airflow # connexion @@ -133,6 +143,10 @@ flask-wtf==1.1.1 # via # apache-airflow # flask-appbuilder +frozenlist==1.3.3 + # via + # aiohttp + # aiosignal graphviz==0.20.1 # via apache-airflow greenlet==2.0.2 @@ -141,23 +155,24 @@ gunicorn==20.1.0 # via apache-airflow h11==0.14.0 # via httpcore -httpcore==0.16.3 +httpcore==0.17.0 # via httpx -httpx==0.23.3 +httpx==0.24.0 # via apache-airflow idna==3.4 # via # anyio # email-validator + # httpx # requests - # rfc3986 -importlib-metadata==6.0.0 + # yarl +importlib-metadata==4.13.0 # via # alembic # apache-airflow # flask # markdown -importlib-resources==5.10.2 +importlib-resources==5.12.0 # via # alembic # apache-airflow @@ -176,7 +191,6 @@ jinja2==3.1.2 # flask # flask-babel # python-nvd3 - # swagger-ui-bundle jsonschema==4.17.3 # via # apache-airflow @@ -196,9 +210,9 @@ lockfile==0.12.2 # python-daemon mako==1.2.4 # via alembic -markdown==3.4.1 +markdown==3.4.3 # via apache-airflow -markdown-it-py==2.1.0 +markdown-it-py==2.2.0 # via # apache-airflow # mdit-py-plugins @@ -222,12 +236,16 @@ marshmallow-oneofschema==3.0.1 # via apache-airflow marshmallow-sqlalchemy==0.26.1 # via flask-appbuilder -mdit-py-plugins==0.3.3 +mdit-py-plugins==0.3.5 # via apache-airflow mdurl==0.1.2 # via markdown-it-py motor==1.3.1 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master 
+multidict==6.0.4 + # via + # aiohttp + # yarl osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master # via -r requirements-dev.in packaging==23.0 @@ -247,13 +265,13 @@ pluggy==1.0.0 # via apache-airflow prison==0.2.1 # via flask-appbuilder -psutil==5.9.4 +psutil==5.9.5 # via apache-airflow pycparser==2.21 # via cffi pycryptodome==3.17 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master -pygments==2.14.0 +pygments==2.15.1 # via # apache-airflow # rich @@ -268,7 +286,7 @@ pymongo==3.13.0 # motor pyrsistent==0.19.3 # via jsonschema -python-daemon==2.3.2 +python-daemon==3.0.1 # via apache-airflow python-dateutil==2.8.2 # via @@ -278,11 +296,11 @@ python-dateutil==2.8.2 # pendulum python-nvd3==0.15.0 # via apache-airflow -python-slugify==8.0.0 +python-slugify==8.0.1 # via # apache-airflow # python-nvd3 -pytz==2022.7.1 +pytz==2023.3 # via # babel # flask-babel @@ -294,16 +312,16 @@ pyyaml==5.4.1 # apispec # clickclick # connexion -requests==2.28.2 +requests==2.29.0 # via # apache-airflow-providers-http # connexion # requests-toolbelt requests-toolbelt==0.10.1 # via apache-airflow-providers-http -rfc3986[idna2008]==1.5.0 - # via httpx -rich==13.3.1 +rfc3339-validator==0.1.4 + # via apache-airflow +rich==13.3.5 # via apache-airflow setproctitle==1.3.2 # via apache-airflow @@ -311,12 +329,13 @@ six==1.16.0 # via # prison # python-dateutil + # rfc3339-validator sniffio==1.3.0 # via # anyio # httpcore # httpx -sqlalchemy==1.4.46 +sqlalchemy==1.4.47 # via # alembic # apache-airflow @@ -327,44 +346,45 @@ sqlalchemy==1.4.46 # sqlalchemy-utils sqlalchemy-jsonfield==1.0.1.post0 # via apache-airflow -sqlalchemy-utils==0.39.0 +sqlalchemy-utils==0.41.1 # via flask-appbuilder -sqlparse==0.4.3 +sqlparse==0.4.4 # via apache-airflow-providers-common-sql -swagger-ui-bundle==0.0.9 - # via connexion tabulate==0.9.0 # via apache-airflow -tenacity==8.1.0 +tenacity==8.2.2 # via apache-airflow -termcolor==2.2.0 +termcolor==2.3.0 # via 
apache-airflow text-unidecode==1.3 # via python-slugify -typing-extensions==4.4.0 +typing-extensions==4.5.0 # via + # alembic # apache-airflow # rich uc-micro-py==1.0.1 # via linkify-it-py unicodecsv==0.14.1 # via apache-airflow -urllib3==1.26.14 +urllib3==1.26.15 # via requests -werkzeug==2.2.2 +werkzeug==2.2.3 # via # apache-airflow # connexion # flask # flask-jwt-extended # flask-login -wrapt==1.14.1 +wrapt==1.15.0 # via deprecated wtforms==3.0.1 # via # flask-appbuilder # flask-wtf -zipp==3.12.0 +yarl==1.9.2 + # via aiohttp +zipp==3.15.0 # via # importlib-metadata # importlib-resources diff --git a/requirements-dist.in b/requirements-dist.in index 03ff6e9..7929c6f 100644 --- a/requirements-dist.in +++ b/requirements-dist.in @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +packaging==23.0 stdeb setuptools-scm setuptools<60 diff --git a/requirements-dist.txt b/requirements-dist.txt index 7b15e9f..48f399b 100644 --- a/requirements-dist.txt +++ b/requirements-dist.txt @@ -15,7 +15,9 @@ # limitations under the License. ####################################################################################### packaging==23.0 - # via setuptools-scm + # via + # -r requirements-dist.in + # setuptools-scm setuptools-scm==7.1.0 # via -r requirements-dist.in stdeb==0.10.0 diff --git a/requirements-test.txt b/requirements-test.txt index fd0128d..dc5a646 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
####################################################################################### -coverage==7.1.0 +coverage==7.2.3 # via -r requirements-test.in -mock==5.0.1 +mock==5.0.2 # via -r requirements-test.in nose2==0.12.0 # via -r requirements-test.in diff --git a/requirements.in b/requirements.in index 6779aef..58bbbd4 100644 --- a/requirements.in +++ b/requirements.in @@ -21,6 +21,8 @@ azure-mgmt-compute gnocchiclient google-api-python-client google-auth +packaging==23.0 +prometheus-api-client prometheus-client python-ceilometerclient python-keystoneclient diff --git a/requirements.txt b/requirements.txt index fc34d3f..953edd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ####################################################################################### -attrs==22.2.0 +attrs==23.1.0 # via cmd2 autopage==0.5.1 # via cliff @@ -33,6 +33,8 @@ azure-mgmt-compute==29.1.0 # via -r requirements.in azure-mgmt-core==1.4.0 # via azure-mgmt-compute +backports-zoneinfo==0.2.1;python_version<"3.9" + # via tzlocal cachetools==5.3.0 # via google-auth certifi==2022.12.7 @@ -47,26 +49,34 @@ cliff==4.2.0 # via gnocchiclient cmd2==2.4.3 # via cliff -cryptography==40.0.1 +contourpy==1.0.7 + # via matplotlib +cryptography==40.0.2 # via # azure-identity # msal # pyjwt +cycler==0.11.0 + # via matplotlib +dateparser==1.1.8 + # via prometheus-api-client debtcollector==2.5.0 # via # gnocchiclient # oslo-config # oslo-utils # python-keystoneclient +fonttools==4.39.3 + # via matplotlib futurist==2.4.1 # via gnocchiclient gnocchiclient==7.0.8 # via -r requirements.in google-api-core==2.11.0 # via google-api-python-client -google-api-python-client==2.84.0 +google-api-python-client==2.86.0 # via -r requirements.in -google-auth==2.17.2 +google-auth==2.17.3 # via # -r requirements.in # google-api-core @@ -76,14 +86,18 @@ google-auth-httplib2==0.1.0 # via 
google-api-python-client googleapis-common-protos==1.59.0 # via google-api-core +httmock==1.4.0 + # via prometheus-api-client httplib2==0.22.0 # via # google-api-python-client # google-auth-httplib2 idna==3.4 # via requests -importlib-metadata==6.3.0 +importlib-metadata==6.6.0 # via cliff +importlib-resources==5.12.0 + # via matplotlib iso8601==1.1.0 # via # gnocchiclient @@ -99,7 +113,11 @@ keystoneauth1==5.1.2 # python-ceilometerclient # python-keystoneclient # python-novaclient -msal==1.21.0 +kiwisolver==1.4.4 + # via matplotlib +matplotlib==3.7.1 + # via prometheus-api-client +msal==1.22.0 # via # azure-identity # msal-extensions @@ -115,6 +133,12 @@ netaddr==0.8.0 # oslo-utils netifaces==0.11.0 # via oslo-utils +numpy==1.24.3 + # via + # contourpy + # matplotlib + # pandas + # prometheus-api-client oauthlib==3.2.2 # via requests-oauthlib os-service-types==1.7.0 @@ -141,8 +165,12 @@ oslo-utils==6.1.0 # python-novaclient packaging==23.0 # via + # -r requirements.in + # matplotlib # oslo-utils # python-keystoneclient +pandas==2.0.1 + # via prometheus-api-client pbr==5.11.1 # via # keystoneauth1 @@ -153,6 +181,8 @@ pbr==5.11.1 # python-keystoneclient # python-novaclient # stevedore +pillow==9.5.0 + # via matplotlib portalocker==2.7.0 # via msal-extensions prettytable==0.7.2 @@ -160,17 +190,19 @@ prettytable==0.7.2 # cliff # python-ceilometerclient # python-novaclient +prometheus-api-client==0.5.3 + # via -r requirements.in prometheus-client==0.16.0 # via -r requirements.in -protobuf==4.22.1 +protobuf==4.22.3 # via # google-api-core # googleapis-common-protos -pyasn1==0.4.8 +pyasn1==0.5.0 # via # pyasn1-modules # rsa -pyasn1-modules==0.2.8 +pyasn1-modules==0.3.0 # via google-auth pycparser==2.21 # via cffi @@ -179,34 +211,47 @@ pyjwt[crypto]==2.6.0 pyparsing==3.0.9 # via # httplib2 + # matplotlib # oslo-utils pyperclip==1.8.2 # via cmd2 python-ceilometerclient==2.9.0 # via -r requirements.in python-dateutil==2.8.2 - # via gnocchiclient + # via + # dateparser + # 
gnocchiclient + # matplotlib + # pandas python-keystoneclient==5.1.0 # via -r requirements.in python-novaclient==18.3.0 # via -r requirements.in pytz==2023.3 # via + # dateparser # oslo-serialization # oslo-utils + # pandas +pytz-deprecation-shim==0.1.0.post0 + # via tzlocal pyyaml==5.4.1 # via # -r requirements.in # cliff # oslo-config -requests==2.28.2 +regex==2023.3.23 + # via dateparser +requests==2.29.0 # via # azure-core # google-api-core + # httmock # keystoneauth1 # msal # msrest # oslo-config + # prometheus-api-client # python-ceilometerclient # python-keystoneclient # requests-oauthlib @@ -238,6 +283,10 @@ stevedore==5.0.0 # python-novaclient typing-extensions==4.5.0 # via azure-core +tzdata==2023.3 + # via pandas +tzlocal==4.3 + # via dateparser ujson==5.7.0 # via gnocchiclient uritemplate==4.1.1 @@ -249,4 +298,6 @@ wcwidth==0.2.6 wrapt==1.15.0 # via debtcollector zipp==3.15.0 - # via importlib-metadata + # via + # importlib-metadata + # importlib-resources diff --git a/src/osm_ngsa/dags/alert_vdu.py b/src/osm_ngsa/dags/alert_vdu.py index 390460a..c314893 100644 --- a/src/osm_ngsa/dags/alert_vdu.py +++ b/src/osm_ngsa/dags/alert_vdu.py @@ -69,15 +69,19 @@ def alert_vdu(): if status == "firing": # Searching alerting rule in MongoDB logger.info( - f"Searching alert rule in MongoDB: ns_id {ns_id}, " + f"Searching healing alert rule in MongoDB: ns_id {ns_id}, " f"vnf_member_index {vnf_member_index}, " f"vdu_name {vdu_name}, " f"vm_id {vm_id}" ) alert = common_db.get_alert( - nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name + nsr_id=ns_id, + vnf_member_index=vnf_member_index, + vdu_id=None, + vdu_name=vdu_name, + action_type="healing", ) - if alert and alert["action_type"] == "healing": + if alert: logger.info("Found an alert rule:") logger.info(alert) # Update alert status @@ -166,7 +170,11 @@ def alert_vdu(): f"vm_id {vm_id}" ) alert = common_db.get_alert( - nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name + nsr_id=ns_id, 
+ vnf_member_index=vnf_member_index, + vdu_id=None, + vdu_name=vdu_name, + action_type="healing", ) if alert: logger.info("Found an alert rule, updating status") diff --git a/src/osm_ngsa/dags/multivim_vm_metrics.py b/src/osm_ngsa/dags/multivim_vm_metrics.py new file mode 100644 index 0000000..6f03292 --- /dev/null +++ b/src/osm_ngsa/dags/multivim_vm_metrics.py @@ -0,0 +1,324 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#######################################################################################
from datetime import datetime, timedelta
import logging
from math import ceil
from typing import Dict, List

from airflow import DAG
from airflow.decorators import task
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from osm_mon.vim_connectors.openstack import OpenStackCollector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway


# Minutes between two runs of each per-VIM DAG (used in the cron expression)
SCHEDULE_INTERVAL = 5
# Upper bound of metrics handled by a single mapped "collect_metrics" task
COLLECTOR_MAX_METRICS_PER_TASK = 100
SUPPORTED_VIM_TYPES = ["openstack", "vio"]
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
# Metrics exported to Prometheus, keyed by the "performance-metric" name used in
# the VNFD monitoring parameters
PROMETHEUS_METRICS = {
    "cpu_utilization": {
        "metric_name": "cpu_utilization",
        "metric_descr": "CPU usage percentage",
    },
    "average_memory_utilization": {
        "metric_name": "average_memory_utilization",
        "metric_descr": "Volume of RAM in MB used by the VM",
    },
    "disk_read_ops": {
        "metric_name": "disk_read_ops",
        "metric_descr": "Number of read requests",
    },
    "disk_write_ops": {
        "metric_name": "disk_write_ops",
        "metric_descr": "Number of write requests",
    },
    "disk_read_bytes": {
        "metric_name": "disk_read_bytes",
        "metric_descr": "Volume of reads in bytes",
    },
    "disk_write_bytes": {
        "metric_name": "disk_write_bytes",
        "metric_descr": "Volume of writes in bytes",
    },
    "packets_received": {
        "metric_name": "packets_received",
        "metric_descr": "Number of incoming packets",
    },
    "packets_sent": {
        "metric_name": "packets_sent",
        "metric_descr": "Number of outgoing packets",
    },
    "packets_in_dropped": {
        "metric_name": "packets_in_dropped",
        "metric_descr": "Number of incoming dropped packets",
    },
    "packets_out_dropped": {
        "metric_name": "packets_out_dropped",
        "metric_descr": "Number of outgoing dropped packets",
    },
}

# Logging
logger = logging.getLogger("airflow.task")


def get_all_vim():
    """Get VIMs from MongoDB.

    Returns:
        A list with one dict per VIM account, each one holding only the
        "_id", "name" and "vim_type" fields of the account.
    """
    logger.info("Getting VIM list")

    cfg = Config()
    # NOTE(review): the whole configuration is logged at INFO level; confirm it
    # cannot contain credentials.
    logger.info(cfg.conf)
    common_db = CommonDbClient(cfg)
    vim_accounts = common_db.get_vim_accounts()
    vim_list = []
    for vim in vim_accounts:
        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
        vim_list.append(
            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
        )

    logger.info(vim_list)
    logger.info("Getting VIM list OK")
    return vim_list


def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Create the DAG that collects VM metrics from one VIM.

    The DAG chains four tasks: extract the metrics to collect from the VNFRs,
    split them in chunks, collect every chunk from the VIM (one mapped task per
    chunk) and push all the results to the Prometheus Push Gateway.

    Args:
        dag_id: identifier of the DAG.
        dag_number: index of the VIM in the VIM list. Currently unused, kept so
            that existing callers keep working.
        dag_description: human readable description of the DAG.
        vim_id: identifier of the VIM account whose VMs are monitored.

    Returns:
        The DAG object.
    """
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            # "retry_delay": timedelta(minutes=1),
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
        tags=["osm", "vdu"],
    )

    with dag:

        @task(task_id="extract_metrics_from_vnfrs")
        def extract_metrics_from_vnfrs(vim_id: str):
            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB

            Returns:
                A list of dicts, one per (VDU instance, monitoring parameter)
                pair, with the keys "metric", "metric_id", "vm_id", "ns_id",
                "project_id", "vdu_name", "vnf_member_index" and "vdu_id".
            """

            # Get VNFDs that include "monitoring-parameter" from MongoDB
            cfg = Config()
            common_db = CommonDbClient(cfg)
            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
            vnfd_list = common_db.get_monitoring_vnfds()
            # Index the VNFDs once instead of scanning the list for every VNFR.
            # setdefault keeps the first descriptor found for a given id, and a
            # single pass also works if vnfd_list can only be iterated once.
            vnfd_by_id = {}
            for item in vnfd_list:
                vnfd_by_id.setdefault(item["_id"], item)
            # Get VNFR list from MongoDB
            logger.info("Getting VNFR list from MongoDB")
            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
            # Only read metrics if ns state is one of the nsAllowedStatesSet
            nsAllowedStatesSet = {"INSTANTIATED"}
            metric_list = []
            for vnfr in vnfr_list:
                vnfr_admin = vnfr.get("_admin", {})
                if vnfr_admin.get("nsState") not in nsAllowedStatesSet:
                    continue
                # Check if VNFR is in "monitoring-parameter" VNFDs list
                vnfd = vnfd_by_id.get(vnfr["vnfd-id"])
                if not vnfd:
                    continue
                ns_id = vnfr["nsr-id-ref"]
                vnf_index = vnfr["member-vnf-index-ref"]
                logger.info(
                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
                )
                project_list = vnfr_admin.get("projects_read", [])
                project_id = "None"
                if project_list:
                    project_id = project_list[0]
                for vdur in vnfr.get("vdur", []):
                    vim_info = vdur.get("vim_info")
                    if not vim_info:
                        logger.error("Error: vim_info not available in vdur")
                        continue
                    if len(vim_info) != 1:
                        logger.error("Error: more than one vim_info in vdur")
                        continue
                    vm_id = vdur.get("vim-id")
                    if not vm_id:
                        logger.error("Error: vim-id not available in vdur")
                        continue
                    vdu_name = vdur.get("name", "UNKNOWN")
                    # A vdur whose descriptor is not in the VNFD is skipped:
                    # a bare next() would raise StopIteration and abort the
                    # collection for every other VM of the VIM.
                    vdu_id_ref = vdur.get("vdu-id-ref")
                    vdu = next(
                        (
                            item
                            for item in vnfd.get("vdu", [])
                            if item.get("id") == vdu_id_ref
                        ),
                        None,
                    )
                    if vdu is None:
                        logger.error(
                            f"Error: vdu {vdu_id_ref} not found in descriptor"
                        )
                        continue
                    if "monitoring-parameter" not in vdu:
                        logger.error("Error: no monitoring-parameter in descriptor")
                        continue
                    for param in vdu["monitoring-parameter"]:
                        metric_name = param["performance-metric"]
                        metric_id = param["id"]
                        metric = {
                            "metric": metric_name,
                            "metric_id": metric_id,
                            "vm_id": vm_id,
                            "ns_id": ns_id,
                            "project_id": project_id,
                            "vdu_name": vdu_name,
                            "vnf_member_index": vnf_index,
                            "vdu_id": vdu["id"],
                        }
                        metric_list.append(metric)

            logger.info(f"Metrics to collect: {len(metric_list)}")
            return metric_list

        @task(task_id="split_metrics")
        def split_metrics(metric_list: List[Dict]):
            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK

            Returns:
                A list of chunks (lists of metrics). The chunks have a similar
                size and none exceeds COLLECTOR_MAX_METRICS_PER_TASK items.
            """
            n_metrics = len(metric_list)
            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
                return [metric_list]
            # Number of chunks first, then an even share of metrics per chunk
            metrics_per_chunk = ceil(
                n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
            )
            logger.info(
                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
            )
            chunks = []
            for i in range(0, n_metrics, metrics_per_chunk):
                chunks.append(metric_list[i : i + metrics_per_chunk])
            return chunks

        @task(task_id="collect_metrics")
        def collect_metrics(vim_id: str, metric_list: List[Dict]):
            """Collect servers metrics

            Returns:
                The list returned by the VIM collector for metric_list, or an
                empty list when there is nothing to collect or the VIM type is
                not supported.
            """
            if not metric_list:
                return []

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # Create VIM metrics collector
            vim_config = vim_account.get("config") or {}
            vim_type = vim_account["vim_type"]
            # NOTE(review): the "vio" check is assumed to be nested under the
            # "vim_type in config" check - confirm against the original file.
            if "vim_type" in vim_config:
                vim_type = vim_config["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in vim_config:
                    vim_type = "openstack"
            if vim_type != "openstack":
                logger.error(f"VIM type '{vim_type}' not supported")
                # An empty list (not None) so that send_prometheus can always
                # iterate over the result of every mapped task
                return []
            collector = OpenStackCollector(vim_account)
            # Get metrics
            results = collector.collect_metrics(metric_list)
            logger.info(results)
            return results

        @task(task_id="send_prometheus")
        def send_prometheus(metric_lists: List[List[Dict]]):
            """Send servers metrics to Prometheus Push Gateway

            Args:
                metric_lists: one list of collected metrics per mapped
                    "collect_metrics" task. Empty or missing lists are skipped.
            """
            logger.info(metric_lists)

            # Define Prometheus metrics
            registry = CollectorRegistry()
            prom_metrics = {}
            for key, descriptor in PROMETHEUS_METRICS.items():
                prom_metrics[key] = Gauge(
                    descriptor["metric_name"],
                    descriptor["metric_descr"],
                    labelnames=[
                        "metric_id",
                        "ns_id",
                        "project_id",
                        "vnf_member_index",
                        "vm_id",
                        "vim_id",
                        "vdu_name",
                        "vdu_id",
                    ],
                    registry=registry,
                )

            for metric_list in metric_lists:
                if not metric_list:
                    continue
                for metric in metric_list:
                    metric_name = metric["metric"]
                    metric_id = metric["metric_id"]
                    value = metric["value"]
                    vm_id = metric["vm_id"]
                    vm_name = metric.get("vdu_name", "")
                    ns_id = metric["ns_id"]
                    project_id = metric["project_id"]
                    vnf_index = metric["vnf_member_index"]
                    vdu_id = metric["vdu_id"]
                    logger.info(
                        f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
                    )
                    if metric_name in prom_metrics:
                        # Label values follow the order of "labelnames" above;
                        # vim_id is the argument received by create_dag
                        prom_metrics[metric_name].labels(
                            metric_id,
                            ns_id,
                            project_id,
                            vnf_index,
                            vm_id,
                            vim_id,
                            vm_name,
                            vdu_id,
                        ).set(value)

            # Push to Prometheus
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )
            return

        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
        # One "collect_metrics" task instance per chunk
        send_prometheus(
            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
        )

    return dag


# One DAG per supported VIM, registered in the module namespace under its dag_id
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type in SUPPORTED_VIM_TYPES:
        vim_id = vim["_id"]
        vim_name = vim["name"]
        dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
        dag_id = f"vm_metrics_vim_{vim_id}"
        logger.info(f"Creating DAG {dag_id}")
        globals()[dag_id] = create_dag(
            dag_id=dag_id,
            dag_number=index,
            dag_description=dag_description,
            vim_id=vim_id,
        )
    else:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")

# diff --git a/src/osm_ngsa/dags/scalein_vdu.py b/src/osm_ngsa/dags/scalein_vdu.py
# new file mode 100644
# index 0000000..c5daefd
# --- /dev/null
# +++ b/src/osm_ngsa/dags/scalein_vdu.py
# @@ -0,0 +1,211 @@
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import asyncio
from datetime import datetime, timedelta
import logging
import time
import uuid

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from osm_mon.core.message_bus_client import MessageBusClient

# Logging
logger = logging.getLogger("airflow.task")


def _find_scale_op_for_vnf(nslcmops, vnf_member_index):
    """Return the first scale-by-step operation on the given VNF, or None."""
    for op in nslcmops:
        scale_vnf_data = op["operationParams"].get("scaleVnfData")
        if not scale_vnf_data:
            continue
        step_data = scale_vnf_data.get("scaleByStepData")
        if step_data and step_data.get("member-vnf-index") == vnf_member_index:
            return op
    return None


def _build_scale_in_nslcmop(ns_id, vnf_member_index, scaling_group, vnfr):
    """Build the nslcmop document of an automatic SCALE_IN operation."""
    _id = str(uuid.uuid4())
    now = time.time()
    params = {
        "scaleType": "SCALE_VNF",
        "scaleVnfData": {
            "scaleVnfType": "SCALE_IN",
            "scaleByStepData": {
                "scaling-group-descriptor": scaling_group,
                "member-vnf-index": vnf_member_index,
            },
        },
        "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
    }
    return {
        "id": _id,
        "_id": _id,
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": ns_id,
        "lcmOperationType": "scale",
        "startTime": now,
        "isAutomaticInvocation": True,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
        },
        "_admin": {
            # The operation inherits the project permissions of the VNFR
            "projects_read": vnfr["_admin"]["projects_read"],
            "projects_write": vnfr["_admin"]["projects_write"],
        },
    }


@dag(
    catchup=False,
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=15),
    },
    description="Webhook callback for scale-in alarm from Prometheus AlertManager",
    is_paused_upon_creation=False,
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    tags=["osm", "webhook"],
)
def scalein_vdu():
    """DAG triggered by the webhook to process scale-in alarms.

    It has no schedule: every run reads the alerts from the "alerts" key of the
    DAG run configuration.
    """

    @task(task_id="main_task")
    def main_task():
        """Process every scale-in alert of the DAG run configuration.

        For a "firing" alert with a matching alert rule in MongoDB, the rule is
        set to "alarm" and, unless a scale operation on the same VNF is found in
        the cooldown interval, a SCALE_IN nslcmop is stored in MongoDB and sent
        to the "ns" topic of the message bus. For a "resolved" alert the rule
        is set back to "ok".
        """
        logger.debug("Running main task...")
        # Read input parameters
        context = get_current_context()
        conf = context["dag_run"].conf
        for alarm in conf["alerts"]:
            logger.info("Scale-in alarm:")
            status = alarm["status"]
            logger.info(f" status: {status}")
            logger.info(f' annotations: {alarm["annotations"]}')
            logger.info(f' startsAt: {alarm["startsAt"]}')
            logger.info(f' endsAt: {alarm["endsAt"]}')
            logger.info(f' labels: {alarm["labels"]}')
            # An alert without "alertname" label is not a scale-in alert: it is
            # skipped instead of failing on None.startswith
            alertname = alarm["labels"].get("alertname") or ""
            if not alertname.startswith("scalein_"):
                continue
            # scalein_vdu alert type
            config = Config()
            common_db = CommonDbClient(config)
            ns_id = alarm["labels"]["ns_id"]
            vdu_id = alarm["labels"]["vdu_id"]
            vnf_member_index = alarm["labels"]["vnf_member_index"]
            if status == "firing":
                # Searching alerting rule in MongoDB
                logger.info(
                    f"Searching scale-in alert rule in MongoDB: ns_id {ns_id}, "
                    f"vnf_member_index {vnf_member_index}, "
                    f"vdu_id {vdu_id}, "
                )
                alert = common_db.get_alert(
                    nsr_id=ns_id,
                    vnf_member_index=vnf_member_index,
                    vdu_id=vdu_id,
                    vdu_name=None,
                    action_type="scale_in",
                )
                if alert:
                    logger.info("Found an alert rule:")
                    logger.info(alert)
                    # Update alert status
                    common_db.update_alert_status(
                        uuid=alert["uuid"], alarm_status="alarm"
                    )
                    # Get VNFR from MongoDB
                    vnfr = common_db.get_vnfr(
                        nsr_id=ns_id, member_index=vnf_member_index
                    )
                    logger.info(
                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
                    )
                    # Check cooldown-time before scale-in
                    send_lcm = True
                    if "cooldown-time" in alert["action"]:
                        # cooldown-time is multiplied by 60 to get seconds
                        cooldown_time = alert["action"]["cooldown-time"] * 60
                        since = time.time() - cooldown_time
                        logger.info(
                            f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
                        )
                        nslcmops = common_db.get_nslcmop(
                            nsr_id=ns_id, operation_type="scale", since=since
                        )
                        op = _find_scale_op_for_vnf(nslcmops, vnf_member_index)
                        if op:
                            logger.info(
                                f"No scale-in will be launched, found a previous scale operation in cooldown interval: {op}"
                            )
                            send_lcm = False

                    if send_lcm:
                        # Save nslcmop object in MongoDB
                        msg_bus = MessageBusClient(config)
                        loop = asyncio.get_event_loop()
                        nslcmop = _build_scale_in_nslcmop(
                            ns_id=ns_id,
                            vnf_member_index=vnf_member_index,
                            scaling_group=alert["action"]["scaling-group"],
                            vnfr=vnfr,
                        )
                        common_db.create_nslcmop(nslcmop)
                        # Send Kafka message to LCM
                        logger.info("Sending scale-in action message:")
                        logger.info(nslcmop)
                        loop.run_until_complete(
                            msg_bus.aiowrite("ns", "scale", nslcmop)
                        )
                else:
                    logger.info("No alert rule was found")
            elif status == "resolved":
                # Searching alerting rule in MongoDB
                logger.info(
                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
                    f"vnf_member_index {vnf_member_index}, "
                )
                alert = common_db.get_alert(
                    nsr_id=ns_id,
                    vnf_member_index=vnf_member_index,
                    vdu_id=vdu_id,
                    vdu_name=None,
                    action_type="scale_in",
                )
                if alert:
                    logger.info("Found an alert rule, updating status")
                    # Update alert status
                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")

    main_task()


dag = scalein_vdu()

# diff --git a/src/osm_ngsa/dags/scaleout_vdu.py b/src/osm_ngsa/dags/scaleout_vdu.py
# new file mode 100644
# index 0000000..978ab3f
# --- /dev/null
# +++ b/src/osm_ngsa/dags/scaleout_vdu.py
# @@ -0,0 +1,210 @@
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=15),
+    },
+    description="Webhook callback for scale-out alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def scaleout_vdu():
+    """Airflow DAG that handles scale-out alerts received from AlertManager.
+
+    The DAG run configuration must carry an ``alerts`` list. Only alerts whose
+    ``alertname`` label starts with ``scaleout_`` are processed.
+    """
+
+    @task(task_id="main_task")
+    def main_task():
+        """Process every alert found in the DAG run configuration.
+
+        For a ``firing`` alert the matching alert rule is marked as ``alarm``.
+        Unless a scale operation for the same VNF member index was started
+        inside the rule's cooldown interval, a SCALE_OUT nslcmop is stored in
+        the common DB and sent to LCM on the ``ns``/``scale`` topic.
+        For a ``resolved`` alert the rule status is set back to ``ok``.
+        """
+        logger.debug("Running main task...")
+        # Read input parameters
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("Scale-out alarm:")
+            status = alarm["status"]
+            logger.info(f" status: {status}")
+            logger.info(f' annotations: {alarm["annotations"]}')
+            logger.info(f' startsAt: {alarm["startsAt"]}')
+            logger.info(f' endsAt: {alarm["endsAt"]}')
+            logger.info(f' labels: {alarm["labels"]}')
+            # An alert without an "alertname" label cannot be a scale-out
+            # alert; skip it instead of failing on None.startswith()
+            alertname = alarm["labels"].get("alertname") or ""
+            if not alertname.startswith("scaleout_"):
+                continue
+            # scaleout_vdu alert type
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = alarm["labels"]["ns_id"]
+            vdu_id = alarm["labels"]["vdu_id"]
+            vnf_member_index = alarm["labels"]["vnf_member_index"]
+            if status == "firing":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_id {vdu_id}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_out",
+                )
+                if alert:
+                    logger.info("Found an alert rule:")
+                    logger.info(alert)
+                    # Update alert status
+                    common_db.update_alert_status(
+                        uuid=alert["uuid"], alarm_status="alarm"
+                    )
+                    # Get VNFR from MongoDB
+                    vnfr = common_db.get_vnfr(
+                        nsr_id=ns_id, member_index=vnf_member_index
+                    )
+                    logger.info(
+                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+                    )
+                    # Check cooldown-time before scale-out
+                    send_lcm = 1
+                    if "cooldown-time" in alert["action"]:
+                        # The value is multiplied by 60 and logged as seconds,
+                        # so it is presumably expressed in minutes
+                        cooldown_time = alert["action"]["cooldown-time"]
+                        cooldown_time = cooldown_time * 60
+                        now = time.time()
+                        since = now - cooldown_time
+                        logger.info(
+                            f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+                        )
+                        nslcmops = common_db.get_nslcmop(
+                            nsr_id=ns_id, operation_type="scale", since=since
+                        )
+                        # First scale operation that targeted this VNF member index
+                        op = next(
+                            (
+                                sub
+                                for sub in nslcmops
+                                if ("scaleVnfData" in sub["operationParams"])
+                                and (
+                                    "scaleByStepData"
+                                    in sub["operationParams"]["scaleVnfData"]
+                                )
+                                and (
+                                    "member-vnf-index"
+                                    in sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]
+                                )
+                                and (
+                                    sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]["member-vnf-index"]
+                                    == vnf_member_index
+                                )
+                            ),
+                            None,
+                        )
+                        if op:
+                            logger.info(
+                                f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
+                            )
+                            send_lcm = 0
+
+                    if send_lcm:
+                        # Save nslcmop object in MongoDB
+                        msg_bus = MessageBusClient(config)
+                        loop = asyncio.get_event_loop()
+                        _id = str(uuid.uuid4())
+                        # Take the timestamp here: "now" is otherwise only
+                        # assigned when the action defines a cooldown-time,
+                        # which made rules without cooldown fail below
+                        now = time.time()
+                        projects_read = vnfr["_admin"]["projects_read"]
+                        projects_write = vnfr["_admin"]["projects_write"]
+                        scaling_group = alert["action"]["scaling-group"]
+                        params = {
+                            "scaleType": "SCALE_VNF",
+                            "scaleVnfData": {
+                                "scaleVnfType": "SCALE_OUT",
+                                "scaleByStepData": {
+                                    "scaling-group-descriptor": scaling_group,
+                                    "member-vnf-index": vnf_member_index,
+                                },
+                            },
+                            "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+                        }
+                        nslcmop = {
+                            "id": _id,
+                            "_id": _id,
+                            "operationState": "PROCESSING",
+                            "statusEnteredTime": now,
+                            "nsInstanceId": ns_id,
+                            "lcmOperationType": "scale",
+                            "startTime": now,
+                            "isAutomaticInvocation": True,
+                            "operationParams": params,
+                            "isCancelPending": False,
+                            "links": {
+                                "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                                "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                            },
+                            "_admin": {
+                                "projects_read": projects_read,
+                                "projects_write": projects_write,
+                            },
+                        }
+                        common_db.create_nslcmop(nslcmop)
+                        # Send Kafka message to LCM
+                        logger.info("Sending scale-out action message:")
+                        logger.info(nslcmop)
+                        loop.run_until_complete(
+                            msg_bus.aiowrite("ns", "scale", nslcmop)
+                        )
+                else:
+                    logger.info("No alert rule was found")
+            elif status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_out",
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+    main_task()
+
+
+dag = scaleout_vdu()
diff --git a/src/osm_ngsa/osm_mon/core/common_db.py b/src/osm_ngsa/osm_mon/core/common_db.py
index 93254b1..465bb0d 100644
--- a/src/osm_ngsa/osm_mon/core/common_db.py
+++ b/src/osm_ngsa/osm_mon/core/common_db.py
@@ -54,6 +54,14 @@ class CommonDbClient:
         nsr = self.common_db.get_one("nsrs", {"id": nsr_id})
return nsr
 
+    def get_vnfds(self):
+        """Return every document of the "vnfds" table as a list."""
+        return self.common_db.get_list("vnfds")
+
+    def get_monitoring_vnfds(self):
+        """Return the "vnfds" documents matching the monitoring-parameter filter.
+
+        The filter asks for documents where "vdu.monitoring-parameter" exists.
+        """
+        return self.common_db.get_list(
+            "vnfds", {"vdu.monitoring-parameter": {"$exists": "true"}}
+        )
+
     def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
         return self.common_db.decrypt(vim_password, schema_version, vim_id)
 
@@ -89,14 +97,25 @@ class CommonDbClient:
         )
         return vim_account
 
-    def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+    def get_alert(
+        self,
+        nsr_id: str,
+        vnf_member_index: str,
+        vdu_id: str,
+        vdu_name: str,
+        action_type: str,
+    ):
+        """Look up one alert document for the given action type.
+
+        ``action_type`` is always part of the filter. ``nsr_id``,
+        ``vnf_member_index``, ``vdu_id`` and ``vdu_name`` are matched against
+        the corresponding ``tags.*`` fields only when they are truthy, so
+        callers may pass ``None`` to leave a criterion out.
+
+        NOTE(review): ``fail_on_empty=False`` presumably makes ``get_one``
+        return ``None`` when nothing matches - confirm against osm_common.
+        """
+        q_filter = {"action_type": action_type}
+        if nsr_id:
+            q_filter["tags.ns_id"] = nsr_id
+        if vnf_member_index:
+            q_filter["tags.vnf_member_index"] = vnf_member_index
+        if vdu_id:
+            q_filter["tags.vdu_id"] = vdu_id
+        if vdu_name:
+            q_filter["tags.vdu_name"] = vdu_name
         alert = self.common_db.get_one(
-            "alerts",
-            {
-                "tags.ns_id": nsr_id,
-                "tags.vnf_member_index": vnf_member_index,
-                "tags.vdu_name": vdu_name,
-            },
+            table="alerts", q_filter=q_filter, fail_on_empty=False
         )
         return alert
 
@@ -108,3 +127,14 @@ class CommonDbClient:
 
     def create_nslcmop(self, nslcmop: dict):
         self.common_db.create("nslcmops", nslcmop)
+
+    def get_nslcmop(self, nsr_id: str, operation_type: str, since: float):
+        """Return the list of "nslcmops" documents matching the given criteria.
+
+        Each argument is added to the filter only when it is truthy:
+        ``nsr_id`` is matched against ``nsInstanceId``, ``operation_type``
+        against ``lcmOperationType``, and ``since`` selects operations whose
+        ``startTime`` is greater than it.
+        """
+        q_filter = {}
+        if nsr_id:
+            q_filter["nsInstanceId"] = nsr_id
+        if operation_type:
+            q_filter["lcmOperationType"] = operation_type
+        if since:
+            q_filter["startTime"] = {"$gt": since}
+        ops = self.common_db.get_list(table="nslcmops", q_filter=q_filter)
+        return ops
diff --git a/src/osm_ngsa/osm_mon/vim_connectors/openstack.py b/src/osm_ngsa/osm_mon/vim_connectors/openstack.py
index d37973d..1eb33af 100644
--- a/src/osm_ngsa/osm_mon/vim_connectors/openstack.py
+++ b/src/osm_ngsa/osm_mon/vim_connectors/openstack.py
@@ -14,16 +14,75 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
#######################################################################################
+from enum import Enum
 import logging
+import time
 from typing import Dict, List
 
+from ceilometerclient import client as ceilometer_client
+from ceilometerclient.exc import HTTPException
+import gnocchiclient.exceptions
+from gnocchiclient.v1 import client as gnocchi_client
 from keystoneauth1 import session
+from keystoneauth1.exceptions.catalog import EndpointNotFound
 from keystoneauth1.identity import v3
 from novaclient import client as nova_client
 from osm_mon.vim_connectors.base_vim import VIMConnector
+from prometheus_api_client import PrometheusConnect as prometheus_client
 
 log = logging.getLogger(__name__)
 
+# Factor applied to the collected value, keyed by OpenStack metric name
+METRIC_MULTIPLIERS = {"cpu": 0.0000001}
+
+# Gnocchi aggregation method requested per OpenStack metric name
+METRIC_AGGREGATORS = {"cpu": "rate:mean"}
+
+# OSM metric names collected per network interface
+INTERFACE_METRICS = [
+    "packets_in_dropped",
+    "packets_out_dropped",
+    "packets_received",
+    "packets_sent",
+]
+
+# OSM metric names collected per instance disk
+INSTANCE_DISK = [
+    "disk_read_ops",
+    "disk_write_ops",
+    "disk_read_bytes",
+    "disk_write_bytes",
+]
+
+# OSM metric name -> OpenStack (Gnocchi/Ceilometer) metric name
+# NOTE(review): "packets_in_dropped" maps to the *outgoing* drop counter and
+# "packets_out_dropped" to the *incoming* one, the opposite of
+# METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD below - confirm this is intended.
+METRIC_MAPPINGS = {
+    "average_memory_utilization": "memory.usage",
+    "disk_read_ops": "disk.device.read.requests",
+    "disk_write_ops": "disk.device.write.requests",
+    "disk_read_bytes": "disk.device.read.bytes",
+    "disk_write_bytes": "disk.device.write.bytes",
+    "packets_in_dropped": "network.outgoing.packets.drop",
+    "packets_out_dropped": "network.incoming.packets.drop",
+    "packets_received": "network.incoming.packets",
+    "packets_sent": "network.outgoing.packets",
+    "cpu_utilization": "cpu",
+}
+
+# OSM metric name -> default metric name used by the Prometheus backend
+METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD = {
+    "cpu_utilization": "cpu",
+    "average_memory_utilization": "memory_usage",
+    "disk_read_ops": "disk_device_read_requests",
+    "disk_write_ops": "disk_device_write_requests",
+    "disk_read_bytes": "disk_device_read_bytes",
+    "disk_write_bytes": "disk_device_write_bytes",
+    "packets_in_dropped": "network_incoming_packets_drop",
+    "packets_out_dropped": "network_outgoing_packets_drop",
+    "packets_received": "network_incoming_packets",
+    "packets_sent": "network_outgoing_packets",
+}
+
+
+class MetricType(Enum):
+    """Kind of resource a metric is read from; selects the collection routine."""
+
+    INSTANCE = "instance"
+    INTERFACE_ALL = "interface_all"
+    INTERFACE_ONE = "interface_one"
+    INSTANCEDISK = "instancedisk"
+
 
 class CertificateNotCreated(Exception):
     pass
@@ -31,13 +90,16 @@ class CertificateNotCreated(Exception):
 
 
 class OpenStackCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
-        log.info("__init__")
+        log.debug("__init__")
         self.vim_account = vim_account
         self.vim_session = None
         self.vim_session = self._get_session(vim_account)
         self.nova = self._build_nova_client()
+        # self.gnocchi = self._build_gnocchi_client()
+        self.backend = self._get_backend(vim_account, self.vim_session)
 
     def _get_session(self, creds: Dict):
+        log.debug("_get_session")
         verify_ssl = True
         project_domain_name = "Default"
         user_domain_name = "Default"
@@ -67,11 +129,35 @@ class OpenStackCollector(VIMConnector):
         except CertificateNotCreated as e:
             log.error(e)
 
+    def _get_backend(self, vim_account: dict, vim_session: object):
+        """Choose the backend object used to collect metrics for this VIM.
+
+        Returns ``None`` when the VIM account has a "prometheus-config" entry:
+        the code that would build the Prometheus backend is commented out.
+        Otherwise a Gnocchi backend is built and probed with
+        ``metric.list(limit=1)``. If that raises ``HTTPException`` or
+        ``EndpointNotFound``, a Ceilometer backend is built and probed with
+        ``capabilities.get()`` instead. Errors from the Ceilometer probe are
+        not caught here.
+        """
+        if vim_account.get("prometheus-config"):
+            # try:
+            #     tsbd = PrometheusTSBDBackend(vim_account)
+            #     log.debug("Using prometheustsbd backend to collect metric")
+            #     return tsbd
+            # except Exception as e:
+            #     log.error(f"Can't create prometheus client, {e}")
+            #     return None
+            return None
+        try:
+            gnocchi = GnocchiBackend(vim_account, vim_session)
+            gnocchi.client.metric.list(limit=1)
+            log.debug("Using gnocchi backend to collect metric")
+            return gnocchi
+        except (HTTPException, EndpointNotFound):
+            ceilometer = CeilometerBackend(vim_account, vim_session)
+            ceilometer.client.capabilities.get()
+            log.debug("Using ceilometer backend to collect metric")
+            return ceilometer
+
     def _build_nova_client(self) -> nova_client.Client:
         return nova_client.Client("2", session=self.vim_session, timeout=10)
 
+    def _build_gnocchi_client(self) -> gnocchi_client.Client:
+        """Build a Gnocchi client on top of the collector's VIM session."""
+        return gnocchi_client.Client(session=self.vim_session)
+
     def collect_servers_status(self) ->
List[Dict]:
-        log.info("collect_servers_status")
+        log.debug("collect_servers_status")
         servers = []
         for server in self.nova.servers.list(detailed=True):
             vm = {
@@ -83,9 +169,252 @@ class OpenStackCollector(VIMConnector):
         return servers
 
     def is_vim_ok(self) -> bool:
+        log.debug("is_vim_ok")
         try:
             self.nova.servers.list()
             return True
         except Exception as e:
             log.warning("VIM status is not OK: %s" % e)
             return False
+
+    def _get_metric_type(self, metric_name: str) -> MetricType:
+        """Classify an OSM metric name as interface, instance-disk or instance."""
+        if metric_name in INTERFACE_METRICS:
+            return MetricType.INTERFACE_ALL
+        if metric_name in INSTANCE_DISK:
+            return MetricType.INSTANCEDISK
+        return MetricType.INSTANCE
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        """Collect a value for every requested metric.
+
+        Each dict in ``metric_list`` must provide "vm_id" and "metric" (an OSM
+        metric name present in METRIC_MAPPINGS). Dicts for which the backend
+        returned a value get a "value" key added in place and are returned.
+        A metric that fails, including an unknown metric name, is logged and
+        skipped so that the remaining metrics are still collected.
+
+        Returns an empty list when no backend is available or when the
+        backend is the (unsupported) Prometheus one.
+        """
+        log.debug("collect_metrics")
+        if not self.backend:
+            log.error("Undefined backend")
+            return []
+
+        if isinstance(self.backend, PrometheusTSBDBackend):
+            log.info("Using Prometheus as backend (NOT SUPPORTED)")
+            return []
+
+        metric_results = []
+        for metric in metric_list:
+            try:
+                server = metric["vm_id"]
+                metric_name = metric["metric"]
+                openstack_metric_name = METRIC_MAPPINGS[metric_name]
+                metric_type = self._get_metric_type(metric_name)
+                log.info(f"Collecting metric {openstack_metric_name} for {server}")
+                value = self.backend.collect_metric(
+                    metric_type, openstack_metric_name, server
+                )
+                if value is not None:
+                    log.info(f"value: {value}")
+                    metric["value"] = value
+                    metric_results.append(metric)
+                else:
+                    log.info("metric value is empty")
+            except Exception as e:
+                log.error("Error in metric collection: %s" % e)
+        return metric_results
+
+
+class OpenstackBackend:
+    """Common interface of the metric backends; this base returns nothing."""
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        pass
+
+
+class PrometheusTSBDBackend(OpenstackBackend):
+    """Backend that reads metric values from a Prometheus server."""
+
+    def __init__(self, vim_account: dict):
+        self.map = self._build_map(vim_account)
+        self.cred = vim_account["prometheus-config"].get("prometheus-cred")
+        self.client = self._build_prometheus_client(
+            vim_account["prometheus-config"]["prometheus-url"]
+        )
+
+    def _build_prometheus_client(self, url: str) -> prometheus_client:
+        # NOTE(review): disable_ssl=True is hard-coded here; check whether
+        # certificate verification should be configurable per VIM.
+        return prometheus_client(url, disable_ssl=True)
+
+    def _build_map(self, vim_account: dict) -> dict:
+        """Return the default metric map overridden by the VIM "prometheus-map"."""
+        # Work on a copy: updating the module-level default in place would
+        # leak one VIM's overrides into every backend created afterwards
+        custom_map = dict(METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD)
+        if "prometheus-map" in vim_account["prometheus-config"]:
+            custom_map.update(vim_account["prometheus-config"]["prometheus-map"])
+        return custom_map
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the current sample value for the resource, or None if absent."""
+        metric = self.query_metric(metric_name, resource_id)
+        return metric["value"][1] if metric else None
+
+    def map_metric(self, metric_name: str):
+        """Translate an OSM metric name through this backend's map."""
+        return self.map[metric_name]
+
+    def query_metric(self, metric_name, resource_id=None):
+        """Query the current value(s) of a metric.
+
+        With ``resource_id``, return the first series whose "resource_id"
+        label contains it, or ``None`` when no series matches. Without it,
+        return every series returned by the client.
+        """
+        metrics = self.client.get_current_metric_value(metric_name=metric_name)
+        if resource_id:
+            # Pass a default so that "no match" yields None instead of a
+            # StopIteration escaping from next()
+            return next(
+                (
+                    x
+                    for x in metrics
+                    if resource_id in x["metric"].get("resource_id", "")
+                ),
+                None,
+            )
+        return metrics
+
+
+class GnocchiBackend(OpenstackBackend):
+    """Backend that reads measures from Gnocchi."""
+
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_gnocchi_client(vim_account, vim_session)
+
+    def _build_gnocchi_client(
+        self, vim_account: dict, vim_session: object
+    ) -> gnocchi_client.Client:
+        return gnocchi_client.Client(session=vim_session)
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Dispatch to the collection routine matching ``metric_type``.
+
+        Raises ``Exception`` for a metric type without a routine.
+        """
+        if metric_type == MetricType.INTERFACE_ALL:
+            return self._collect_interface_all_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCE:
+            return self._collect_instance_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCEDISK:
+            return self._collect_instance_disk_metric(metric_name, resource_id)
+
+        else:
+            raise Exception("Unknown metric type %s" % metric_type.value)
+
+    def _collect_interface_all_metric(self, openstack_metric_name, resource_id):
+        """Sum the latest measure of the metric over all interfaces of the instance.
+
+        Returns None when no interface reported a measure.
+        """
+        total_measure = None
+        interfaces = self.client.resource.search(
+            resource_type="instance_network_interface",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for interface in interfaces:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=interface["id"], limit=1
+                )
+                if measures:
+                    if total_measure is None:
+                        total_measure = 0.0
+                    total_measure += measures[-1][2]
+            except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound
+                log.debug(
+                    "No metric %s found for interface %s: %s",
+                    openstack_metric_name,
+                    interface["id"],
+                    e,
+                )
+        return total_measure
+
+    def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
+        """Return the latest measure of the metric for the instance's disks.
+
+        NOTE(review): when several disks report a measure, the value of the
+        last one iterated is returned (values are not summed) - confirm.
+        """
+        value = None
+        instances = self.client.resource.search(
+            resource_type="instance_disk",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for instance in instances:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=instance["id"], limit=1
+                )
+                if measures:
+                    value = measures[-1][2]
+
+            except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound
+                log.debug(
+                    "No metric %s found for instance disk %s: %s",
+                    openstack_metric_name,
+                    instance["id"],
+                    e,
+                )
+        return value
+
+    def _collect_instance_metric(self, openstack_metric_name, resource_id):
+        """Return the latest value of an instance-level metric, or None.
+
+        Measures of the last 1200 seconds are requested with the aggregation
+        configured in METRIC_AGGREGATORS, if any. For "cpu", a failed request
+        is retried without aggregation. The value is then divided by the
+        reporting interval (when an aggregation is configured) and scaled by
+        METRIC_MULTIPLIERS.
+        """
+        value = None
+        try:
+            aggregation = METRIC_AGGREGATORS.get(openstack_metric_name)
+
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name,
+                    aggregation=aggregation,
+                    start=time.time() - 1200,
+                    resource_id=resource_id,
+                )
+                if measures:
+                    value = measures[-1][2]
+            except (
+                gnocchiclient.exceptions.NotFound,
+                gnocchiclient.exceptions.BadRequest,
+                TypeError,
+            ) as e:
+                # CPU metric in previous Openstack versions do not support rate:mean aggregation method
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound or BadRequest
+                if openstack_metric_name == "cpu":
+                    log.debug(
+                        "No metric %s found for instance %s: %s",
+                        openstack_metric_name,
+                        resource_id,
+                        e,
+                    )
+                    log.info(
+                        "Retrying to get metric %s for instance %s without aggregation",
+                        openstack_metric_name,
+                        resource_id,
+                    )
+                    measures = self.client.metric.get_measures(
+                        openstack_metric_name, resource_id=resource_id, limit=1
+                    )
+                else:
+                    # Re-raise unchanged so the original traceback is kept
+                    raise
+                # measures[-1] is the last measure
+                # measures[-2] is the previous measure
+                # measures[x][2] is the value of the metric
+                if measures and len(measures) >= 2:
+                    value = measures[-1][2] - measures[-2][2]
+            if value:
+                # measures[-1][0] is the time of the reporting interval
+                # measures[-1][1] is the duration of the reporting interval
+                if aggregation:
+                    # If this is an aggregate, we need to divide the total over the reported time period.
+                    # Even if the aggregation method is not supported by Openstack, the code will execute it
+                    # because aggregation is specified in METRIC_AGGREGATORS
+                    value = value / measures[-1][1]
+                if openstack_metric_name in METRIC_MULTIPLIERS:
+                    value = value * METRIC_MULTIPLIERS[openstack_metric_name]
+        except gnocchiclient.exceptions.NotFound as e:
+            log.debug(
+                "No metric %s found for instance %s: %s",
+                openstack_metric_name,
+                resource_id,
+                e,
+            )
+        return value
+
+
+class CeilometerBackend(OpenstackBackend):
+    """Backend that reads samples from Ceilometer (instance metrics only)."""
+
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_ceilometer_client(vim_account, vim_session)
+
+    def _build_ceilometer_client(
+        self, vim_account: dict, vim_session: object
+    ) -> ceilometer_client.Client:
+        return ceilometer_client.Client("2", session=vim_session)
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the ``counter_volume`` of the newest sample, or None.
+
+        Raises ``NotImplementedError`` for any metric type but INSTANCE.
+        """
+        if metric_type != MetricType.INSTANCE:
+            raise NotImplementedError(
+                "Ceilometer backend only support instance metrics"
+            )
+        measures = self.client.samples.list(
+            meter_name=metric_name,
+            limit=1,
+            q=[{"field": "resource_id", "op": "eq", "value": resource_id}],
+        )
+        return measures[0].counter_volume if measures else None