Feature 10981: add Openstack metrics collector and scale-out/in DAGs for autoscaling 94/13194/14
authoraguilard <e.dah.tid@telefonica.com>
Thu, 13 Apr 2023 10:43:07 +0000 (10:43 +0000)
committeraguilard <e.dah.tid@telefonica.com>
Tue, 2 May 2023 09:17:35 +0000 (09:17 +0000)
Change-Id: Idff1974545d28208a853787d748f1839dffc69e5
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
15 files changed:
osm_webhook_translator/requirements-dist.txt
osm_webhook_translator/requirements.txt
requirements-dev.in
requirements-dev.txt
requirements-dist.in
requirements-dist.txt
requirements-test.txt
requirements.in
requirements.txt
src/osm_ngsa/dags/alert_vdu.py
src/osm_ngsa/dags/multivim_vm_metrics.py [new file with mode: 0644]
src/osm_ngsa/dags/scalein_vdu.py [new file with mode: 0644]
src/osm_ngsa/dags/scaleout_vdu.py [new file with mode: 0644]
src/osm_ngsa/osm_mon/core/common_db.py
src/osm_ngsa/osm_mon/vim_connectors/openstack.py

index 6ddded6..64ae136 100644 (file)
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
-packaging==23.0
+packaging==23.1
     # via setuptools-scm
 setuptools-scm==7.1.0
     # via -r osm_webhook_translator/requirements-dist.in
index 6cf7f42..c8d78fa 100644 (file)
@@ -22,7 +22,7 @@ charset-normalizer==3.1.0
     # via requests
 click==8.1.3
     # via uvicorn
-fastapi==0.95.0
+fastapi==0.95.1
     # via -r osm_webhook_translator/requirements.in
 h11==0.14.0
     # via uvicorn
@@ -32,7 +32,7 @@ idna==3.4
     #   requests
 pydantic==1.10.7
     # via fastapi
-requests==2.28.2
+requests==2.29.0
     # via -r osm_webhook_translator/requirements.in
 sniffio==1.3.0
     # via anyio
index ed3f514..8434f0e 100644 (file)
@@ -17,4 +17,4 @@
 
 git+https://osm.etsi.org/gerrit/osm/common.git@master#egg=osm-common
 -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-apache-airflow==2.4.*
+apache-airflow==2.5.*
index 973bdcb..a82e656 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+aiohttp==3.8.4
+    # via apache-airflow-providers-http
 aiokafka==0.8.0
     # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-alembic==1.9.2
+aiosignal==1.3.1
+    # via aiohttp
+alembic==1.10.4
     # via apache-airflow
 anyio==3.6.2
     # via httpcore
-apache-airflow==2.4.3
+apache-airflow==2.5.3
     # via -r requirements-dev.in
-apache-airflow-providers-common-sql==1.3.3
+apache-airflow-providers-common-sql==1.4.0
     # via
     #   apache-airflow
     #   apache-airflow-providers-sqlite
-apache-airflow-providers-ftp==3.3.0
+apache-airflow-providers-ftp==3.3.1
     # via apache-airflow
-apache-airflow-providers-http==4.1.1
+apache-airflow-providers-http==4.3.0
     # via apache-airflow
 apache-airflow-providers-imap==3.1.1
     # via apache-airflow
-apache-airflow-providers-sqlite==3.3.1
+apache-airflow-providers-sqlite==3.3.2
     # via apache-airflow
 apispec[yaml]==3.3.2
     # via flask-appbuilder
-argcomplete==2.0.0
+argcomplete==3.0.8
     # via apache-airflow
+asgiref==3.6.0
+    # via apache-airflow-providers-http
 async-timeout==4.0.2
     # via
     #   -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+    #   aiohttp
     #   aiokafka
-attrs==22.2.0
+attrs==23.1.0
     # via
+    #   aiohttp
     #   apache-airflow
     #   cattrs
     #   jsonschema
-babel==2.11.0
+babel==2.12.1
     # via flask-babel
-blinker==1.5
+blinker==1.6.2
     # via apache-airflow
 cachelib==0.9.0
     # via
@@ -64,8 +72,10 @@ certifi==2022.12.7
     #   requests
 cffi==1.15.1
     # via cryptography
-charset-normalizer==3.0.1
-    # via requests
+charset-normalizer==3.1.0
+    # via
+    #   aiohttp
+    #   requests
 click==8.1.3
     # via
     #   clickclick
@@ -79,13 +89,13 @@ colorlog==4.8.0
     # via apache-airflow
 configupdater==3.1.1
     # via apache-airflow
-connexion[flask,swagger-ui]==2.14.2
+connexion[flask]==2.14.2
     # via apache-airflow
 cron-descriptor==1.2.35
     # via apache-airflow
-croniter==1.3.8
+croniter==1.3.14
     # via apache-airflow
-cryptography==39.0.0
+cryptography==40.0.2
     # via apache-airflow
 dataclasses==0.6
     # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
@@ -99,9 +109,9 @@ docutils==0.19
     # via python-daemon
 email-validator==1.3.1
     # via flask-appbuilder
-exceptiongroup==1.1.0
+exceptiongroup==1.1.1
     # via cattrs
-flask==2.2.2
+flask==2.2.4
     # via
     #   apache-airflow
     #   connexion
@@ -133,6 +143,10 @@ flask-wtf==1.1.1
     # via
     #   apache-airflow
     #   flask-appbuilder
+frozenlist==1.3.3
+    # via
+    #   aiohttp
+    #   aiosignal
 graphviz==0.20.1
     # via apache-airflow
 greenlet==2.0.2
@@ -141,23 +155,24 @@ gunicorn==20.1.0
     # via apache-airflow
 h11==0.14.0
     # via httpcore
-httpcore==0.16.3
+httpcore==0.17.0
     # via httpx
-httpx==0.23.3
+httpx==0.24.0
     # via apache-airflow
 idna==3.4
     # via
     #   anyio
     #   email-validator
+    #   httpx
     #   requests
-    #   rfc3986
-importlib-metadata==6.0.0
+    #   yarl
+importlib-metadata==4.13.0
     # via
     #   alembic
     #   apache-airflow
     #   flask
     #   markdown
-importlib-resources==5.10.2
+importlib-resources==5.12.0
     # via
     #   alembic
     #   apache-airflow
@@ -176,7 +191,6 @@ jinja2==3.1.2
     #   flask
     #   flask-babel
     #   python-nvd3
-    #   swagger-ui-bundle
 jsonschema==4.17.3
     # via
     #   apache-airflow
@@ -196,9 +210,9 @@ lockfile==0.12.2
     #   python-daemon
 mako==1.2.4
     # via alembic
-markdown==3.4.1
+markdown==3.4.3
     # via apache-airflow
-markdown-it-py==2.1.0
+markdown-it-py==2.2.0
     # via
     #   apache-airflow
     #   mdit-py-plugins
@@ -222,12 +236,16 @@ marshmallow-oneofschema==3.0.1
     # via apache-airflow
 marshmallow-sqlalchemy==0.26.1
     # via flask-appbuilder
-mdit-py-plugins==0.3.3
+mdit-py-plugins==0.3.5
     # via apache-airflow
 mdurl==0.1.2
     # via markdown-it-py
 motor==1.3.1
     # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+multidict==6.0.4
+    # via
+    #   aiohttp
+    #   yarl
 osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master
     # via -r requirements-dev.in
 packaging==23.0
@@ -247,13 +265,13 @@ pluggy==1.0.0
     # via apache-airflow
 prison==0.2.1
     # via flask-appbuilder
-psutil==5.9.4
+psutil==5.9.5
     # via apache-airflow
 pycparser==2.21
     # via cffi
 pycryptodome==3.17
     # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-pygments==2.14.0
+pygments==2.15.1
     # via
     #   apache-airflow
     #   rich
@@ -268,7 +286,7 @@ pymongo==3.13.0
     #   motor
 pyrsistent==0.19.3
     # via jsonschema
-python-daemon==2.3.2
+python-daemon==3.0.1
     # via apache-airflow
 python-dateutil==2.8.2
     # via
@@ -278,11 +296,11 @@ python-dateutil==2.8.2
     #   pendulum
 python-nvd3==0.15.0
     # via apache-airflow
-python-slugify==8.0.0
+python-slugify==8.0.1
     # via
     #   apache-airflow
     #   python-nvd3
-pytz==2022.7.1
+pytz==2023.3
     # via
     #   babel
     #   flask-babel
@@ -294,16 +312,16 @@ pyyaml==5.4.1
     #   apispec
     #   clickclick
     #   connexion
-requests==2.28.2
+requests==2.29.0
     # via
     #   apache-airflow-providers-http
     #   connexion
     #   requests-toolbelt
 requests-toolbelt==0.10.1
     # via apache-airflow-providers-http
-rfc3986[idna2008]==1.5.0
-    # via httpx
-rich==13.3.1
+rfc3339-validator==0.1.4
+    # via apache-airflow
+rich==13.3.5
     # via apache-airflow
 setproctitle==1.3.2
     # via apache-airflow
@@ -311,12 +329,13 @@ six==1.16.0
     # via
     #   prison
     #   python-dateutil
+    #   rfc3339-validator
 sniffio==1.3.0
     # via
     #   anyio
     #   httpcore
     #   httpx
-sqlalchemy==1.4.46
+sqlalchemy==1.4.47
     # via
     #   alembic
     #   apache-airflow
@@ -327,44 +346,45 @@ sqlalchemy==1.4.46
     #   sqlalchemy-utils
 sqlalchemy-jsonfield==1.0.1.post0
     # via apache-airflow
-sqlalchemy-utils==0.39.0
+sqlalchemy-utils==0.41.1
     # via flask-appbuilder
-sqlparse==0.4.3
+sqlparse==0.4.4
     # via apache-airflow-providers-common-sql
-swagger-ui-bundle==0.0.9
-    # via connexion
 tabulate==0.9.0
     # via apache-airflow
-tenacity==8.1.0
+tenacity==8.2.2
     # via apache-airflow
-termcolor==2.2.0
+termcolor==2.3.0
     # via apache-airflow
 text-unidecode==1.3
     # via python-slugify
-typing-extensions==4.4.0
+typing-extensions==4.5.0
     # via
+    #   alembic
     #   apache-airflow
     #   rich
 uc-micro-py==1.0.1
     # via linkify-it-py
 unicodecsv==0.14.1
     # via apache-airflow
-urllib3==1.26.14
+urllib3==1.26.15
     # via requests
-werkzeug==2.2.2
+werkzeug==2.2.3
     # via
     #   apache-airflow
     #   connexion
     #   flask
     #   flask-jwt-extended
     #   flask-login
-wrapt==1.14.1
+wrapt==1.15.0
     # via deprecated
 wtforms==3.0.1
     # via
     #   flask-appbuilder
     #   flask-wtf
-zipp==3.12.0
+yarl==1.9.2
+    # via aiohttp
+zipp==3.15.0
     # via
     #   importlib-metadata
     #   importlib-resources
index 03ff6e9..7929c6f 100644 (file)
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+packaging==23.0
 stdeb
 setuptools-scm
 setuptools<60
index 7b15e9f..48f399b 100644 (file)
@@ -15,7 +15,9 @@
 # limitations under the License.
 #######################################################################################
 packaging==23.0
-    # via setuptools-scm
+    # via
+    #   -r requirements-dist.in
+    #   setuptools-scm
 setuptools-scm==7.1.0
     # via -r requirements-dist.in
 stdeb==0.10.0
index fd0128d..dc5a646 100644 (file)
@@ -14,9 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
-coverage==7.1.0
+coverage==7.2.3
     # via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
     # via -r requirements-test.in
 nose2==0.12.0
     # via -r requirements-test.in
index 6779aef..58bbbd4 100644 (file)
@@ -21,6 +21,8 @@ azure-mgmt-compute
 gnocchiclient
 google-api-python-client
 google-auth
+packaging==23.0
+prometheus-api-client
 prometheus-client
 python-ceilometerclient
 python-keystoneclient
index fc34d3f..953edd4 100644 (file)
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
-attrs==22.2.0
+attrs==23.1.0
     # via cmd2
 autopage==0.5.1
     # via cliff
@@ -33,6 +33,8 @@ azure-mgmt-compute==29.1.0
     # via -r requirements.in
 azure-mgmt-core==1.4.0
     # via azure-mgmt-compute
+backports-zoneinfo==0.2.1;python_version<"3.9"
+    # via tzlocal
 cachetools==5.3.0
     # via google-auth
 certifi==2022.12.7
@@ -47,26 +49,34 @@ cliff==4.2.0
     # via gnocchiclient
 cmd2==2.4.3
     # via cliff
-cryptography==40.0.1
+contourpy==1.0.7
+    # via matplotlib
+cryptography==40.0.2
     # via
     #   azure-identity
     #   msal
     #   pyjwt
+cycler==0.11.0
+    # via matplotlib
+dateparser==1.1.8
+    # via prometheus-api-client
 debtcollector==2.5.0
     # via
     #   gnocchiclient
     #   oslo-config
     #   oslo-utils
     #   python-keystoneclient
+fonttools==4.39.3
+    # via matplotlib
 futurist==2.4.1
     # via gnocchiclient
 gnocchiclient==7.0.8
     # via -r requirements.in
 google-api-core==2.11.0
     # via google-api-python-client
-google-api-python-client==2.84.0
+google-api-python-client==2.86.0
     # via -r requirements.in
-google-auth==2.17.2
+google-auth==2.17.3
     # via
     #   -r requirements.in
     #   google-api-core
@@ -76,14 +86,18 @@ google-auth-httplib2==0.1.0
     # via google-api-python-client
 googleapis-common-protos==1.59.0
     # via google-api-core
+httmock==1.4.0
+    # via prometheus-api-client
 httplib2==0.22.0
     # via
     #   google-api-python-client
     #   google-auth-httplib2
 idna==3.4
     # via requests
-importlib-metadata==6.3.0
+importlib-metadata==6.6.0
     # via cliff
+importlib-resources==5.12.0
+    # via matplotlib
 iso8601==1.1.0
     # via
     #   gnocchiclient
@@ -99,7 +113,11 @@ keystoneauth1==5.1.2
     #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
-msal==1.21.0
+kiwisolver==1.4.4
+    # via matplotlib
+matplotlib==3.7.1
+    # via prometheus-api-client
+msal==1.22.0
     # via
     #   azure-identity
     #   msal-extensions
@@ -115,6 +133,12 @@ netaddr==0.8.0
     #   oslo-utils
 netifaces==0.11.0
     # via oslo-utils
+numpy==1.24.3
+    # via
+    #   contourpy
+    #   matplotlib
+    #   pandas
+    #   prometheus-api-client
 oauthlib==3.2.2
     # via requests-oauthlib
 os-service-types==1.7.0
@@ -141,8 +165,12 @@ oslo-utils==6.1.0
     #   python-novaclient
 packaging==23.0
     # via
+    #   -r requirements.in
+    #   matplotlib
     #   oslo-utils
     #   python-keystoneclient
+pandas==2.0.1
+    # via prometheus-api-client
 pbr==5.11.1
     # via
     #   keystoneauth1
@@ -153,6 +181,8 @@ pbr==5.11.1
     #   python-keystoneclient
     #   python-novaclient
     #   stevedore
+pillow==9.5.0
+    # via matplotlib
 portalocker==2.7.0
     # via msal-extensions
 prettytable==0.7.2
@@ -160,17 +190,19 @@ prettytable==0.7.2
     #   cliff
     #   python-ceilometerclient
     #   python-novaclient
+prometheus-api-client==0.5.3
+    # via -r requirements.in
 prometheus-client==0.16.0
     # via -r requirements.in
-protobuf==4.22.1
+protobuf==4.22.3
     # via
     #   google-api-core
     #   googleapis-common-protos
-pyasn1==0.4.8
+pyasn1==0.5.0
     # via
     #   pyasn1-modules
     #   rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
     # via google-auth
 pycparser==2.21
     # via cffi
@@ -179,34 +211,47 @@ pyjwt[crypto]==2.6.0
 pyparsing==3.0.9
     # via
     #   httplib2
+    #   matplotlib
     #   oslo-utils
 pyperclip==1.8.2
     # via cmd2
 python-ceilometerclient==2.9.0
     # via -r requirements.in
 python-dateutil==2.8.2
-    # via gnocchiclient
+    # via
+    #   dateparser
+    #   gnocchiclient
+    #   matplotlib
+    #   pandas
 python-keystoneclient==5.1.0
     # via -r requirements.in
 python-novaclient==18.3.0
     # via -r requirements.in
 pytz==2023.3
     # via
+    #   dateparser
     #   oslo-serialization
     #   oslo-utils
+    #   pandas
+pytz-deprecation-shim==0.1.0.post0
+    # via tzlocal
 pyyaml==5.4.1
     # via
     #   -r requirements.in
     #   cliff
     #   oslo-config
-requests==2.28.2
+regex==2023.3.23
+    # via dateparser
+requests==2.29.0
     # via
     #   azure-core
     #   google-api-core
+    #   httmock
     #   keystoneauth1
     #   msal
     #   msrest
     #   oslo-config
+    #   prometheus-api-client
     #   python-ceilometerclient
     #   python-keystoneclient
     #   requests-oauthlib
@@ -238,6 +283,10 @@ stevedore==5.0.0
     #   python-novaclient
 typing-extensions==4.5.0
     # via azure-core
+tzdata==2023.3
+    # via pandas
+tzlocal==4.3
+    # via dateparser
 ujson==5.7.0
     # via gnocchiclient
 uritemplate==4.1.1
@@ -249,4 +298,6 @@ wcwidth==0.2.6
 wrapt==1.15.0
     # via debtcollector
 zipp==3.15.0
-    # via importlib-metadata
+    # via
+    #   importlib-metadata
+    #   importlib-resources
index 390460a..c314893 100644 (file)
@@ -69,15 +69,19 @@ def alert_vdu():
             if status == "firing":
                 # Searching alerting rule in MongoDB
                 logger.info(
-                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"Searching healing alert rule in MongoDB: ns_id {ns_id}, "
                     f"vnf_member_index {vnf_member_index}, "
                     f"vdu_name {vdu_name}, "
                     f"vm_id {vm_id}"
                 )
                 alert = common_db.get_alert(
-                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=None,
+                    vdu_name=vdu_name,
+                    action_type="healing",
                 )
-                if alert and alert["action_type"] == "healing":
+                if alert:
                     logger.info("Found an alert rule:")
                     logger.info(alert)
                     # Update alert status
@@ -166,7 +170,11 @@ def alert_vdu():
                     f"vm_id {vm_id}"
                 )
                 alert = common_db.get_alert(
-                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=None,
+                    vdu_name=vdu_name,
+                    action_type="healing",
                 )
                 if alert:
                     logger.info("Found an alert rule, updating status")
diff --git a/src/osm_ngsa/dags/multivim_vm_metrics.py b/src/osm_ngsa/dags/multivim_vm_metrics.py
new file mode 100644 (file)
index 0000000..6f03292
--- /dev/null
@@ -0,0 +1,324 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+from math import ceil
+from typing import Dict, List
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.vim_connectors.openstack import OpenStackCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SCHEDULE_INTERVAL = 5
+COLLECTOR_MAX_METRICS_PER_TASK = 100
+SUPPORTED_VIM_TYPES = ["openstack", "vio"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
+PROMETHEUS_METRICS = {
+    "cpu_utilization": {
+        "metric_name": "cpu_utilization",
+        "metric_descr": "CPU usage percentage",
+    },
+    "average_memory_utilization": {
+        "metric_name": "average_memory_utilization",
+        "metric_descr": "Volume of RAM in MB used by the VM",
+    },
+    "disk_read_ops": {
+        "metric_name": "disk_read_ops",
+        "metric_descr": "Number of read requests",
+    },
+    "disk_write_ops": {
+        "metric_name": "disk_write_ops",
+        "metric_descr": "Number of write requests",
+    },
+    "disk_read_bytes": {
+        "metric_name": "disk_read_bytes",
+        "metric_descr": "Volume of reads in bytes",
+    },
+    "disk_write_bytes": {
+        "metric_name": "disk_write_bytes",
+        "metric_descr": "Volume of writes in bytes",
+    },
+    "packets_received": {
+        "metric_name": "packets_received",
+        "metric_descr": "Number of incoming packets",
+    },
+    "packets_sent": {
+        "metric_name": "packets_sent",
+        "metric_descr": "Number of outgoing packets",
+    },
+    "packets_in_dropped": {
+        "metric_name": "packets_in_dropped",
+        "metric_descr": "Number of incoming dropped packets",
+    },
+    "packets_out_dropped": {
+        "metric_name": "packets_out_dropped",
+        "metric_descr": "Number of outgoing dropped packets",
+    },
+}
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+def get_all_vim():
+    """Get VIMs from MongoDB"""
+    logger.info("Getting VIM list")
+
+    cfg = Config()
+    logger.info(cfg.conf)
+    common_db = CommonDbClient(cfg)
+    vim_accounts = common_db.get_vim_accounts()
+    vim_list = []
+    for vim in vim_accounts:
+        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
+        vim_list.append(
+            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
+        )
+
+    logger.info(vim_list)
+    logger.info("Getting VIM list OK")
+    return vim_list
+
+
+def create_dag(dag_id, dag_number, dag_description, vim_id):
+    dag = DAG(
+        dag_id,
+        catchup=False,
+        default_args={
+            "depends_on_past": False,
+            "retries": 1,
+            # "retry_delay": timedelta(minutes=1),
+            "retry_delay": timedelta(seconds=10),
+        },
+        description=dag_description,
+        is_paused_upon_creation=False,
+        # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+        start_date=datetime(2022, 1, 1),
+        tags=["osm", "vdu"],
+    )
+
+    with dag:
+
+        @task(task_id="extract_metrics_from_vnfrs")
+        def extract_metrics_from_vnfrs(vim_id: str):
+            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
+
+            # Get VNFDs that include "monitoring-parameter" from MongoDB
+            cfg = Config()
+            common_db = CommonDbClient(cfg)
+            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
+            vnfd_list = common_db.get_monitoring_vnfds()
+            # Get VNFR list from MongoDB
+            logger.info("Getting VNFR list from MongoDB")
+            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
+            # Only read metrics if ns state is one of the nsAllowedStatesSet
+            nsAllowedStatesSet = {"INSTANTIATED"}
+            metric_list = []
+            for vnfr in vnfr_list:
+                if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
+                    continue
+                # Check if VNFR is in "monitoring-parameter" VNFDs list
+                vnfd_id = vnfr["vnfd-id"]
+                vnfd = next(
+                    (item for item in vnfd_list if item["_id"] == vnfd_id), None
+                )
+                if not vnfd:
+                    continue
+                ns_id = vnfr["nsr-id-ref"]
+                vnf_index = vnfr["member-vnf-index-ref"]
+                logger.info(
+                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
+                )
+                project_list = vnfr.get("_admin", {}).get("projects_read", [])
+                project_id = "None"
+                if project_list:
+                    project_id = project_list[0]
+                for vdur in vnfr.get("vdur", []):
+                    vim_info = vdur.get("vim_info")
+                    if not vim_info:
+                        logger.error("Error: vim_info not available in vdur")
+                        continue
+                    if len(vim_info) != 1:
+                        logger.error("Error: more than one vim_info in vdur")
+                        continue
+                    vim_id = next(iter(vim_info))[4:]
+                    vm_id = vdur.get("vim-id")
+                    if not vm_id:
+                        logger.error("Error: vim-id not available in vdur")
+                        continue
+                    vdu_name = vdur.get("name", "UNKNOWN")
+                    vdu = next(
+                        filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"])
+                    )
+                    if "monitoring-parameter" not in vdu:
+                        logger.error("Error: no monitoring-parameter in descriptor")
+                        continue
+                    for param in vdu["monitoring-parameter"]:
+                        metric_name = param["performance-metric"]
+                        metric_id = param["id"]
+                        metric = {
+                            "metric": metric_name,
+                            "metric_id": metric_id,
+                            "vm_id": vm_id,
+                            "ns_id": ns_id,
+                            "project_id": project_id,
+                            "vdu_name": vdu_name,
+                            "vnf_member_index": vnf_index,
+                            "vdu_id": vdu["id"],
+                        }
+                        metric_list.append(metric)
+
+            logger.info(f"Metrics to collect: {len(metric_list)}")
+            return metric_list
+
+        @task(task_id="split_metrics")
+        def split_metrics(metric_list: List[Dict]):
+            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
+            n_metrics = len(metric_list)
+            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
+                return [metric_list]
+            metrics_per_chunk = ceil(
+                n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
+            )
+            logger.info(
+                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
+            )
+            chunks = []
+            for i in range(0, n_metrics, metrics_per_chunk):
+                chunks.append(metric_list[i : i + metrics_per_chunk])
+            return chunks
+
+        @task(task_id="collect_metrics")
+        def collect_metrics(vim_id: str, metric_list: List[Dict]):
+            """Collect servers metrics"""
+            if not metric_list:
+                return []
+
+            # Get VIM account info from MongoDB
+            logger.info(f"Reading VIM info, id: {vim_id}")
+            cfg = Config()
+            common_db = CommonDbClient(cfg)
+            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
+            # Create VIM metrics collector
+            vim_type = vim_account["vim_type"]
+            if "config" in vim_account and "vim_type" in vim_account["config"]:
+                vim_type = vim_account["config"]["vim_type"].lower()
+                if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
+                    vim_type = "openstack"
+            if vim_type == "openstack":
+                collector = OpenStackCollector(vim_account)
+            else:
+                logger.error(f"VIM type '{vim_type}' not supported")
+                return None
+            # Get metrics
+            results = []
+            if collector:
+                results = collector.collect_metrics(metric_list)
+            logger.info(results)
+            return results
+
+        @task(task_id="send_prometheus")
+        def send_prometheus(metric_lists: List[List[Dict]]):
+            """Send servers metrics to Prometheus Push Gateway"""
+            logger.info(metric_lists)
+
+            # Define Prometheus metrics
+            registry = CollectorRegistry()
+            prom_metrics = {}
+            prom_metrics_keys = PROMETHEUS_METRICS.keys()
+            for key in prom_metrics_keys:
+                prom_metrics[key] = Gauge(
+                    PROMETHEUS_METRICS[key]["metric_name"],
+                    PROMETHEUS_METRICS[key]["metric_descr"],
+                    labelnames=[
+                        "metric_id",
+                        "ns_id",
+                        "project_id",
+                        "vnf_member_index",
+                        "vm_id",
+                        "vim_id",
+                        "vdu_name",
+                        "vdu_id",
+                    ],
+                    registry=registry,
+                )
+
+            for metric_list in metric_lists:
+                for metric in metric_list:
+                    metric_name = metric["metric"]
+                    metric_id = metric["metric_id"]
+                    value = metric["value"]
+                    vm_id = metric["vm_id"]
+                    vm_name = metric.get("vdu_name", "")
+                    ns_id = metric["ns_id"]
+                    project_id = metric["project_id"]
+                    vnf_index = metric["vnf_member_index"]
+                    vdu_id = metric["vdu_id"]
+                    logger.info(
+                        f"  metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
+                    )
+                    if metric_name in prom_metrics_keys:
+                        prom_metrics[metric_name].labels(
+                            metric_id,
+                            ns_id,
+                            project_id,
+                            vnf_index,
+                            vm_id,
+                            vim_id,
+                            vm_name,
+                            vdu_id,
+                        ).set(value)
+
+            # Push to Prometheus
+            push_to_gateway(
+                gateway=PROMETHEUS_PUSHGW,
+                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+                registry=registry,
+            )
+            return
+
+        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
+        send_prometheus(
+            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
+        )
+
+    return dag
+
+
+vim_list = get_all_vim()
+for index, vim in enumerate(vim_list):
+    vim_type = vim["vim_type"]
+    if vim_type in SUPPORTED_VIM_TYPES:
+        vim_id = vim["_id"]
+        vim_name = vim["name"]
+        dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
+        dag_id = f"vm_metrics_vim_{vim_id}"
+        logger.info(f"Creating DAG {dag_id}")
+        globals()[dag_id] = create_dag(
+            dag_id=dag_id,
+            dag_number=index,
+            dag_description=dag_description,
+            vim_id=vim_id,
+        )
+    else:
+        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")
diff --git a/src/osm_ngsa/dags/scalein_vdu.py b/src/osm_ngsa/dags/scalein_vdu.py
new file mode 100644 (file)
index 0000000..c5daefd
--- /dev/null
@@ -0,0 +1,211 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=15),
+    },
+    description="Webhook callback for scale-in alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def scalein_vdu():
+    @task(task_id="main_task")
+    def main_task():
+        logger.debug("Running main task...")
+        # Read input parameters
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("Scale-in alarm:")
+            status = alarm["status"]
+            logger.info(f"  status: {status}")
+            logger.info(f'  annotations: {alarm["annotations"]}')
+            logger.info(f'  startsAt: {alarm["startsAt"]}')
+            logger.info(f'  endsAt: {alarm["endsAt"]}')
+            logger.info(f'  labels: {alarm["labels"]}')
+            alertname = alarm["labels"].get("alertname")
+            if not alertname.startswith("scalein_"):
+                continue
+            # scalein_vdu alert type
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = alarm["labels"]["ns_id"]
+            vdu_id = alarm["labels"]["vdu_id"]
+            vnf_member_index = alarm["labels"]["vnf_member_index"]
+            if status == "firing":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching scale-in alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_id {vdu_id}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_in",
+                )
+                if alert:
+                    logger.info("Found an alert rule:")
+                    logger.info(alert)
+                    # Update alert status
+                    common_db.update_alert_status(
+                        uuid=alert["uuid"], alarm_status="alarm"
+                    )
+                    # Get VNFR from MongoDB
+                    vnfr = common_db.get_vnfr(
+                        nsr_id=ns_id, member_index=vnf_member_index
+                    )
+                    logger.info(
+                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+                    )
+                    # Check cooldown-time before scale-in
+                    send_lcm = 1
+                    if "cooldown-time" in alert["action"]:
+                        cooldown_time = alert["action"]["cooldown-time"]
+                        cooldown_time = cooldown_time * 60
+                        now = time.time()
+                        since = now - cooldown_time
+                        logger.info(
+                            f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+                        )
+                        nslcmops = common_db.get_nslcmop(
+                            nsr_id=ns_id, operation_type="scale", since=since
+                        )
+                        op = next(
+                            (
+                                sub
+                                for sub in nslcmops
+                                if ("scaleVnfData" in sub["operationParams"])
+                                and (
+                                    "scaleByStepData"
+                                    in sub["operationParams"]["scaleVnfData"]
+                                )
+                                and (
+                                    "member-vnf-index"
+                                    in sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]
+                                )
+                                and (
+                                    sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]["member-vnf-index"]
+                                    == vnf_member_index
+                                )
+                            ),
+                            None,
+                        )
+                        if op:
+                            logger.info(
+                                f"No scale-in will be launched, found a previous scale operation in cooldown interval: {op}"
+                            )
+                            send_lcm = 0
+
+                    if send_lcm:
+                        # Save nslcmop object in MongoDB
+                        msg_bus = MessageBusClient(config)
+                        loop = asyncio.get_event_loop()
+                        _id = str(uuid.uuid4())
+                        now = time.time()
+                        projects_read = vnfr["_admin"]["projects_read"]
+                        projects_write = vnfr["_admin"]["projects_write"]
+                        scaling_group = alert["action"]["scaling-group"]
+                        params = {
+                            "scaleType": "SCALE_VNF",
+                            "scaleVnfData": {
+                                "scaleVnfType": "SCALE_IN",
+                                "scaleByStepData": {
+                                    "scaling-group-descriptor": scaling_group,
+                                    "member-vnf-index": vnf_member_index,
+                                },
+                            },
+                            "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+                        }
+                        nslcmop = {
+                            "id": _id,
+                            "_id": _id,
+                            "operationState": "PROCESSING",
+                            "statusEnteredTime": now,
+                            "nsInstanceId": ns_id,
+                            "lcmOperationType": "scale",
+                            "startTime": now,
+                            "isAutomaticInvocation": True,
+                            "operationParams": params,
+                            "isCancelPending": False,
+                            "links": {
+                                "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                                "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                            },
+                            "_admin": {
+                                "projects_read": projects_read,
+                                "projects_write": projects_write,
+                            },
+                        }
+                        common_db.create_nslcmop(nslcmop)
+                        # Send Kafka message to LCM
+                        logger.info("Sending scale-in action message:")
+                        logger.info(nslcmop)
+                        loop.run_until_complete(
+                            msg_bus.aiowrite("ns", "scale", nslcmop)
+                        )
+                else:
+                    logger.info("No alert rule was found")
+            elif status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_in",
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+    main_task()
+
+
+dag = scalein_vdu()
diff --git a/src/osm_ngsa/dags/scaleout_vdu.py b/src/osm_ngsa/dags/scaleout_vdu.py
new file mode 100644 (file)
index 0000000..978ab3f
--- /dev/null
@@ -0,0 +1,210 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=15),
+    },
+    description="Webhook callback for scale-out alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def scaleout_vdu():
+    @task(task_id="main_task")
+    def main_task():
+        logger.debug("Running main task...")
+        # Read input parameters
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("Scale-out alarm:")
+            status = alarm["status"]
+            logger.info(f"  status: {status}")
+            logger.info(f'  annotations: {alarm["annotations"]}')
+            logger.info(f'  startsAt: {alarm["startsAt"]}')
+            logger.info(f'  endsAt: {alarm["endsAt"]}')
+            logger.info(f'  labels: {alarm["labels"]}')
+            alertname = alarm["labels"].get("alertname")
+            if not alertname.startswith("scaleout_"):
+                continue
+            # scaleout_vdu alert type
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = alarm["labels"]["ns_id"]
+            vdu_id = alarm["labels"]["vdu_id"]
+            vnf_member_index = alarm["labels"]["vnf_member_index"]
+            if status == "firing":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_id {vdu_id}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_out",
+                )
+                if alert:
+                    logger.info("Found an alert rule:")
+                    logger.info(alert)
+                    # Update alert status
+                    common_db.update_alert_status(
+                        uuid=alert["uuid"], alarm_status="alarm"
+                    )
+                    # Get VNFR from MongoDB
+                    vnfr = common_db.get_vnfr(
+                        nsr_id=ns_id, member_index=vnf_member_index
+                    )
+                    logger.info(
+                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+                    )
+                    # Check cooldown-time before scale-out
+                    send_lcm = 1
+                    if "cooldown-time" in alert["action"]:
+                        cooldown_time = alert["action"]["cooldown-time"]
+                        cooldown_time = cooldown_time * 60
+                        now = time.time()
+                        since = now - cooldown_time
+                        logger.info(
+                            f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+                        )
+                        nslcmops = common_db.get_nslcmop(
+                            nsr_id=ns_id, operation_type="scale", since=since
+                        )
+                        op = next(
+                            (
+                                sub
+                                for sub in nslcmops
+                                if ("scaleVnfData" in sub["operationParams"])
+                                and (
+                                    "scaleByStepData"
+                                    in sub["operationParams"]["scaleVnfData"]
+                                )
+                                and (
+                                    "member-vnf-index"
+                                    in sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]
+                                )
+                                and (
+                                    sub["operationParams"]["scaleVnfData"][
+                                        "scaleByStepData"
+                                    ]["member-vnf-index"]
+                                    == vnf_member_index
+                                )
+                            ),
+                            None,
+                        )
+                        if op:
+                            logger.info(
+                                f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
+                            )
+                            send_lcm = 0
+
+                    if send_lcm:
+                        # Save nslcmop object in MongoDB
+                        msg_bus = MessageBusClient(config)
+                        loop = asyncio.get_event_loop()
+                        _id = str(uuid.uuid4())
+                        projects_read = vnfr["_admin"]["projects_read"]
+                        projects_write = vnfr["_admin"]["projects_write"]
+                        scaling_group = alert["action"]["scaling-group"]
+                        params = {
+                            "scaleType": "SCALE_VNF",
+                            "scaleVnfData": {
+                                "scaleVnfType": "SCALE_OUT",
+                                "scaleByStepData": {
+                                    "scaling-group-descriptor": scaling_group,
+                                    "member-vnf-index": vnf_member_index,
+                                },
+                            },
+                            "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+                        }
+                        nslcmop = {
+                            "id": _id,
+                            "_id": _id,
+                            "operationState": "PROCESSING",
+                            "statusEnteredTime": now,
+                            "nsInstanceId": ns_id,
+                            "lcmOperationType": "scale",
+                            "startTime": now,
+                            "isAutomaticInvocation": True,
+                            "operationParams": params,
+                            "isCancelPending": False,
+                            "links": {
+                                "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                                "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                            },
+                            "_admin": {
+                                "projects_read": projects_read,
+                                "projects_write": projects_write,
+                            },
+                        }
+                        common_db.create_nslcmop(nslcmop)
+                        # Send Kafka message to LCM
+                        logger.info("Sending scale-out action message:")
+                        logger.info(nslcmop)
+                        loop.run_until_complete(
+                            msg_bus.aiowrite("ns", "scale", nslcmop)
+                        )
+                else:
+                    logger.info("No alert rule was found")
+            elif status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_out",
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+    main_task()
+
+
+dag = scaleout_vdu()
index 93254b1..465bb0d 100644 (file)
@@ -54,6 +54,14 @@ class CommonDbClient:
         nsr = self.common_db.get_one("nsrs", {"id": nsr_id})
         return nsr
 
+    def get_vnfds(self):
+        return self.common_db.get_list("vnfds")
+
+    def get_monitoring_vnfds(self):
+        return self.common_db.get_list(
+            "vnfds", {"vdu.monitoring-parameter": {"$exists": "true"}}
+        )
+
     def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
         return self.common_db.decrypt(vim_password, schema_version, vim_id)
 
@@ -89,14 +97,25 @@ class CommonDbClient:
                     )
         return vim_account
 
-    def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+    def get_alert(
+        self,
+        nsr_id: str,
+        vnf_member_index: str,
+        vdu_id: str,
+        vdu_name: str,
+        action_type: str,
+    ):
+        q_filter = {"action_type": action_type}
+        if nsr_id:
+            q_filter["tags.ns_id"] = nsr_id
+        if vnf_member_index:
+            q_filter["tags.vnf_member_index"] = vnf_member_index
+        if vdu_id:
+            q_filter["tags.vdu_id"] = vdu_id
+        if vdu_name:
+            q_filter["tags.vdu_name"] = vdu_name
         alert = self.common_db.get_one(
-            "alerts",
-            {
-                "tags.ns_id": nsr_id,
-                "tags.vnf_member_index": vnf_member_index,
-                "tags.vdu_name": vdu_name,
-            },
+            table="alerts", q_filter=q_filter, fail_on_empty=False
         )
         return alert
 
@@ -108,3 +127,14 @@ class CommonDbClient:
 
     def create_nslcmop(self, nslcmop: dict):
         self.common_db.create("nslcmops", nslcmop)
+
+    def get_nslcmop(self, nsr_id: str, operation_type: str, since: str):
+        q_filter = {}
+        if nsr_id:
+            q_filter["nsInstanceId"] = nsr_id
+        if operation_type:
+            q_filter["lcmOperationType"] = operation_type
+        if since:
+            q_filter["startTime"] = {"$gt": since}
+        ops = self.common_db.get_list(table="nslcmops", q_filter=q_filter)
+        return ops
index d37973d..1eb33af 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+from enum import Enum
 import logging
+import time
 from typing import Dict, List
 
+from ceilometerclient import client as ceilometer_client
+from ceilometerclient.exc import HTTPException
+import gnocchiclient.exceptions
+from gnocchiclient.v1 import client as gnocchi_client
 from keystoneauth1 import session
+from keystoneauth1.exceptions.catalog import EndpointNotFound
 from keystoneauth1.identity import v3
 from novaclient import client as nova_client
 from osm_mon.vim_connectors.base_vim import VIMConnector
+from prometheus_api_client import PrometheusConnect as prometheus_client
 
 log = logging.getLogger(__name__)
 
+METRIC_MULTIPLIERS = {"cpu": 0.0000001}
+
+METRIC_AGGREGATORS = {"cpu": "rate:mean"}
+
+INTERFACE_METRICS = [
+    "packets_in_dropped",
+    "packets_out_dropped",
+    "packets_received",
+    "packets_sent",
+]
+
+INSTANCE_DISK = [
+    "disk_read_ops",
+    "disk_write_ops",
+    "disk_read_bytes",
+    "disk_write_bytes",
+]
+
+METRIC_MAPPINGS = {
+    "average_memory_utilization": "memory.usage",
+    "disk_read_ops": "disk.device.read.requests",
+    "disk_write_ops": "disk.device.write.requests",
+    "disk_read_bytes": "disk.device.read.bytes",
+    "disk_write_bytes": "disk.device.write.bytes",
+    "packets_in_dropped": "network.outgoing.packets.drop",
+    "packets_out_dropped": "network.incoming.packets.drop",
+    "packets_received": "network.incoming.packets",
+    "packets_sent": "network.outgoing.packets",
+    "cpu_utilization": "cpu",
+}
+
+METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD = {
+    "cpu_utilization": "cpu",
+    "average_memory_utilization": "memory_usage",
+    "disk_read_ops": "disk_device_read_requests",
+    "disk_write_ops": "disk_device_write_requests",
+    "disk_read_bytes": "disk_device_read_bytes",
+    "disk_write_bytes": "disk_device_write_bytes",
+    "packets_in_dropped": "network_incoming_packets_drop",
+    "packets_out_dropped": "network_outgoing_packets_drop",
+    "packets_received": "network_incoming_packets",
+    "packets_sent": "network_outgoing_packets",
+}
+
+
+class MetricType(Enum):
+    INSTANCE = "instance"
+    INTERFACE_ALL = "interface_all"
+    INTERFACE_ONE = "interface_one"
+    INSTANCEDISK = "instancedisk"
+
 
 class CertificateNotCreated(Exception):
     pass
@@ -31,13 +90,16 @@ class CertificateNotCreated(Exception):
 
 class OpenStackCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
-        log.info("__init__")
+        log.debug("__init__")
         self.vim_account = vim_account
         self.vim_session = None
         self.vim_session = self._get_session(vim_account)
         self.nova = self._build_nova_client()
+        # self.gnocchi = self._build_gnocchi_client()
+        self.backend = self._get_backend(vim_account, self.vim_session)
 
     def _get_session(self, creds: Dict):
+        log.debug("_get_session")
         verify_ssl = True
         project_domain_name = "Default"
         user_domain_name = "Default"
@@ -67,11 +129,35 @@ class OpenStackCollector(VIMConnector):
         except CertificateNotCreated as e:
             log.error(e)
 
+    def _get_backend(self, vim_account: dict, vim_session: object):
+        if vim_account.get("prometheus-config"):
+            # try:
+            #     tsbd = PrometheusTSBDBackend(vim_account)
+            #     log.debug("Using prometheustsbd backend to collect metric")
+            #     return tsbd
+            # except Exception as e:
+            #     log.error(f"Can't create prometheus client, {e}")
+            #     return None
+            return None
+        try:
+            gnocchi = GnocchiBackend(vim_account, vim_session)
+            gnocchi.client.metric.list(limit=1)
+            log.debug("Using gnocchi backend to collect metric")
+            return gnocchi
+        except (HTTPException, EndpointNotFound):
+            ceilometer = CeilometerBackend(vim_account, vim_session)
+            ceilometer.client.capabilities.get()
+            log.debug("Using ceilometer backend to collect metric")
+            return ceilometer
+
     def _build_nova_client(self) -> nova_client.Client:
         return nova_client.Client("2", session=self.vim_session, timeout=10)
 
+    def _build_gnocchi_client(self) -> gnocchi_client.Client:
+        return gnocchi_client.Client(session=self.vim_session)
+
     def collect_servers_status(self) -> List[Dict]:
-        log.info("collect_servers_status")
+        log.debug("collect_servers_status")
         servers = []
         for server in self.nova.servers.list(detailed=True):
             vm = {
@@ -83,9 +169,252 @@ class OpenStackCollector(VIMConnector):
         return servers
 
     def is_vim_ok(self) -> bool:
+        log.debug("is_vim_ok")
         try:
             self.nova.servers.list()
             return True
         except Exception as e:
             log.warning("VIM status is not OK: %s" % e)
             return False
+
+    def _get_metric_type(self, metric_name: str) -> MetricType:
+        if metric_name not in INTERFACE_METRICS:
+            if metric_name not in INSTANCE_DISK:
+                return MetricType.INSTANCE
+            else:
+                return MetricType.INSTANCEDISK
+        else:
+            return MetricType.INTERFACE_ALL
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        log.debug("collect_metrics")
+        if not self.backend:
+            log.error("Undefined backend")
+            return []
+
+        if type(self.backend) is PrometheusTSBDBackend:
+            log.info("Using Prometheus as backend (NOT SUPPORTED)")
+            return []
+
+        metric_results = []
+        for metric in metric_list:
+            server = metric["vm_id"]
+            metric_name = metric["metric"]
+            openstack_metric_name = METRIC_MAPPINGS[metric_name]
+            metric_type = self._get_metric_type(metric_name)
+            log.info(f"Collecting metric {openstack_metric_name} for {server}")
+            try:
+                value = self.backend.collect_metric(
+                    metric_type, openstack_metric_name, server
+                )
+                if value is not None:
+                    log.info(f"value: {value}")
+                    metric["value"] = value
+                    metric_results.append(metric)
+                else:
+                    log.info("metric value is empty")
+            except Exception as e:
+                log.error("Error in metric collection: %s" % e)
+        return metric_results
+
+
+class OpenstackBackend:
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        pass
+
+
+class PrometheusTSBDBackend(OpenstackBackend):
+    def __init__(self, vim_account: dict):
+        self.map = self._build_map(vim_account)
+        self.cred = vim_account["prometheus-config"].get("prometheus-cred")
+        self.client = self._build_prometheus_client(
+            vim_account["prometheus-config"]["prometheus-url"]
+        )
+
+    def _build_prometheus_client(self, url: str) -> prometheus_client:
+        return prometheus_client(url, disable_ssl=True)
+
+    def _build_map(self, vim_account: dict) -> dict:
+        custom_map = METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD
+        if "prometheus-map" in vim_account["prometheus-config"]:
+            custom_map.update(vim_account["prometheus-config"]["prometheus-map"])
+        return custom_map
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        metric = self.query_metric(metric_name, resource_id)
+        return metric["value"][1] if metric else None
+
+    def map_metric(self, metric_name: str):
+        return self.map[metric_name]
+
+    def query_metric(self, metric_name, resource_id=None):
+        metrics = self.client.get_current_metric_value(metric_name=metric_name)
+        if resource_id:
+            metric = next(
+                filter(lambda x: resource_id in x["metric"]["resource_id"], metrics)
+            )
+            return metric
+        return metrics
+
+
+class GnocchiBackend(OpenstackBackend):
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_gnocchi_client(vim_account, vim_session)
+
+    def _build_gnocchi_client(
+        self, vim_account: dict, vim_session: object
+    ) -> gnocchi_client.Client:
+        return gnocchi_client.Client(session=vim_session)
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        if metric_type == MetricType.INTERFACE_ALL:
+            return self._collect_interface_all_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCE:
+            return self._collect_instance_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCEDISK:
+            return self._collect_instance_disk_metric(metric_name, resource_id)
+
+        else:
+            raise Exception("Unknown metric type %s" % metric_type.value)
+
+    def _collect_interface_all_metric(self, openstack_metric_name, resource_id):
+        total_measure = None
+        interfaces = self.client.resource.search(
+            resource_type="instance_network_interface",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for interface in interfaces:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=interface["id"], limit=1
+                )
+                if measures:
+                    if not total_measure:
+                        total_measure = 0.0
+                    total_measure += measures[-1][2]
+            except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound
+                log.debug(
+                    "No metric %s found for interface %s: %s",
+                    openstack_metric_name,
+                    interface["id"],
+                    e,
+                )
+        return total_measure
+
+    def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
+        value = None
+        instances = self.client.resource.search(
+            resource_type="instance_disk",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for instance in instances:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=instance["id"], limit=1
+                )
+                if measures:
+                    value = measures[-1][2]
+
+            except gnocchiclient.exceptions.NotFound as e:
+                log.debug(
+                    "No metric %s found for instance disk %s: %s",
+                    openstack_metric_name,
+                    instance["id"],
+                    e,
+                )
+        return value
+
+    def _collect_instance_metric(self, openstack_metric_name, resource_id):
+        value = None
+        try:
+            aggregation = METRIC_AGGREGATORS.get(openstack_metric_name)
+
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name,
+                    aggregation=aggregation,
+                    start=time.time() - 1200,
+                    resource_id=resource_id,
+                )
+                if measures:
+                    value = measures[-1][2]
+            except (
+                gnocchiclient.exceptions.NotFound,
+                gnocchiclient.exceptions.BadRequest,
+                TypeError,
+            ) as e:
+                # CPU metric in previous Openstack versions do not support rate:mean aggregation method
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound or BadRequest
+                if openstack_metric_name == "cpu":
+                    log.debug(
+                        "No metric %s found for instance %s: %s",
+                        openstack_metric_name,
+                        resource_id,
+                        e,
+                    )
+                    log.info(
+                        "Retrying to get metric %s for instance %s without aggregation",
+                        openstack_metric_name,
+                        resource_id,
+                    )
+                    measures = self.client.metric.get_measures(
+                        openstack_metric_name, resource_id=resource_id, limit=1
+                    )
+                else:
+                    raise e
+                # measures[-1] is the last measure
+                # measures[-2] is the previous measure
+                # measures[x][2] is the value of the metric
+                if measures and len(measures) >= 2:
+                    value = measures[-1][2] - measures[-2][2]
+            if value:
+                # measures[-1][0] is the time of the reporting interval
+                # measures[-1][1] is the duration of the reporting interval
+                if aggregation:
+                    # If this is an aggregate, we need to divide the total over the reported time period.
+                    # Even if the aggregation method is not supported by Openstack, the code will execute it
+                    # because aggregation is specified in METRIC_AGGREGATORS
+                    value = value / measures[-1][1]
+                if openstack_metric_name in METRIC_MULTIPLIERS:
+                    value = value * METRIC_MULTIPLIERS[openstack_metric_name]
+        except gnocchiclient.exceptions.NotFound as e:
+            log.debug(
+                "No metric %s found for instance %s: %s",
+                openstack_metric_name,
+                resource_id,
+                e,
+            )
+        return value
+
+
+class CeilometerBackend(OpenstackBackend):
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_ceilometer_client(vim_account, vim_session)
+
+    def _build_ceilometer_client(
+        self, vim_account: dict, vim_session: object
+    ) -> ceilometer_client.Client:
+        return ceilometer_client.Client("2", session=vim_session)
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        if metric_type != MetricType.INSTANCE:
+            raise NotImplementedError(
+                "Ceilometer backend only support instance metrics"
+            )
+        measures = self.client.samples.list(
+            meter_name=metric_name,
+            limit=1,
+            q=[{"field": "resource_id", "op": "eq", "value": resource_id}],
+        )
+        return measures[0].counter_volume if measures else None