# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-packaging==23.0
+packaging==23.1
# via setuptools-scm
setuptools-scm==7.1.0
# via -r osm_webhook_translator/requirements-dist.in
# via requests
click==8.1.3
# via uvicorn
-fastapi==0.95.0
+fastapi==0.95.1
# via -r osm_webhook_translator/requirements.in
h11==0.14.0
# via uvicorn
# requests
pydantic==1.10.7
# via fastapi
-requests==2.28.2
+requests==2.29.0
# via -r osm_webhook_translator/requirements.in
sniffio==1.3.0
# via anyio
git+https://osm.etsi.org/gerrit/osm/common.git@master#egg=osm-common
-r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-apache-airflow==2.4.*
+apache-airflow==2.5.*
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
+aiohttp==3.8.4
+ # via apache-airflow-providers-http
aiokafka==0.8.0
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-alembic==1.9.2
+aiosignal==1.3.1
+ # via aiohttp
+alembic==1.10.4
# via apache-airflow
anyio==3.6.2
# via httpcore
-apache-airflow==2.4.3
+apache-airflow==2.5.3
# via -r requirements-dev.in
-apache-airflow-providers-common-sql==1.3.3
+apache-airflow-providers-common-sql==1.4.0
# via
# apache-airflow
# apache-airflow-providers-sqlite
-apache-airflow-providers-ftp==3.3.0
+apache-airflow-providers-ftp==3.3.1
# via apache-airflow
-apache-airflow-providers-http==4.1.1
+apache-airflow-providers-http==4.3.0
# via apache-airflow
apache-airflow-providers-imap==3.1.1
# via apache-airflow
-apache-airflow-providers-sqlite==3.3.1
+apache-airflow-providers-sqlite==3.3.2
# via apache-airflow
apispec[yaml]==3.3.2
# via flask-appbuilder
-argcomplete==2.0.0
+argcomplete==3.0.8
# via apache-airflow
+asgiref==3.6.0
+ # via apache-airflow-providers-http
async-timeout==4.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # aiohttp
# aiokafka
-attrs==22.2.0
+attrs==23.1.0
# via
+ # aiohttp
# apache-airflow
# cattrs
# jsonschema
-babel==2.11.0
+babel==2.12.1
# via flask-babel
-blinker==1.5
+blinker==1.6.2
# via apache-airflow
cachelib==0.9.0
# via
# requests
cffi==1.15.1
# via cryptography
-charset-normalizer==3.0.1
- # via requests
+charset-normalizer==3.1.0
+ # via
+ # aiohttp
+ # requests
click==8.1.3
# via
# clickclick
# via apache-airflow
configupdater==3.1.1
# via apache-airflow
-connexion[flask,swagger-ui]==2.14.2
+connexion[flask]==2.14.2
# via apache-airflow
cron-descriptor==1.2.35
# via apache-airflow
-croniter==1.3.8
+croniter==1.3.14
# via apache-airflow
-cryptography==39.0.0
+cryptography==40.0.2
# via apache-airflow
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# via python-daemon
email-validator==1.3.1
# via flask-appbuilder
-exceptiongroup==1.1.0
+exceptiongroup==1.1.1
# via cattrs
-flask==2.2.2
+flask==2.2.4
# via
# apache-airflow
# connexion
# via
# apache-airflow
# flask-appbuilder
+frozenlist==1.3.3
+ # via
+ # aiohttp
+ # aiosignal
graphviz==0.20.1
# via apache-airflow
greenlet==2.0.2
# via apache-airflow
h11==0.14.0
# via httpcore
-httpcore==0.16.3
+httpcore==0.17.0
# via httpx
-httpx==0.23.3
+httpx==0.24.0
# via apache-airflow
idna==3.4
# via
# anyio
# email-validator
+ # httpx
# requests
- # rfc3986
-importlib-metadata==6.0.0
+ # yarl
+importlib-metadata==4.13.0
# via
# alembic
# apache-airflow
# flask
# markdown
-importlib-resources==5.10.2
+importlib-resources==5.12.0
# via
# alembic
# apache-airflow
# flask
# flask-babel
# python-nvd3
- # swagger-ui-bundle
jsonschema==4.17.3
# via
# apache-airflow
# python-daemon
mako==1.2.4
# via alembic
-markdown==3.4.1
+markdown==3.4.3
# via apache-airflow
-markdown-it-py==2.1.0
+markdown-it-py==2.2.0
# via
# apache-airflow
# mdit-py-plugins
# via apache-airflow
marshmallow-sqlalchemy==0.26.1
# via flask-appbuilder
-mdit-py-plugins==0.3.3
+mdit-py-plugins==0.3.5
# via apache-airflow
mdurl==0.1.2
# via markdown-it-py
motor==1.3.1
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+multidict==6.0.4
+ # via
+ # aiohttp
+ # yarl
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master
# via -r requirements-dev.in
packaging==23.0
# via apache-airflow
prison==0.2.1
# via flask-appbuilder
-psutil==5.9.4
+psutil==5.9.5
# via apache-airflow
pycparser==2.21
# via cffi
pycryptodome==3.17
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-pygments==2.14.0
+pygments==2.15.1
# via
# apache-airflow
# rich
# motor
pyrsistent==0.19.3
# via jsonschema
-python-daemon==2.3.2
+python-daemon==3.0.1
# via apache-airflow
python-dateutil==2.8.2
# via
# pendulum
python-nvd3==0.15.0
# via apache-airflow
-python-slugify==8.0.0
+python-slugify==8.0.1
# via
# apache-airflow
# python-nvd3
-pytz==2022.7.1
+pytz==2023.3
# via
# babel
# flask-babel
# apispec
# clickclick
# connexion
-requests==2.28.2
+requests==2.29.0
# via
# apache-airflow-providers-http
# connexion
# requests-toolbelt
requests-toolbelt==0.10.1
# via apache-airflow-providers-http
-rfc3986[idna2008]==1.5.0
- # via httpx
-rich==13.3.1
+rfc3339-validator==0.1.4
+ # via apache-airflow
+rich==13.3.5
# via apache-airflow
setproctitle==1.3.2
# via apache-airflow
# via
# prison
# python-dateutil
+ # rfc3339-validator
sniffio==1.3.0
# via
# anyio
# httpcore
# httpx
-sqlalchemy==1.4.46
+sqlalchemy==1.4.47
# via
# alembic
# apache-airflow
# sqlalchemy-utils
sqlalchemy-jsonfield==1.0.1.post0
# via apache-airflow
-sqlalchemy-utils==0.39.0
+sqlalchemy-utils==0.41.1
# via flask-appbuilder
-sqlparse==0.4.3
+sqlparse==0.4.4
# via apache-airflow-providers-common-sql
-swagger-ui-bundle==0.0.9
- # via connexion
tabulate==0.9.0
# via apache-airflow
-tenacity==8.1.0
+tenacity==8.2.2
# via apache-airflow
-termcolor==2.2.0
+termcolor==2.3.0
# via apache-airflow
text-unidecode==1.3
# via python-slugify
-typing-extensions==4.4.0
+typing-extensions==4.5.0
# via
+ # alembic
# apache-airflow
# rich
uc-micro-py==1.0.1
# via linkify-it-py
unicodecsv==0.14.1
# via apache-airflow
-urllib3==1.26.14
+urllib3==1.26.15
# via requests
-werkzeug==2.2.2
+werkzeug==2.2.3
# via
# apache-airflow
# connexion
# flask
# flask-jwt-extended
# flask-login
-wrapt==1.14.1
+wrapt==1.15.0
# via deprecated
wtforms==3.0.1
# via
# flask-appbuilder
# flask-wtf
-zipp==3.12.0
+yarl==1.9.2
+ # via aiohttp
+zipp==3.15.0
# via
# importlib-metadata
# importlib-resources
# See the License for the specific language governing permissions and
# limitations under the License.
+packaging==23.0
stdeb
setuptools-scm
setuptools<60
# limitations under the License.
#######################################################################################
packaging==23.0
- # via setuptools-scm
+ # via
+ # -r requirements-dist.in
+ # setuptools-scm
setuptools-scm==7.1.0
# via -r requirements-dist.in
stdeb==0.10.0
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-coverage==7.1.0
+coverage==7.2.3
# via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
# via -r requirements-test.in
nose2==0.12.0
# via -r requirements-test.in
gnocchiclient
google-api-python-client
google-auth
+packaging==23.0
+prometheus-api-client
prometheus-client
python-ceilometerclient
python-keystoneclient
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-attrs==22.2.0
+attrs==23.1.0
# via cmd2
autopage==0.5.1
# via cliff
# via -r requirements.in
azure-mgmt-core==1.4.0
# via azure-mgmt-compute
+backports-zoneinfo==0.2.1;python_version<"3.9"
+ # via tzlocal
cachetools==5.3.0
# via google-auth
certifi==2022.12.7
# via gnocchiclient
cmd2==2.4.3
# via cliff
-cryptography==40.0.1
+contourpy==1.0.7
+ # via matplotlib
+cryptography==40.0.2
# via
# azure-identity
# msal
# pyjwt
+cycler==0.11.0
+ # via matplotlib
+dateparser==1.1.8
+ # via prometheus-api-client
debtcollector==2.5.0
# via
# gnocchiclient
# oslo-config
# oslo-utils
# python-keystoneclient
+fonttools==4.39.3
+ # via matplotlib
futurist==2.4.1
# via gnocchiclient
gnocchiclient==7.0.8
# via -r requirements.in
google-api-core==2.11.0
# via google-api-python-client
-google-api-python-client==2.84.0
+google-api-python-client==2.86.0
# via -r requirements.in
-google-auth==2.17.2
+google-auth==2.17.3
# via
# -r requirements.in
# google-api-core
# via google-api-python-client
googleapis-common-protos==1.59.0
# via google-api-core
+httmock==1.4.0
+ # via prometheus-api-client
httplib2==0.22.0
# via
# google-api-python-client
# google-auth-httplib2
idna==3.4
# via requests
-importlib-metadata==6.3.0
+importlib-metadata==6.6.0
# via cliff
+importlib-resources==5.12.0
+ # via matplotlib
iso8601==1.1.0
# via
# gnocchiclient
# python-ceilometerclient
# python-keystoneclient
# python-novaclient
-msal==1.21.0
+kiwisolver==1.4.4
+ # via matplotlib
+matplotlib==3.7.1
+ # via prometheus-api-client
+msal==1.22.0
# via
# azure-identity
# msal-extensions
# oslo-utils
netifaces==0.11.0
# via oslo-utils
+numpy==1.24.3
+ # via
+ # contourpy
+ # matplotlib
+ # pandas
+ # prometheus-api-client
oauthlib==3.2.2
# via requests-oauthlib
os-service-types==1.7.0
# python-novaclient
packaging==23.0
# via
+ # -r requirements.in
+ # matplotlib
# oslo-utils
# python-keystoneclient
+pandas==2.0.1
+ # via prometheus-api-client
pbr==5.11.1
# via
# keystoneauth1
# python-keystoneclient
# python-novaclient
# stevedore
+pillow==9.5.0
+ # via matplotlib
portalocker==2.7.0
# via msal-extensions
prettytable==0.7.2
# cliff
# python-ceilometerclient
# python-novaclient
+prometheus-api-client==0.5.3
+ # via -r requirements.in
prometheus-client==0.16.0
# via -r requirements.in
-protobuf==4.22.1
+protobuf==4.22.3
# via
# google-api-core
# googleapis-common-protos
-pyasn1==0.4.8
+pyasn1==0.5.0
# via
# pyasn1-modules
# rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
# via google-auth
pycparser==2.21
# via cffi
pyparsing==3.0.9
# via
# httplib2
+ # matplotlib
# oslo-utils
pyperclip==1.8.2
# via cmd2
python-ceilometerclient==2.9.0
# via -r requirements.in
python-dateutil==2.8.2
- # via gnocchiclient
+ # via
+ # dateparser
+ # gnocchiclient
+ # matplotlib
+ # pandas
python-keystoneclient==5.1.0
# via -r requirements.in
python-novaclient==18.3.0
# via -r requirements.in
pytz==2023.3
# via
+ # dateparser
# oslo-serialization
# oslo-utils
+ # pandas
+pytz-deprecation-shim==0.1.0.post0
+ # via tzlocal
pyyaml==5.4.1
# via
# -r requirements.in
# cliff
# oslo-config
-requests==2.28.2
+regex==2023.3.23
+ # via dateparser
+requests==2.29.0
# via
# azure-core
# google-api-core
+ # httmock
# keystoneauth1
# msal
# msrest
# oslo-config
+ # prometheus-api-client
# python-ceilometerclient
# python-keystoneclient
# requests-oauthlib
# python-novaclient
typing-extensions==4.5.0
# via azure-core
+tzdata==2023.3
+ # via pandas
+tzlocal==4.3
+ # via dateparser
ujson==5.7.0
# via gnocchiclient
uritemplate==4.1.1
wrapt==1.15.0
# via debtcollector
zipp==3.15.0
- # via importlib-metadata
+ # via
+ # importlib-metadata
+ # importlib-resources
if status == "firing":
# Searching alerting rule in MongoDB
logger.info(
- f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"Searching healing alert rule in MongoDB: ns_id {ns_id}, "
f"vnf_member_index {vnf_member_index}, "
f"vdu_name {vdu_name}, "
f"vm_id {vm_id}"
)
alert = common_db.get_alert(
- nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=None,
+ vdu_name=vdu_name,
+ action_type="healing",
)
- if alert and alert["action_type"] == "healing":
+ if alert:
logger.info("Found an alert rule:")
logger.info(alert)
# Update alert status
f"vm_id {vm_id}"
)
alert = common_db.get_alert(
- nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=None,
+ vdu_name=vdu_name,
+ action_type="healing",
)
if alert:
logger.info("Found an alert rule, updating status")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+from math import ceil
+from typing import Dict, List
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.vim_connectors.openstack import OpenStackCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
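+# Collection period in minutes; it drives the DAG's cron schedule ("*/5 * * * *" by default)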
+SCHEDULE_INTERVAL = 5
+COLLECTOR_MAX_METRICS_PER_TASK = 100
+SUPPORTED_VIM_TYPES = ["openstack", "vio"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
+PROMETHEUS_METRICS = {
+ "cpu_utilization": {
+ "metric_name": "cpu_utilization",
+ "metric_descr": "CPU usage percentage",
+ },
+ "average_memory_utilization": {
+ "metric_name": "average_memory_utilization",
+ "metric_descr": "Volume of RAM in MB used by the VM",
+ },
+ "disk_read_ops": {
+ "metric_name": "disk_read_ops",
+ "metric_descr": "Number of read requests",
+ },
+ "disk_write_ops": {
+ "metric_name": "disk_write_ops",
+ "metric_descr": "Number of write requests",
+ },
+ "disk_read_bytes": {
+ "metric_name": "disk_read_bytes",
+ "metric_descr": "Volume of reads in bytes",
+ },
+ "disk_write_bytes": {
+ "metric_name": "disk_write_bytes",
+ "metric_descr": "Volume of writes in bytes",
+ },
+ "packets_received": {
+ "metric_name": "packets_received",
+ "metric_descr": "Number of incoming packets",
+ },
+ "packets_sent": {
+ "metric_name": "packets_sent",
+ "metric_descr": "Number of outgoing packets",
+ },
+ "packets_in_dropped": {
+ "metric_name": "packets_in_dropped",
+ "metric_descr": "Number of incoming dropped packets",
+ },
+ "packets_out_dropped": {
+ "metric_name": "packets_out_dropped",
+ "metric_descr": "Number of outgoing dropped packets",
+ },
+}
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+def get_all_vim():
+ """Get VIMs from MongoDB"""
+ logger.info("Getting VIM list")
+
+ cfg = Config()
+ logger.info(cfg.conf)
+ common_db = CommonDbClient(cfg)
+ vim_accounts = common_db.get_vim_accounts()
+ vim_list = []
+ for vim in vim_accounts:
+ logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
+ vim_list.append(
+ {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
+ )
+
+ logger.info(vim_list)
+ logger.info("Getting VIM list OK")
+ return vim_list
+
+
+def create_dag(dag_id, dag_number, dag_description, vim_id):
+ dag = DAG(
+ dag_id,
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ # "retry_delay": timedelta(minutes=1),
+ "retry_delay": timedelta(seconds=10),
+ },
+ description=dag_description,
+ is_paused_upon_creation=False,
+ # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+ schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "vdu"],
+ )
+
+ with dag:
+
+ @task(task_id="extract_metrics_from_vnfrs")
+ def extract_metrics_from_vnfrs(vim_id: str):
+ """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
+
+ # Get VNFDs that include "monitoring-parameter" from MongoDB
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ logger.info("Getting VNFDs with monitoring parameters from MongoDB")
+ vnfd_list = common_db.get_monitoring_vnfds()
+ # Get VNFR list from MongoDB
+ logger.info("Getting VNFR list from MongoDB")
+ vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
+ # Only read metrics if ns state is one of the nsAllowedStatesSet
+ nsAllowedStatesSet = {"INSTANTIATED"}
+ metric_list = []
+ for vnfr in vnfr_list:
+ if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
+ continue
+ # Check if VNFR is in "monitoring-parameter" VNFDs list
+ vnfd_id = vnfr["vnfd-id"]
+ vnfd = next(
+ (item for item in vnfd_list if item["_id"] == vnfd_id), None
+ )
+ if not vnfd:
+ continue
+ ns_id = vnfr["nsr-id-ref"]
+ vnf_index = vnfr["member-vnf-index-ref"]
+ logger.info(
+ f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
+ )
+ project_list = vnfr.get("_admin", {}).get("projects_read", [])
+ project_id = "None"
+ if project_list:
+ project_id = project_list[0]
+ for vdur in vnfr.get("vdur", []):
+ vim_info = vdur.get("vim_info")
+ if not vim_info:
+ logger.error("Error: vim_info not available in vdur")
+ continue
+ if len(vim_info) != 1:
+ logger.error("Error: more than one vim_info in vdur")
+ continue
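+                    # vim_info keys are expected to look like "vim:<vim_account_id>"; strip the "vim:" prefix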
+ vim_id = next(iter(vim_info))[4:]
+ vm_id = vdur.get("vim-id")
+ if not vm_id:
+ logger.error("Error: vim-id not available in vdur")
+ continue
+ vdu_name = vdur.get("name", "UNKNOWN")
+ vdu = next(
+ filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"])
+ )
+ if "monitoring-parameter" not in vdu:
+ logger.error("Error: no monitoring-parameter in descriptor")
+ continue
+ for param in vdu["monitoring-parameter"]:
+ metric_name = param["performance-metric"]
+ metric_id = param["id"]
+ metric = {
+ "metric": metric_name,
+ "metric_id": metric_id,
+ "vm_id": vm_id,
+ "ns_id": ns_id,
+ "project_id": project_id,
+ "vdu_name": vdu_name,
+ "vnf_member_index": vnf_index,
+ "vdu_id": vdu["id"],
+ }
+ metric_list.append(metric)
+
+ logger.info(f"Metrics to collect: {len(metric_list)}")
+ return metric_list
+
+ @task(task_id="split_metrics")
+ def split_metrics(metric_list: List[Dict]):
+ """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
+ n_metrics = len(metric_list)
+ if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
+ return [metric_list]
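+            # Balance chunk sizes: e.g. 250 metrics with a limit of 100 become three chunks of ~84 instead of 100/100/50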
+ metrics_per_chunk = ceil(
+ n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
+ )
+ logger.info(
+ f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
+ )
+ chunks = []
+ for i in range(0, n_metrics, metrics_per_chunk):
+ chunks.append(metric_list[i : i + metrics_per_chunk])
+ return chunks
+
+ @task(task_id="collect_metrics")
+ def collect_metrics(vim_id: str, metric_list: List[Dict]):
+ """Collect servers metrics"""
+ if not metric_list:
+ return []
+
+ # Get VIM account info from MongoDB
+ logger.info(f"Reading VIM info, id: {vim_id}")
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ vim_account = common_db.get_vim_account(vim_account_id=vim_id)
+ # Create VIM metrics collector
+ vim_type = vim_account["vim_type"]
+ if "config" in vim_account and "vim_type" in vim_account["config"]:
+ vim_type = vim_account["config"]["vim_type"].lower()
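+            # A VIO VIM without vROps configured is monitored through the plain OpenStack APIs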
+ if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
+ vim_type = "openstack"
+ if vim_type == "openstack":
+ collector = OpenStackCollector(vim_account)
+ else:
+ logger.error(f"VIM type '{vim_type}' not supported")
+                return []
+ # Get metrics
+ results = []
+ if collector:
+ results = collector.collect_metrics(metric_list)
+ logger.info(results)
+ return results
+
+ @task(task_id="send_prometheus")
+ def send_prometheus(metric_lists: List[List[Dict]]):
+ """Send servers metrics to Prometheus Push Gateway"""
+ logger.info(metric_lists)
+
+ # Define Prometheus metrics
+ registry = CollectorRegistry()
+ prom_metrics = {}
+ prom_metrics_keys = PROMETHEUS_METRICS.keys()
+ for key in prom_metrics_keys:
+ prom_metrics[key] = Gauge(
+ PROMETHEUS_METRICS[key]["metric_name"],
+ PROMETHEUS_METRICS[key]["metric_descr"],
+ labelnames=[
+ "metric_id",
+ "ns_id",
+ "project_id",
+ "vnf_member_index",
+ "vm_id",
+ "vim_id",
+ "vdu_name",
+ "vdu_id",
+ ],
+ registry=registry,
+ )
+
+ for metric_list in metric_lists:
+ for metric in metric_list:
+ metric_name = metric["metric"]
+ metric_id = metric["metric_id"]
+ value = metric["value"]
+ vm_id = metric["vm_id"]
+ vm_name = metric.get("vdu_name", "")
+ ns_id = metric["ns_id"]
+ project_id = metric["project_id"]
+ vnf_index = metric["vnf_member_index"]
+ vdu_id = metric["vdu_id"]
+ logger.info(
+ f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
+ )
+ if metric_name in prom_metrics_keys:
+ prom_metrics[metric_name].labels(
+ metric_id,
+ ns_id,
+ project_id,
+ vnf_index,
+ vm_id,
+ vim_id,
+ vm_name,
+ vdu_id,
+ ).set(value)
+
+ # Push to Prometheus
+ push_to_gateway(
+ gateway=PROMETHEUS_PUSHGW,
+ job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+ registry=registry,
+ )
+ return
+
+ chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
+ send_prometheus(
+ collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
+ )
+
+ return dag
+
+
+vim_list = get_all_vim()
+for index, vim in enumerate(vim_list):
+ vim_type = vim["vim_type"]
+ if vim_type in SUPPORTED_VIM_TYPES:
+ vim_id = vim["_id"]
+ vim_name = vim["name"]
+ dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
+ dag_id = f"vm_metrics_vim_{vim_id}"
+ logger.info(f"Creating DAG {dag_id}")
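+        # Airflow discovers DAG objects through the module's global namespace, so register each generated DAG there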
+ globals()[dag_id] = create_dag(
+ dag_id=dag_id,
+ dag_number=index,
+ dag_description=dag_description,
+ vim_id=vim_id,
+ )
+ else:
+ logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": timedelta(seconds=15),
+ },
+ description="Webhook callback for scale-in alarm from Prometheus AlertManager",
+ is_paused_upon_creation=False,
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "webhook"],
+)
+def scalein_vdu():
+ @task(task_id="main_task")
+ def main_task():
+ logger.debug("Running main task...")
+ # Read input parameters
+ context = get_current_context()
+ conf = context["dag_run"].conf
+ for alarm in conf["alerts"]:
+ logger.info("Scale-in alarm:")
+ status = alarm["status"]
+ logger.info(f" status: {status}")
+ logger.info(f' annotations: {alarm["annotations"]}')
+ logger.info(f' startsAt: {alarm["startsAt"]}')
+ logger.info(f' endsAt: {alarm["endsAt"]}')
+ logger.info(f' labels: {alarm["labels"]}')
+ alertname = alarm["labels"].get("alertname")
+ if not alertname.startswith("scalein_"):
+ continue
+ # scalein_vdu alert type
+ config = Config()
+ common_db = CommonDbClient(config)
+ ns_id = alarm["labels"]["ns_id"]
+ vdu_id = alarm["labels"]["vdu_id"]
+ vnf_member_index = alarm["labels"]["vnf_member_index"]
+ if status == "firing":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching scale-in alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_id {vdu_id}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_in",
+ )
+ if alert:
+ logger.info("Found an alert rule:")
+ logger.info(alert)
+ # Update alert status
+ common_db.update_alert_status(
+ uuid=alert["uuid"], alarm_status="alarm"
+ )
+ # Get VNFR from MongoDB
+ vnfr = common_db.get_vnfr(
+ nsr_id=ns_id, member_index=vnf_member_index
+ )
+ logger.info(
+ f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+ )
+ # Check cooldown-time before scale-in
+ send_lcm = 1
+ if "cooldown-time" in alert["action"]:
+ cooldown_time = alert["action"]["cooldown-time"]
+ cooldown_time = cooldown_time * 60
+ now = time.time()
+ since = now - cooldown_time
+ logger.info(
+ f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+ )
+ nslcmops = common_db.get_nslcmop(
+ nsr_id=ns_id, operation_type="scale", since=since
+ )
+ op = next(
+ (
+ sub
+ for sub in nslcmops
+ if ("scaleVnfData" in sub["operationParams"])
+ and (
+ "scaleByStepData"
+ in sub["operationParams"]["scaleVnfData"]
+ )
+ and (
+ "member-vnf-index"
+ in sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]
+ )
+ and (
+ sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ == vnf_member_index
+ )
+ ),
+ None,
+ )
+ if op:
+ logger.info(
+ f"No scale-in will be launched, found a previous scale operation in cooldown interval: {op}"
+ )
+ send_lcm = 0
+
+ if send_lcm:
+ # Save nslcmop object in MongoDB
+ msg_bus = MessageBusClient(config)
+ loop = asyncio.get_event_loop()
+ _id = str(uuid.uuid4())
+ now = time.time()
+ projects_read = vnfr["_admin"]["projects_read"]
+ projects_write = vnfr["_admin"]["projects_write"]
+ scaling_group = alert["action"]["scaling-group"]
+ params = {
+ "scaleType": "SCALE_VNF",
+ "scaleVnfData": {
+ "scaleVnfType": "SCALE_IN",
+ "scaleByStepData": {
+ "scaling-group-descriptor": scaling_group,
+ "member-vnf-index": vnf_member_index,
+ },
+ },
+ "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+ }
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": ns_id,
+ "lcmOperationType": "scale",
+ "startTime": now,
+ "isAutomaticInvocation": True,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+ },
+ "_admin": {
+ "projects_read": projects_read,
+ "projects_write": projects_write,
+ },
+ }
+ common_db.create_nslcmop(nslcmop)
+ # Send Kafka message to LCM
+ logger.info("Sending scale-in action message:")
+ logger.info(nslcmop)
+ loop.run_until_complete(
+ msg_bus.aiowrite("ns", "scale", nslcmop)
+ )
+ else:
+ logger.info("No alert rule was found")
+ elif status == "resolved":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_in",
+ )
+ if alert:
+ logger.info("Found an alert rule, updating status")
+ # Update alert status
+ common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+ main_task()
+
+
+dag = scalein_vdu()
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": timedelta(seconds=15),
+ },
+ description="Webhook callback for scale-out alarm from Prometheus AlertManager",
+ is_paused_upon_creation=False,
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "webhook"],
+)
+def scaleout_vdu():
+ @task(task_id="main_task")
+ def main_task():
+ logger.debug("Running main task...")
+ # Read input parameters
+ context = get_current_context()
+ conf = context["dag_run"].conf
+ for alarm in conf["alerts"]:
+ logger.info("Scale-out alarm:")
+ status = alarm["status"]
+ logger.info(f" status: {status}")
+ logger.info(f' annotations: {alarm["annotations"]}')
+ logger.info(f' startsAt: {alarm["startsAt"]}')
+ logger.info(f' endsAt: {alarm["endsAt"]}')
+ logger.info(f' labels: {alarm["labels"]}')
+ alertname = alarm["labels"].get("alertname")
+ if not alertname.startswith("scaleout_"):
+ continue
+ # scaleout_vdu alert type
+ config = Config()
+ common_db = CommonDbClient(config)
+ ns_id = alarm["labels"]["ns_id"]
+ vdu_id = alarm["labels"]["vdu_id"]
+ vnf_member_index = alarm["labels"]["vnf_member_index"]
+ if status == "firing":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_id {vdu_id}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_out",
+ )
+ if alert:
+ logger.info("Found an alert rule:")
+ logger.info(alert)
+ # Update alert status
+ common_db.update_alert_status(
+ uuid=alert["uuid"], alarm_status="alarm"
+ )
+ # Get VNFR from MongoDB
+ vnfr = common_db.get_vnfr(
+ nsr_id=ns_id, member_index=vnf_member_index
+ )
+ logger.info(
+ f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+ )
+ # Check cooldown-time before scale-out
+ send_lcm = 1
+ if "cooldown-time" in alert["action"]:
+ cooldown_time = alert["action"]["cooldown-time"]
+ cooldown_time = cooldown_time * 60
+ now = time.time()
+ since = now - cooldown_time
+ logger.info(
+ f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+ )
+ nslcmops = common_db.get_nslcmop(
+ nsr_id=ns_id, operation_type="scale", since=since
+ )
+ op = next(
+ (
+ sub
+ for sub in nslcmops
+ if ("scaleVnfData" in sub["operationParams"])
+ and (
+ "scaleByStepData"
+ in sub["operationParams"]["scaleVnfData"]
+ )
+ and (
+ "member-vnf-index"
+ in sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]
+ )
+ and (
+ sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ == vnf_member_index
+ )
+ ),
+ None,
+ )
+ if op:
+ logger.info(
+ f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
+ )
+ send_lcm = 0
+
+ if send_lcm:
+ # Save nslcmop object in MongoDB
+ msg_bus = MessageBusClient(config)
+ loop = asyncio.get_event_loop()
+                        _id = str(uuid.uuid4())
+                        now = time.time()
+ projects_read = vnfr["_admin"]["projects_read"]
+ projects_write = vnfr["_admin"]["projects_write"]
+ scaling_group = alert["action"]["scaling-group"]
+ params = {
+ "scaleType": "SCALE_VNF",
+ "scaleVnfData": {
+ "scaleVnfType": "SCALE_OUT",
+ "scaleByStepData": {
+ "scaling-group-descriptor": scaling_group,
+ "member-vnf-index": vnf_member_index,
+ },
+ },
+ "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+ }
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": ns_id,
+ "lcmOperationType": "scale",
+ "startTime": now,
+ "isAutomaticInvocation": True,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+ },
+ "_admin": {
+ "projects_read": projects_read,
+ "projects_write": projects_write,
+ },
+ }
+ common_db.create_nslcmop(nslcmop)
+ # Send Kafka message to LCM
+ logger.info("Sending scale-out action message:")
+ logger.info(nslcmop)
+ loop.run_until_complete(
+ msg_bus.aiowrite("ns", "scale", nslcmop)
+ )
+ else:
+ logger.info("No alert rule was found")
+ elif status == "resolved":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_out",
+ )
+ if alert:
+ logger.info("Found an alert rule, updating status")
+ # Update alert status
+ common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+ main_task()
+
+
+dag = scaleout_vdu()
nsr = self.common_db.get_one("nsrs", {"id": nsr_id})
return nsr
+ def get_vnfds(self):
+ return self.common_db.get_list("vnfds")
+
+ def get_monitoring_vnfds(self):
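+        # Only VNFDs whose VDUs declare "monitoring-parameter" entries are relevant for collection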
+ return self.common_db.get_list(
+ "vnfds", {"vdu.monitoring-parameter": {"$exists": "true"}}
+ )
+
def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
return self.common_db.decrypt(vim_password, schema_version, vim_id)
)
return vim_account
- def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+ def get_alert(
+ self,
+ nsr_id: str,
+ vnf_member_index: str,
+ vdu_id: str,
+ vdu_name: str,
+ action_type: str,
+ ):
+ q_filter = {"action_type": action_type}
+ if nsr_id:
+ q_filter["tags.ns_id"] = nsr_id
+ if vnf_member_index:
+ q_filter["tags.vnf_member_index"] = vnf_member_index
+ if vdu_id:
+ q_filter["tags.vdu_id"] = vdu_id
+ if vdu_name:
+ q_filter["tags.vdu_name"] = vdu_name
alert = self.common_db.get_one(
- "alerts",
- {
- "tags.ns_id": nsr_id,
- "tags.vnf_member_index": vnf_member_index,
- "tags.vdu_name": vdu_name,
- },
+ table="alerts", q_filter=q_filter, fail_on_empty=False
)
return alert
def create_nslcmop(self, nslcmop: dict):
self.common_db.create("nslcmops", nslcmop)
+
+ def get_nslcmop(self, nsr_id: str, operation_type: str, since: str):
+ q_filter = {}
+ if nsr_id:
+ q_filter["nsInstanceId"] = nsr_id
+ if operation_type:
+ q_filter["lcmOperationType"] = operation_type
+ if since:
+ q_filter["startTime"] = {"$gt": since}
+ ops = self.common_db.get_list(table="nslcmops", q_filter=q_filter)
+ return ops
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
+from enum import Enum
import logging
+import time
from typing import Dict, List
+from ceilometerclient import client as ceilometer_client
+from ceilometerclient.exc import HTTPException
+import gnocchiclient.exceptions
+from gnocchiclient.v1 import client as gnocchi_client
from keystoneauth1 import session
+from keystoneauth1.exceptions.catalog import EndpointNotFound
from keystoneauth1.identity import v3
from novaclient import client as nova_client
from osm_mon.vim_connectors.base_vim import VIMConnector
+from prometheus_api_client import PrometheusConnect as prometheus_client
log = logging.getLogger(__name__)
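+# Gnocchi reports "cpu" as cumulative CPU time in nanoseconds; once the delta is
+# normalized per second, the 1e-7 factor turns it into a utilization percentage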
+METRIC_MULTIPLIERS = {"cpu": 0.0000001}
+
+METRIC_AGGREGATORS = {"cpu": "rate:mean"}
+
+INTERFACE_METRICS = [
+ "packets_in_dropped",
+ "packets_out_dropped",
+ "packets_received",
+ "packets_sent",
+]
+
+INSTANCE_DISK = [
+ "disk_read_ops",
+ "disk_write_ops",
+ "disk_read_bytes",
+ "disk_write_bytes",
+]
+
+METRIC_MAPPINGS = {
+ "average_memory_utilization": "memory.usage",
+ "disk_read_ops": "disk.device.read.requests",
+ "disk_write_ops": "disk.device.write.requests",
+ "disk_read_bytes": "disk.device.read.bytes",
+ "disk_write_bytes": "disk.device.write.bytes",
+ "packets_in_dropped": "network.outgoing.packets.drop",
+ "packets_out_dropped": "network.incoming.packets.drop",
+ "packets_received": "network.incoming.packets",
+ "packets_sent": "network.outgoing.packets",
+ "cpu_utilization": "cpu",
+}
+
+METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD = {
+ "cpu_utilization": "cpu",
+ "average_memory_utilization": "memory_usage",
+ "disk_read_ops": "disk_device_read_requests",
+ "disk_write_ops": "disk_device_write_requests",
+ "disk_read_bytes": "disk_device_read_bytes",
+ "disk_write_bytes": "disk_device_write_bytes",
+ "packets_in_dropped": "network_incoming_packets_drop",
+ "packets_out_dropped": "network_outgoing_packets_drop",
+ "packets_received": "network_incoming_packets",
+ "packets_sent": "network_outgoing_packets",
+}
+
+
+class MetricType(Enum):
+ INSTANCE = "instance"
+ INTERFACE_ALL = "interface_all"
+ INTERFACE_ONE = "interface_one"
+ INSTANCEDISK = "instancedisk"
+
class CertificateNotCreated(Exception):
pass
class OpenStackCollector(VIMConnector):
def __init__(self, vim_account: Dict):
- log.info("__init__")
+ log.debug("__init__")
self.vim_account = vim_account
self.vim_session = None
self.vim_session = self._get_session(vim_account)
self.nova = self._build_nova_client()
+ # self.gnocchi = self._build_gnocchi_client()
+ self.backend = self._get_backend(vim_account, self.vim_session)
def _get_session(self, creds: Dict):
+ log.debug("_get_session")
verify_ssl = True
project_domain_name = "Default"
user_domain_name = "Default"
except CertificateNotCreated as e:
log.error(e)
+ def _get_backend(self, vim_account: dict, vim_session: object):
+ if vim_account.get("prometheus-config"):
+ # try:
+ # tsbd = PrometheusTSBDBackend(vim_account)
+ # log.debug("Using prometheustsbd backend to collect metric")
+ # return tsbd
+ # except Exception as e:
+ # log.error(f"Can't create prometheus client, {e}")
+ # return None
+ return None
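+        # Probe Gnocchi first and fall back to Ceilometer if no usable Gnocchi endpoint is found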
+ try:
+ gnocchi = GnocchiBackend(vim_account, vim_session)
+ gnocchi.client.metric.list(limit=1)
+ log.debug("Using gnocchi backend to collect metric")
+ return gnocchi
+ except (HTTPException, EndpointNotFound):
+ ceilometer = CeilometerBackend(vim_account, vim_session)
+ ceilometer.client.capabilities.get()
+ log.debug("Using ceilometer backend to collect metric")
+ return ceilometer
+
def _build_nova_client(self) -> nova_client.Client:
return nova_client.Client("2", session=self.vim_session, timeout=10)
+ def _build_gnocchi_client(self) -> gnocchi_client.Client:
+ return gnocchi_client.Client(session=self.vim_session)
+
def collect_servers_status(self) -> List[Dict]:
- log.info("collect_servers_status")
+ log.debug("collect_servers_status")
servers = []
for server in self.nova.servers.list(detailed=True):
vm = {
return servers
def is_vim_ok(self) -> bool:
+ log.debug("is_vim_ok")
try:
self.nova.servers.list()
return True
except Exception as e:
log.warning("VIM status is not OK: %s" % e)
return False
+
+ def _get_metric_type(self, metric_name: str) -> MetricType:
+        if metric_name in INTERFACE_METRICS:
+            return MetricType.INTERFACE_ALL
+        if metric_name in INSTANCE_DISK:
+            return MetricType.INSTANCEDISK
+        return MetricType.INSTANCE
+
+ def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+ log.debug("collect_metrics")
+ if not self.backend:
+ log.error("Undefined backend")
+ return []
+
+ if type(self.backend) is PrometheusTSBDBackend:
+ log.info("Using Prometheus as backend (NOT SUPPORTED)")
+ return []
+
+ metric_results = []
+ for metric in metric_list:
+ server = metric["vm_id"]
+ metric_name = metric["metric"]
+ openstack_metric_name = METRIC_MAPPINGS[metric_name]
+ metric_type = self._get_metric_type(metric_name)
+ log.info(f"Collecting metric {openstack_metric_name} for {server}")
+ try:
+ value = self.backend.collect_metric(
+ metric_type, openstack_metric_name, server
+ )
+ if value is not None:
+ log.info(f"value: {value}")
+ metric["value"] = value
+ metric_results.append(metric)
+ else:
+ log.info("metric value is empty")
+ except Exception as e:
+ log.error("Error in metric collection: %s" % e)
+ return metric_results
+
+
+class OpenstackBackend:
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ pass
+
+
+class PrometheusTSBDBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict):
+ self.map = self._build_map(vim_account)
+ self.cred = vim_account["prometheus-config"].get("prometheus-cred")
+ self.client = self._build_prometheus_client(
+ vim_account["prometheus-config"]["prometheus-url"]
+ )
+
+ def _build_prometheus_client(self, url: str) -> prometheus_client:
+ return prometheus_client(url, disable_ssl=True)
+
+ def _build_map(self, vim_account: dict) -> dict:
+ custom_map = METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD
+ if "prometheus-map" in vim_account["prometheus-config"]:
+ custom_map.update(vim_account["prometheus-config"]["prometheus-map"])
+ return custom_map
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ metric = self.query_metric(metric_name, resource_id)
+ return metric["value"][1] if metric else None
+
+ def map_metric(self, metric_name: str):
+ return self.map[metric_name]
+
+ def query_metric(self, metric_name, resource_id=None):
+ metrics = self.client.get_current_metric_value(metric_name=metric_name)
+ if resource_id:
+ metric = next(
+ filter(lambda x: resource_id in x["metric"]["resource_id"], metrics)
+ )
+ return metric
+ return metrics
+
+
+class GnocchiBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict, vim_session: object):
+ self.client = self._build_gnocchi_client(vim_account, vim_session)
+
+ def _build_gnocchi_client(
+ self, vim_account: dict, vim_session: object
+ ) -> gnocchi_client.Client:
+ return gnocchi_client.Client(session=vim_session)
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ if metric_type == MetricType.INTERFACE_ALL:
+ return self._collect_interface_all_metric(metric_name, resource_id)
+
+ elif metric_type == MetricType.INSTANCE:
+ return self._collect_instance_metric(metric_name, resource_id)
+
+ elif metric_type == MetricType.INSTANCEDISK:
+ return self._collect_instance_disk_metric(metric_name, resource_id)
+
+ else:
+ raise Exception("Unknown metric type %s" % metric_type.value)
+
+ def _collect_interface_all_metric(self, openstack_metric_name, resource_id):
+ total_measure = None
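+        # Sum the most recent measure across all network interfaces attached to the instance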
+ interfaces = self.client.resource.search(
+ resource_type="instance_network_interface",
+ query={"=": {"instance_id": resource_id}},
+ )
+ for interface in interfaces:
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=interface["id"], limit=1
+ )
+ if measures:
+ if not total_measure:
+ total_measure = 0.0
+ total_measure += measures[-1][2]
+ except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+ # Gnocchi in some Openstack versions raise TypeError instead of NotFound
+ log.debug(
+ "No metric %s found for interface %s: %s",
+ openstack_metric_name,
+ interface["id"],
+ e,
+ )
+ return total_measure
+
+ def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
+ value = None
+ instances = self.client.resource.search(
+ resource_type="instance_disk",
+ query={"=": {"instance_id": resource_id}},
+ )
+ for instance in instances:
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=instance["id"], limit=1
+ )
+ if measures:
+ value = measures[-1][2]
+
+ except gnocchiclient.exceptions.NotFound as e:
+ log.debug(
+ "No metric %s found for instance disk %s: %s",
+ openstack_metric_name,
+ instance["id"],
+ e,
+ )
+ return value
+
+ def _collect_instance_metric(self, openstack_metric_name, resource_id):
+ value = None
+ try:
+ aggregation = METRIC_AGGREGATORS.get(openstack_metric_name)
+
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name,
+ aggregation=aggregation,
+ start=time.time() - 1200,
+ resource_id=resource_id,
+ )
+ if measures:
+ value = measures[-1][2]
+ except (
+ gnocchiclient.exceptions.NotFound,
+ gnocchiclient.exceptions.BadRequest,
+ TypeError,
+ ) as e:
+ # CPU metric in previous Openstack versions do not support rate:mean aggregation method
+ # Gnocchi in some Openstack versions raise TypeError instead of NotFound or BadRequest
+ if openstack_metric_name == "cpu":
+ log.debug(
+ "No metric %s found for instance %s: %s",
+ openstack_metric_name,
+ resource_id,
+ e,
+ )
+ log.info(
+ "Retrying to get metric %s for instance %s without aggregation",
+ openstack_metric_name,
+ resource_id,
+ )
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=resource_id, limit=1
+ )
+ else:
+ raise e
+ # measures[-1] is the last measure
+ # measures[-2] is the previous measure
+ # measures[x][2] is the value of the metric
+ if measures and len(measures) >= 2:
+ value = measures[-1][2] - measures[-2][2]
+ if value:
+ # measures[-1][0] is the time of the reporting interval
+ # measures[-1][1] is the duration of the reporting interval
+ if aggregation:
+ # If this is an aggregate, we need to divide the total over the reported time period.
+ # Even if the aggregation method is not supported by Openstack, the code will execute it
+ # because aggregation is specified in METRIC_AGGREGATORS
+ value = value / measures[-1][1]
+ if openstack_metric_name in METRIC_MULTIPLIERS:
+ value = value * METRIC_MULTIPLIERS[openstack_metric_name]
+ except gnocchiclient.exceptions.NotFound as e:
+ log.debug(
+ "No metric %s found for instance %s: %s",
+ openstack_metric_name,
+ resource_id,
+ e,
+ )
+ return value
+
+
+class CeilometerBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict, vim_session: object):
+ self.client = self._build_ceilometer_client(vim_account, vim_session)
+
+ def _build_ceilometer_client(
+ self, vim_account: dict, vim_session: object
+ ) -> ceilometer_client.Client:
+ return ceilometer_client.Client("2", session=vim_session)
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ if metric_type != MetricType.INSTANCE:
+ raise NotImplementedError(
+ "Ceilometer backend only support instance metrics"
+ )
+ measures = self.client.samples.list(
+ meter_name=metric_name,
+ limit=1,
+ q=[{"field": "resource_id", "op": "eq", "value": resource_id}],
+ )
+ return measures[0].counter_volume if measures else None