From: aguilard Date: Mon, 27 Mar 2023 11:51:10 +0000 (+0000) Subject: Feature 10981: added autohealing DAG and updated requirements X-Git-Tag: release-v14.0-start~15 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F98%2F13098%2F10;p=osm%2FNG-SA.git Feature 10981: added autohealing DAG and updated requirements Change-Id: Ib1ed56c220969d54480ddd2382beae03e536b72b Signed-off-by: aguilard --- diff --git a/requirements-dist.txt b/requirements-dist.txt index cd29f8a..7b15e9f 100644 --- a/requirements-dist.txt +++ b/requirements-dist.txt @@ -22,7 +22,7 @@ stdeb==0.10.0 # via -r requirements-dist.in tomli==2.0.1 # via setuptools-scm -typing-extensions==4.4.0 +typing-extensions==4.5.0 # via setuptools-scm # The following packages are considered to be unsafe in a requirements file: diff --git a/requirements.in b/requirements.in index 6d8f0a8..6779aef 100644 --- a/requirements.in +++ b/requirements.in @@ -18,9 +18,11 @@ azure-common azure-identity azure-mgmt-compute +gnocchiclient google-api-python-client google-auth prometheus-client +python-ceilometerclient python-keystoneclient python-novaclient pyyaml==5.4.1 diff --git a/requirements.txt b/requirements.txt index 59f9d17..fc34d3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,11 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ####################################################################################### +attrs==22.2.0 + # via cmd2 +autopage==0.5.1 + # via cliff azure-common==1.1.28 # via # -r requirements.in # azure-mgmt-compute -azure-core==1.26.2 +azure-core==1.26.4 # via # azure-identity # azure-mgmt-core @@ -27,7 +31,7 @@ azure-identity==1.12.0 # via -r requirements.in azure-mgmt-compute==29.1.0 # via -r requirements.in -azure-mgmt-core==1.3.2 +azure-mgmt-core==1.4.0 # via azure-mgmt-compute cachetools==5.3.0 # via google-auth @@ -37,23 +41,32 @@ certifi==2022.12.7 # requests cffi==1.15.1 # via cryptography -charset-normalizer==3.0.1 +charset-normalizer==3.1.0 # via requests -cryptography==39.0.0 +cliff==4.2.0 + # via gnocchiclient +cmd2==2.4.3 + # via cliff +cryptography==40.0.1 # via # azure-identity # msal # pyjwt debtcollector==2.5.0 # via + # gnocchiclient # oslo-config # oslo-utils # python-keystoneclient +futurist==2.4.1 + # via gnocchiclient +gnocchiclient==7.0.8 + # via -r requirements.in google-api-core==2.11.0 # via google-api-python-client -google-api-python-client==2.74.0 +google-api-python-client==2.84.0 # via -r requirements.in -google-auth==2.16.0 +google-auth==2.17.2 # via # -r requirements.in # google-api-core @@ -61,32 +74,38 @@ google-auth==2.16.0 # google-auth-httplib2 google-auth-httplib2==0.1.0 # via google-api-python-client -googleapis-common-protos==1.58.0 +googleapis-common-protos==1.59.0 # via google-api-core -httplib2==0.21.0 +httplib2==0.22.0 # via # google-api-python-client # google-auth-httplib2 idna==3.4 # via requests +importlib-metadata==6.3.0 + # via cliff iso8601==1.1.0 # via + # gnocchiclient # keystoneauth1 # oslo-utils + # python-ceilometerclient # python-novaclient isodate==0.6.1 # via msrest -keystoneauth1==5.1.1 +keystoneauth1==5.1.2 # via + # gnocchiclient + # python-ceilometerclient # python-keystoneclient # python-novaclient -msal==1.20.0 +msal==1.21.0 # via # azure-identity # msal-extensions msal-extensions==1.0.0 # via azure-identity -msgpack==1.0.4 +msgpack==1.0.5 # via oslo-serialization msrest==0.7.1 # via azure-mgmt-compute @@ -100,21 +119,24 @@ oauthlib==3.2.2 # via requests-oauthlib os-service-types==1.7.0 # via keystoneauth1 -oslo-config==9.1.0 +oslo-config==9.1.1 # via python-keystoneclient -oslo-i18n==5.1.0 +oslo-i18n==6.0.0 # via # oslo-config # oslo-utils + # python-ceilometerclient # python-keystoneclient # python-novaclient -oslo-serialization==5.0.0 +oslo-serialization==5.1.1 # via + # python-ceilometerclient # python-keystoneclient # python-novaclient oslo-utils==6.1.0 # via # oslo-serialization + # python-ceilometerclient # python-keystoneclient # python-novaclient packaging==23.0 @@ -127,16 +149,20 @@ pbr==5.11.1 # os-service-types # oslo-i18n # oslo-serialization + # python-ceilometerclient # python-keystoneclient # python-novaclient # stevedore portalocker==2.7.0 # via msal-extensions -prettytable==3.6.0 - # via python-novaclient +prettytable==0.7.2 + # via + # cliff + # python-ceilometerclient + # python-novaclient prometheus-client==0.16.0 # via -r requirements.in -protobuf==4.21.12 +protobuf==4.22.1 # via # google-api-core # googleapis-common-protos @@ -154,17 +180,24 @@ pyparsing==3.0.9 # via # httplib2 # oslo-utils -python-keystoneclient==5.0.1 +pyperclip==1.8.2 + # via cmd2 +python-ceilometerclient==2.9.0 + # via -r requirements.in +python-dateutil==2.8.2 + # via gnocchiclient +python-keystoneclient==5.1.0 # via -r requirements.in -python-novaclient==18.2.0 +python-novaclient==18.3.0 # via -r requirements.in -pytz==2022.7.1 +pytz==2023.3 # via # oslo-serialization # oslo-utils pyyaml==5.4.1 # via # -r requirements.in + # cliff # oslo-config requests==2.28.2 # via @@ -174,6 +207,7 @@ requests==2.28.2 # msal # msrest # oslo-config + # python-ceilometerclient # python-keystoneclient # requests-oauthlib requests-oauthlib==1.3.1 @@ -186,24 +220,33 @@ six==1.16.0 # via # azure-core # azure-identity + # gnocchiclient # google-auth # google-auth-httplib2 # isodate # keystoneauth1 + # python-ceilometerclient + # python-dateutil # python-keystoneclient -stevedore==4.1.1 +stevedore==5.0.0 # via + # cliff # keystoneauth1 # oslo-config + # python-ceilometerclient # python-keystoneclient # python-novaclient -typing-extensions==4.4.0 +typing-extensions==4.5.0 # via azure-core +ujson==5.7.0 + # via gnocchiclient uritemplate==4.1.1 # via google-api-python-client -urllib3==1.26.14 +urllib3==1.26.15 # via requests wcwidth==0.2.6 - # via prettytable -wrapt==1.14.1 + # via cmd2 +wrapt==1.15.0 # via debtcollector +zipp==3.15.0 + # via importlib-metadata diff --git a/src/osm_ngsa/dags/alert_vdu.py b/src/osm_ngsa/dags/alert_vdu.py new file mode 100644 index 0000000..390460a --- /dev/null +++ b/src/osm_ngsa/dags/alert_vdu.py @@ -0,0 +1,179 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +import asyncio +from datetime import datetime, timedelta +import logging +import time +import uuid + +from airflow.decorators import dag, task +from airflow.operators.python import get_current_context +from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config +from osm_mon.core.message_bus_client import MessageBusClient + +# Logging +logger = logging.getLogger("airflow.task") + + +@dag( + catchup=False, + default_args={ + "depends_on_past": False, + "retries": 1, + "retry_delay": timedelta(seconds=5), + }, + description="Webhook callback for VDU alarm from Prometheus AlertManager", + is_paused_upon_creation=False, + schedule_interval=None, + start_date=datetime(2022, 1, 1), + tags=["osm", "webhook"], +) +def alert_vdu(): + @task(task_id="main_task") + def main_task(): + logger.debug("Running main task...") + context = get_current_context() + conf = context["dag_run"].conf + for alarm in conf["alerts"]: + logger.info("VDU alarm:") + status = alarm["status"] + logger.info(f" status: {status}") + logger.info(f' annotations: {alarm["annotations"]}') + logger.info(f' startsAt: {alarm["startsAt"]}') + logger.info(f' endsAt: {alarm["endsAt"]}') + logger.info(f' labels: {alarm["labels"]}') + # vdu_down alert type + if alarm["labels"]["alertname"] != "vdu_down": + continue + config = Config() + common_db = CommonDbClient(config) + ns_id = alarm["labels"]["ns_id"] + vdu_name = alarm["labels"]["vdu_name"] + vnf_member_index = alarm["labels"]["vnf_member_index"] + vm_id = alarm["labels"]["vm_id"] + if status == "firing": + # Searching alerting rule in MongoDB + logger.info( + f"Searching alert rule in MongoDB: ns_id {ns_id}, " + f"vnf_member_index {vnf_member_index}, " + f"vdu_name {vdu_name}, " + f"vm_id {vm_id}" + ) + alert = common_db.get_alert( + nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name + ) + if alert and alert["action_type"] == "healing": + logger.info("Found an alert rule:") + logger.info(alert) + # Update alert status + common_db.update_alert_status( + uuid=alert["uuid"], alarm_status="alarm" + ) + # Get VNFR from MongoDB + vnfr = common_db.get_vnfr( + nsr_id=ns_id, member_index=vnf_member_index + ) + logger.info( + f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}" + ) + count_index = None + for vdu in vnfr.get("vdur", []): + if vdu["vim-id"] == vm_id: + count_index = vdu["count-index"] + break + if count_index is None: + logger.error(f"VDU {vm_id} not found in VNFR") + break + # Auto-healing type rule + vnf_id = alarm["labels"]["vnf_id"] + msg_bus = MessageBusClient(config) + loop = asyncio.get_event_loop() + _id = str(uuid.uuid4()) + now = time.time() + vdu_id = alert["action"]["vdu-id"] + day1 = alert["action"]["day1"] + projects_read = vnfr["_admin"]["projects_read"] + projects_write = vnfr["_admin"]["projects_write"] + params = { + "lcmOperationType": "heal", + "nsInstanceId": ns_id, + "healVnfData": [ + { + "vnfInstanceId": vnf_id, + "cause": "default", + "additionalParams": { + "run-day1": day1, + "vdu": [ + { + "run-day1": day1, + "count-index": count_index, + "vdu-id": vdu_id, + } + ], + }, + } + ], + } + nslcmop = { + "id": _id, + "_id": _id, + "operationState": "PROCESSING", + "statusEnteredTime": now, + "nsInstanceId": ns_id, + "member-vnf-index": vnf_member_index, + "lcmOperationType": "heal", + "startTime": now, + "location": "default", + "isAutomaticInvocation": True, + "operationParams": params, + "isCancelPending": False, + "links": { + "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id, + "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id, + }, + "_admin": { + "projects_read": projects_read, + "projects_write": projects_write, + }, + } + common_db.create_nslcmop(nslcmop) + logger.info("Sending heal action message:") + logger.info(nslcmop) + loop.run_until_complete(msg_bus.aiowrite("ns", "heal", nslcmop)) + else: + logger.info("No alert rule was found") + elif status == "resolved": + # Searching alerting rule in MongoDB + logger.info( + f"Searching alert rule in MongoDB: ns_id {ns_id}, " + f"vnf_member_index {vnf_member_index}, " + f"vdu_name {vdu_name}, " + f"vm_id {vm_id}" + ) + alert = common_db.get_alert( + nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name + ) + if alert: + logger.info("Found an alert rule, updating status") + # Update alert status + common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok") + + main_task() + + +dag = alert_vdu() diff --git a/src/osm_ngsa/osm_mon/core/common_db.py b/src/osm_ngsa/osm_mon/core/common_db.py index 7c579c3..93254b1 100644 --- a/src/osm_ngsa/osm_mon/core/common_db.py +++ b/src/osm_ngsa/osm_mon/core/common_db.py @@ -30,9 +30,9 @@ class CommonDbClient: ) self.common_db.db_connect(config.get("database")) - def get_vnfr(self, nsr_id: str, member_index: int): + def get_vnfr(self, nsr_id: str, member_index: str): vnfr = self.common_db.get_one( - "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)} + "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index} ) return vnfr @@ -88,3 +88,23 @@ class CommonDbClient: vim_account_id, ) return vim_account + + def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str): + alert = self.common_db.get_one( + "alerts", + { + "tags.ns_id": nsr_id, + "tags.vnf_member_index": vnf_member_index, + "tags.vdu_name": vdu_name, + }, + ) + return alert + + def update_alert_status(self, uuid: str, alarm_status: str): + modified_count = self.common_db.set_one( + "alerts", {"uuid": uuid}, {"alarm_status": alarm_status} + ) + return modified_count + + def create_nslcmop(self, nslcmop: dict): + self.common_db.create("nslcmops", nslcmop) diff --git a/src/osm_ngsa/osm_mon/core/config.yaml b/src/osm_ngsa/osm_mon/core/config.yaml index 197c818..4bff5a4 100644 --- a/src/osm_ngsa/osm_mon/core/config.yaml +++ b/src/osm_ngsa/osm_mon/core/config.yaml @@ -25,3 +25,7 @@ database: name: osm commonkey: gj7LmbCexbmII7memwbGRRdfbYuT3nvy +message: + driver: kafka + host: kafka + port: 9092 diff --git a/src/osm_ngsa/osm_mon/core/message_bus_client.py b/src/osm_ngsa/osm_mon/core/message_bus_client.py new file mode 100644 index 0000000..2ae895c --- /dev/null +++ b/src/osm_ngsa/osm_mon/core/message_bus_client.py @@ -0,0 +1,66 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +import asyncio +from typing import Callable, List + +from osm_common import msgkafka, msglocal +from osm_mon.core.config import Config + + +class MessageBusClient: + def __init__(self, config: Config, loop=None): + if config.get("message", "driver") == "local": + self.msg_bus = msglocal.MsgLocal() + elif config.get("message", "driver") == "kafka": + self.msg_bus = msgkafka.MsgKafka() + else: + raise Exception( + "Unknown message bug driver {}".format(config.get("section", "driver")) + ) + self.msg_bus.connect(config.get("message")) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + + async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): + """ + Retrieves messages continuously from bus and executes callback for each message consumed. + :param topics: List of message bus topics to consume from. + :param callback: Async callback function to be called for each message received. + :param kwargs: Keyword arguments to be passed to callback function. + :return: None + """ + await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) + + async def aiowrite(self, topic: str, key: str, msg: dict): + """ + Writes message to bus. + :param topic: Topic to write to. + :param key: Key to write to. + :param msg: Dictionary containing message to be written. + :return: None + """ + await self.msg_bus.aiowrite(topic, key, msg, self.loop) + + async def aioread_once(self, topic: str): + """ + Retrieves last message from bus. + :param topic: topic to retrieve message from. + :return: tuple(topic, key, message) + """ + result = await self.msg_bus.aioread(topic, self.loop) + return result