Feature 10981: added autohealing DAG and updated requirements 98/13098/10
authoraguilard <e.dah.tid@telefonica.com>
Mon, 27 Mar 2023 11:51:10 +0000 (11:51 +0000)
committeraguilard <e.dah.tid@telefonica.com>
Tue, 11 Apr 2023 14:04:23 +0000 (14:04 +0000)
Change-Id: Ib1ed56c220969d54480ddd2382beae03e536b72b
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
requirements-dist.txt
requirements.in
requirements.txt
src/osm_ngsa/dags/alert_vdu.py [new file with mode: 0644]
src/osm_ngsa/osm_mon/core/common_db.py
src/osm_ngsa/osm_mon/core/config.yaml
src/osm_ngsa/osm_mon/core/message_bus_client.py [new file with mode: 0644]

index cd29f8a..7b15e9f 100644 (file)
@@ -22,7 +22,7 @@ stdeb==0.10.0
     # via -r requirements-dist.in
 tomli==2.0.1
     # via setuptools-scm
-typing-extensions==4.4.0
+typing-extensions==4.5.0
     # via setuptools-scm
 
 # The following packages are considered to be unsafe in a requirements file:
index 6d8f0a8..6779aef 100644 (file)
 azure-common
 azure-identity
 azure-mgmt-compute
+gnocchiclient
 google-api-python-client
 google-auth
 prometheus-client
+python-ceilometerclient
 python-keystoneclient
 python-novaclient
 pyyaml==5.4.1
index 59f9d17..fc34d3f 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+attrs==22.2.0
+    # via cmd2
+autopage==0.5.1
+    # via cliff
 azure-common==1.1.28
     # via
     #   -r requirements.in
     #   azure-mgmt-compute
-azure-core==1.26.2
+azure-core==1.26.4
     # via
     #   azure-identity
     #   azure-mgmt-core
@@ -27,7 +31,7 @@ azure-identity==1.12.0
     # via -r requirements.in
 azure-mgmt-compute==29.1.0
     # via -r requirements.in
-azure-mgmt-core==1.3.2
+azure-mgmt-core==1.4.0
     # via azure-mgmt-compute
 cachetools==5.3.0
     # via google-auth
@@ -37,23 +41,32 @@ certifi==2022.12.7
     #   requests
 cffi==1.15.1
     # via cryptography
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
     # via requests
-cryptography==39.0.0
+cliff==4.2.0
+    # via gnocchiclient
+cmd2==2.4.3
+    # via cliff
+cryptography==40.0.1
     # via
     #   azure-identity
     #   msal
     #   pyjwt
 debtcollector==2.5.0
     # via
+    #   gnocchiclient
     #   oslo-config
     #   oslo-utils
     #   python-keystoneclient
+futurist==2.4.1
+    # via gnocchiclient
+gnocchiclient==7.0.8
+    # via -r requirements.in
 google-api-core==2.11.0
     # via google-api-python-client
-google-api-python-client==2.74.0
+google-api-python-client==2.84.0
     # via -r requirements.in
-google-auth==2.16.0
+google-auth==2.17.2
     # via
     #   -r requirements.in
     #   google-api-core
@@ -61,32 +74,38 @@ google-auth==2.16.0
     #   google-auth-httplib2
 google-auth-httplib2==0.1.0
     # via google-api-python-client
-googleapis-common-protos==1.58.0
+googleapis-common-protos==1.59.0
     # via google-api-core
-httplib2==0.21.0
+httplib2==0.22.0
     # via
     #   google-api-python-client
     #   google-auth-httplib2
 idna==3.4
     # via requests
+importlib-metadata==6.3.0
+    # via cliff
 iso8601==1.1.0
     # via
+    #   gnocchiclient
     #   keystoneauth1
     #   oslo-utils
+    #   python-ceilometerclient
     #   python-novaclient
 isodate==0.6.1
     # via msrest
-keystoneauth1==5.1.1
+keystoneauth1==5.1.2
     # via
+    #   gnocchiclient
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
-msal==1.20.0
+msal==1.21.0
     # via
     #   azure-identity
     #   msal-extensions
 msal-extensions==1.0.0
     # via azure-identity
-msgpack==1.0.4
+msgpack==1.0.5
     # via oslo-serialization
 msrest==0.7.1
     # via azure-mgmt-compute
@@ -100,21 +119,24 @@ oauthlib==3.2.2
     # via requests-oauthlib
 os-service-types==1.7.0
     # via keystoneauth1
-oslo-config==9.1.0
+oslo-config==9.1.1
     # via python-keystoneclient
-oslo-i18n==5.1.0
+oslo-i18n==6.0.0
     # via
     #   oslo-config
     #   oslo-utils
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
-oslo-serialization==5.0.0
+oslo-serialization==5.1.1
     # via
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 oslo-utils==6.1.0
     # via
     #   oslo-serialization
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 packaging==23.0
@@ -127,16 +149,20 @@ pbr==5.11.1
     #   os-service-types
     #   oslo-i18n
     #   oslo-serialization
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
     #   stevedore
 portalocker==2.7.0
     # via msal-extensions
-prettytable==3.6.0
-    # via python-novaclient
+prettytable==0.7.2
+    # via
+    #   cliff
+    #   python-ceilometerclient
+    #   python-novaclient
 prometheus-client==0.16.0
     # via -r requirements.in
-protobuf==4.21.12
+protobuf==4.22.1
     # via
     #   google-api-core
     #   googleapis-common-protos
@@ -154,17 +180,24 @@ pyparsing==3.0.9
     # via
     #   httplib2
     #   oslo-utils
-python-keystoneclient==5.0.1
+pyperclip==1.8.2
+    # via cmd2
+python-ceilometerclient==2.9.0
+    # via -r requirements.in
+python-dateutil==2.8.2
+    # via gnocchiclient
+python-keystoneclient==5.1.0
     # via -r requirements.in
-python-novaclient==18.2.0
+python-novaclient==18.3.0
     # via -r requirements.in
-pytz==2022.7.1
+pytz==2023.3
     # via
     #   oslo-serialization
     #   oslo-utils
 pyyaml==5.4.1
     # via
     #   -r requirements.in
+    #   cliff
     #   oslo-config
 requests==2.28.2
     # via
@@ -174,6 +207,7 @@ requests==2.28.2
     #   msal
     #   msrest
     #   oslo-config
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   requests-oauthlib
 requests-oauthlib==1.3.1
@@ -186,24 +220,33 @@ six==1.16.0
     # via
     #   azure-core
     #   azure-identity
+    #   gnocchiclient
     #   google-auth
     #   google-auth-httplib2
     #   isodate
     #   keystoneauth1
+    #   python-ceilometerclient
+    #   python-dateutil
     #   python-keystoneclient
-stevedore==4.1.1
+stevedore==5.0.0
     # via
+    #   cliff
     #   keystoneauth1
     #   oslo-config
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
-typing-extensions==4.4.0
+typing-extensions==4.5.0
     # via azure-core
+ujson==5.7.0
+    # via gnocchiclient
 uritemplate==4.1.1
     # via google-api-python-client
-urllib3==1.26.14
+urllib3==1.26.15
     # via requests
 wcwidth==0.2.6
-    # via prettytable
-wrapt==1.14.1
+    # via cmd2
+wrapt==1.15.0
     # via debtcollector
+zipp==3.15.0
+    # via importlib-metadata
diff --git a/src/osm_ngsa/dags/alert_vdu.py b/src/osm_ngsa/dags/alert_vdu.py
new file mode 100644 (file)
index 0000000..390460a
--- /dev/null
@@ -0,0 +1,179 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=5),
+    },
+    description="Webhook callback for VDU alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def alert_vdu():
+    @task(task_id="main_task")
+    def main_task():
+        logger.debug("Running main task...")
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("VDU alarm:")
+            status = alarm["status"]
+            logger.info(f"  status: {status}")
+            logger.info(f'  annotations: {alarm["annotations"]}')
+            logger.info(f'  startsAt: {alarm["startsAt"]}')
+            logger.info(f'  endsAt: {alarm["endsAt"]}')
+            logger.info(f'  labels: {alarm["labels"]}')
+            # vdu_down alert type
+            if alarm["labels"]["alertname"] != "vdu_down":
+                continue
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = alarm["labels"]["ns_id"]
+            vdu_name = alarm["labels"]["vdu_name"]
+            vnf_member_index = alarm["labels"]["vnf_member_index"]
+            vm_id = alarm["labels"]["vm_id"]
+            if status == "firing":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_name {vdu_name}, "
+                    f"vm_id {vm_id}"
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                )
+                if alert and alert["action_type"] == "healing":
+                    logger.info("Found an alert rule:")
+                    logger.info(alert)
+                    # Update alert status
+                    common_db.update_alert_status(
+                        uuid=alert["uuid"], alarm_status="alarm"
+                    )
+                    # Get VNFR from MongoDB
+                    vnfr = common_db.get_vnfr(
+                        nsr_id=ns_id, member_index=vnf_member_index
+                    )
+                    logger.info(
+                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+                    )
+                    count_index = None
+                    for vdu in vnfr.get("vdur", []):
+                        if vdu["vim-id"] == vm_id:
+                            count_index = vdu["count-index"]
+                            break
+                    if count_index is None:
+                        logger.error(f"VDU {vm_id} not found in VNFR")
+                        break
+                    # Auto-healing type rule
+                    vnf_id = alarm["labels"]["vnf_id"]
+                    msg_bus = MessageBusClient(config)
+                    loop = asyncio.get_event_loop()
+                    _id = str(uuid.uuid4())
+                    now = time.time()
+                    vdu_id = alert["action"]["vdu-id"]
+                    day1 = alert["action"]["day1"]
+                    projects_read = vnfr["_admin"]["projects_read"]
+                    projects_write = vnfr["_admin"]["projects_write"]
+                    params = {
+                        "lcmOperationType": "heal",
+                        "nsInstanceId": ns_id,
+                        "healVnfData": [
+                            {
+                                "vnfInstanceId": vnf_id,
+                                "cause": "default",
+                                "additionalParams": {
+                                    "run-day1": day1,
+                                    "vdu": [
+                                        {
+                                            "run-day1": day1,
+                                            "count-index": count_index,
+                                            "vdu-id": vdu_id,
+                                        }
+                                    ],
+                                },
+                            }
+                        ],
+                    }
+                    nslcmop = {
+                        "id": _id,
+                        "_id": _id,
+                        "operationState": "PROCESSING",
+                        "statusEnteredTime": now,
+                        "nsInstanceId": ns_id,
+                        "member-vnf-index": vnf_member_index,
+                        "lcmOperationType": "heal",
+                        "startTime": now,
+                        "location": "default",
+                        "isAutomaticInvocation": True,
+                        "operationParams": params,
+                        "isCancelPending": False,
+                        "links": {
+                            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                            "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                        },
+                        "_admin": {
+                            "projects_read": projects_read,
+                            "projects_write": projects_write,
+                        },
+                    }
+                    common_db.create_nslcmop(nslcmop)
+                    logger.info("Sending heal action message:")
+                    logger.info(nslcmop)
+                    loop.run_until_complete(msg_bus.aiowrite("ns", "heal", nslcmop))
+                else:
+                    logger.info("No alert rule was found")
+            elif status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_name {vdu_name}, "
+                    f"vm_id {vm_id}"
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+    main_task()
+
+
+dag = alert_vdu()
index 7c579c3..93254b1 100644 (file)
@@ -30,9 +30,9 @@ class CommonDbClient:
             )
         self.common_db.db_connect(config.get("database"))
 
-    def get_vnfr(self, nsr_id: str, member_index: int):
+    def get_vnfr(self, nsr_id: str, member_index: str):
         vnfr = self.common_db.get_one(
-            "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}
+            "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index}
         )
         return vnfr
 
@@ -88,3 +88,23 @@ class CommonDbClient:
                         vim_account_id,
                     )
         return vim_account
+
+    def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+        alert = self.common_db.get_one(
+            "alerts",
+            {
+                "tags.ns_id": nsr_id,
+                "tags.vnf_member_index": vnf_member_index,
+                "tags.vdu_name": vdu_name,
+            },
+        )
+        return alert
+
+    def update_alert_status(self, uuid: str, alarm_status: str):
+        modified_count = self.common_db.set_one(
+            "alerts", {"uuid": uuid}, {"alarm_status": alarm_status}
+        )
+        return modified_count
+
+    def create_nslcmop(self, nslcmop: dict):
+        self.common_db.create("nslcmops", nslcmop)
index 197c818..4bff5a4 100644 (file)
@@ -25,3 +25,7 @@ database:
   name: osm
   commonkey: gj7LmbCexbmII7memwbGRRdfbYuT3nvy
 
+message:
+  driver: kafka
+  host: kafka
+  port: 9092
diff --git a/src/osm_ngsa/osm_mon/core/message_bus_client.py b/src/osm_ngsa/osm_mon/core/message_bus_client.py
new file mode 100644 (file)
index 0000000..2ae895c
--- /dev/null
@@ -0,0 +1,66 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from typing import Callable, List
+
+from osm_common import msgkafka, msglocal
+from osm_mon.core.config import Config
+
+
+class MessageBusClient:
+    def __init__(self, config: Config, loop=None):
+        if config.get("message", "driver") == "local":
+            self.msg_bus = msglocal.MsgLocal()
+        elif config.get("message", "driver") == "kafka":
+            self.msg_bus = msgkafka.MsgKafka()
+        else:
+            raise Exception(
+                "Unknown message bug driver {}".format(config.get("section", "driver"))
+            )
+        self.msg_bus.connect(config.get("message"))
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+
+    async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+        """
+        Retrieves messages continuously from bus and executes callback for each message consumed.
+        :param topics: List of message bus topics to consume from.
+        :param callback: Async callback function to be called for each message received.
+        :param kwargs: Keyword arguments to be passed to callback function.
+        :return: None
+        """
+        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+    async def aiowrite(self, topic: str, key: str, msg: dict):
+        """
+        Writes message to bus.
+        :param topic: Topic to write to.
+        :param key: Key to write to.
+        :param msg: Dictionary containing message to be written.
+        :return: None
+        """
+        await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+    async def aioread_once(self, topic: str):
+        """
+        Retrieves last message from bus.
+        :param topic: topic to retrieve message from.
+        :return: tuple(topic, key, message)
+        """
+        result = await self.msg_bus.aioread(topic, self.loop)
+        return result