Feature 10981: added autohealing DAG and updated requirements
Change-Id: Ib1ed56c220969d54480ddd2382beae03e536b72b
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
diff --git a/requirements-dist.txt b/requirements-dist.txt
index cd29f8a..7b15e9f 100644
--- a/requirements-dist.txt
+++ b/requirements-dist.txt
@@ -22,7 +22,7 @@
# via -r requirements-dist.in
tomli==2.0.1
# via setuptools-scm
-typing-extensions==4.4.0
+typing-extensions==4.5.0
# via setuptools-scm
# The following packages are considered to be unsafe in a requirements file:
diff --git a/requirements.in b/requirements.in
index 6d8f0a8..6779aef 100644
--- a/requirements.in
+++ b/requirements.in
@@ -18,9 +18,11 @@
azure-common
azure-identity
azure-mgmt-compute
+gnocchiclient
google-api-python-client
google-auth
prometheus-client
+python-ceilometerclient
python-keystoneclient
python-novaclient
pyyaml==5.4.1
diff --git a/requirements.txt b/requirements.txt
index 59f9d17..fc34d3f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,11 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
+attrs==22.2.0
+ # via cmd2
+autopage==0.5.1
+ # via cliff
azure-common==1.1.28
# via
# -r requirements.in
# azure-mgmt-compute
-azure-core==1.26.2
+azure-core==1.26.4
# via
# azure-identity
# azure-mgmt-core
@@ -27,7 +31,7 @@
# via -r requirements.in
azure-mgmt-compute==29.1.0
# via -r requirements.in
-azure-mgmt-core==1.3.2
+azure-mgmt-core==1.4.0
# via azure-mgmt-compute
cachetools==5.3.0
# via google-auth
@@ -37,23 +41,32 @@
# requests
cffi==1.15.1
# via cryptography
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via requests
-cryptography==39.0.0
+cliff==4.2.0
+ # via gnocchiclient
+cmd2==2.4.3
+ # via cliff
+cryptography==40.0.1
# via
# azure-identity
# msal
# pyjwt
debtcollector==2.5.0
# via
+ # gnocchiclient
# oslo-config
# oslo-utils
# python-keystoneclient
+futurist==2.4.1
+ # via gnocchiclient
+gnocchiclient==7.0.8
+ # via -r requirements.in
google-api-core==2.11.0
# via google-api-python-client
-google-api-python-client==2.74.0
+google-api-python-client==2.84.0
# via -r requirements.in
-google-auth==2.16.0
+google-auth==2.17.2
# via
# -r requirements.in
# google-api-core
@@ -61,32 +74,38 @@
# google-auth-httplib2
google-auth-httplib2==0.1.0
# via google-api-python-client
-googleapis-common-protos==1.58.0
+googleapis-common-protos==1.59.0
# via google-api-core
-httplib2==0.21.0
+httplib2==0.22.0
# via
# google-api-python-client
# google-auth-httplib2
idna==3.4
# via requests
+importlib-metadata==6.3.0
+ # via cliff
iso8601==1.1.0
# via
+ # gnocchiclient
# keystoneauth1
# oslo-utils
+ # python-ceilometerclient
# python-novaclient
isodate==0.6.1
# via msrest
-keystoneauth1==5.1.1
+keystoneauth1==5.1.2
# via
+ # gnocchiclient
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
-msal==1.20.0
+msal==1.21.0
# via
# azure-identity
# msal-extensions
msal-extensions==1.0.0
# via azure-identity
-msgpack==1.0.4
+msgpack==1.0.5
# via oslo-serialization
msrest==0.7.1
# via azure-mgmt-compute
@@ -100,21 +119,24 @@
# via requests-oauthlib
os-service-types==1.7.0
# via keystoneauth1
-oslo-config==9.1.0
+oslo-config==9.1.1
# via python-keystoneclient
-oslo-i18n==5.1.0
+oslo-i18n==6.0.0
# via
# oslo-config
# oslo-utils
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
-oslo-serialization==5.0.0
+oslo-serialization==5.1.1
# via
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
oslo-utils==6.1.0
# via
# oslo-serialization
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
packaging==23.0
@@ -127,16 +149,20 @@
# os-service-types
# oslo-i18n
# oslo-serialization
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
# stevedore
portalocker==2.7.0
# via msal-extensions
-prettytable==3.6.0
- # via python-novaclient
+prettytable==0.7.2
+ # via
+ # cliff
+ # python-ceilometerclient
+ # python-novaclient
prometheus-client==0.16.0
# via -r requirements.in
-protobuf==4.21.12
+protobuf==4.22.1
# via
# google-api-core
# googleapis-common-protos
@@ -154,17 +180,24 @@
# via
# httplib2
# oslo-utils
-python-keystoneclient==5.0.1
+pyperclip==1.8.2
+ # via cmd2
+python-ceilometerclient==2.9.0
# via -r requirements.in
-python-novaclient==18.2.0
+python-dateutil==2.8.2
+ # via gnocchiclient
+python-keystoneclient==5.1.0
# via -r requirements.in
-pytz==2022.7.1
+python-novaclient==18.3.0
+ # via -r requirements.in
+pytz==2023.3
# via
# oslo-serialization
# oslo-utils
pyyaml==5.4.1
# via
# -r requirements.in
+ # cliff
# oslo-config
requests==2.28.2
# via
@@ -174,6 +207,7 @@
# msal
# msrest
# oslo-config
+ # python-ceilometerclient
# python-keystoneclient
# requests-oauthlib
requests-oauthlib==1.3.1
@@ -186,24 +220,33 @@
# via
# azure-core
# azure-identity
+ # gnocchiclient
# google-auth
# google-auth-httplib2
# isodate
# keystoneauth1
+ # python-ceilometerclient
+ # python-dateutil
# python-keystoneclient
-stevedore==4.1.1
+stevedore==5.0.0
# via
+ # cliff
# keystoneauth1
# oslo-config
+ # python-ceilometerclient
# python-keystoneclient
# python-novaclient
-typing-extensions==4.4.0
+typing-extensions==4.5.0
# via azure-core
+ujson==5.7.0
+ # via gnocchiclient
uritemplate==4.1.1
# via google-api-python-client
-urllib3==1.26.14
+urllib3==1.26.15
# via requests
wcwidth==0.2.6
- # via prettytable
-wrapt==1.14.1
+ # via cmd2
+wrapt==1.15.0
# via debtcollector
+zipp==3.15.0
+ # via importlib-metadata
diff --git a/src/osm_ngsa/dags/alert_vdu.py b/src/osm_ngsa/dags/alert_vdu.py
new file mode 100644
index 0000000..390460a
--- /dev/null
+++ b/src/osm_ngsa/dags/alert_vdu.py
@@ -0,0 +1,179 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": timedelta(seconds=5),
+ },
+ description="Webhook callback for VDU alarm from Prometheus AlertManager",
+ is_paused_upon_creation=False,
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "webhook"],
+)
+def alert_vdu():
+ @task(task_id="main_task")
+ def main_task():
+ logger.debug("Running main task...")
+ context = get_current_context()
+ conf = context["dag_run"].conf
+ for alarm in conf["alerts"]:
+ logger.info("VDU alarm:")
+ status = alarm["status"]
+ logger.info(f" status: {status}")
+ logger.info(f' annotations: {alarm["annotations"]}')
+ logger.info(f' startsAt: {alarm["startsAt"]}')
+ logger.info(f' endsAt: {alarm["endsAt"]}')
+ logger.info(f' labels: {alarm["labels"]}')
+ # vdu_down alert type
+ if alarm["labels"]["alertname"] != "vdu_down":
+ continue
+ config = Config()
+ common_db = CommonDbClient(config)
+ ns_id = alarm["labels"]["ns_id"]
+ vdu_name = alarm["labels"]["vdu_name"]
+ vnf_member_index = alarm["labels"]["vnf_member_index"]
+ vm_id = alarm["labels"]["vm_id"]
+ if status == "firing":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_name {vdu_name}, "
+ f"vm_id {vm_id}"
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ )
+ if alert and alert["action_type"] == "healing":
+ logger.info("Found an alert rule:")
+ logger.info(alert)
+ # Update alert status
+ common_db.update_alert_status(
+ uuid=alert["uuid"], alarm_status="alarm"
+ )
+ # Get VNFR from MongoDB
+ vnfr = common_db.get_vnfr(
+ nsr_id=ns_id, member_index=vnf_member_index
+ )
+ logger.info(
+ f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+ )
+ count_index = None
+ for vdu in vnfr.get("vdur", []):
+ if vdu["vim-id"] == vm_id:
+ count_index = vdu["count-index"]
+ break
+ if count_index is None:
+ logger.error(f"VDU {vm_id} not found in VNFR")
+ break
+ # Auto-healing type rule
+ vnf_id = alarm["labels"]["vnf_id"]
+ msg_bus = MessageBusClient(config)
+ loop = asyncio.get_event_loop()
+ _id = str(uuid.uuid4())
+ now = time.time()
+ vdu_id = alert["action"]["vdu-id"]
+ day1 = alert["action"]["day1"]
+ projects_read = vnfr["_admin"]["projects_read"]
+ projects_write = vnfr["_admin"]["projects_write"]
+ params = {
+ "lcmOperationType": "heal",
+ "nsInstanceId": ns_id,
+ "healVnfData": [
+ {
+ "vnfInstanceId": vnf_id,
+ "cause": "default",
+ "additionalParams": {
+ "run-day1": day1,
+ "vdu": [
+ {
+ "run-day1": day1,
+ "count-index": count_index,
+ "vdu-id": vdu_id,
+ }
+ ],
+ },
+ }
+ ],
+ }
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": ns_id,
+ "member-vnf-index": vnf_member_index,
+ "lcmOperationType": "heal",
+ "startTime": now,
+ "location": "default",
+ "isAutomaticInvocation": True,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+ },
+ "_admin": {
+ "projects_read": projects_read,
+ "projects_write": projects_write,
+ },
+ }
+ common_db.create_nslcmop(nslcmop)
+ logger.info("Sending heal action message:")
+ logger.info(nslcmop)
+ loop.run_until_complete(msg_bus.aiowrite("ns", "heal", nslcmop))
+ else:
+ logger.info("No alert rule was found")
+ elif status == "resolved":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_name {vdu_name}, "
+ f"vm_id {vm_id}"
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ )
+ if alert:
+ logger.info("Found an alert rule, updating status")
+ # Update alert status
+ common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+ main_task()
+
+
+dag = alert_vdu()
diff --git a/src/osm_ngsa/osm_mon/core/common_db.py b/src/osm_ngsa/osm_mon/core/common_db.py
index 7c579c3..93254b1 100644
--- a/src/osm_ngsa/osm_mon/core/common_db.py
+++ b/src/osm_ngsa/osm_mon/core/common_db.py
@@ -30,9 +30,9 @@
)
self.common_db.db_connect(config.get("database"))
- def get_vnfr(self, nsr_id: str, member_index: int):
+ def get_vnfr(self, nsr_id: str, member_index: str):
vnfr = self.common_db.get_one(
- "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}
+ "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index}
)
return vnfr
@@ -88,3 +88,23 @@
vim_account_id,
)
return vim_account
+
+ def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+ alert = self.common_db.get_one(
+ "alerts",
+ {
+ "tags.ns_id": nsr_id,
+ "tags.vnf_member_index": vnf_member_index,
+ "tags.vdu_name": vdu_name,
+ },
+ )
+ return alert
+
+ def update_alert_status(self, uuid: str, alarm_status: str):
+ modified_count = self.common_db.set_one(
+ "alerts", {"uuid": uuid}, {"alarm_status": alarm_status}
+ )
+ return modified_count
+
+ def create_nslcmop(self, nslcmop: dict):
+ self.common_db.create("nslcmops", nslcmop)
diff --git a/src/osm_ngsa/osm_mon/core/config.yaml b/src/osm_ngsa/osm_mon/core/config.yaml
index 197c818..4bff5a4 100644
--- a/src/osm_ngsa/osm_mon/core/config.yaml
+++ b/src/osm_ngsa/osm_mon/core/config.yaml
@@ -25,3 +25,7 @@
name: osm
commonkey: gj7LmbCexbmII7memwbGRRdfbYuT3nvy
+message:
+ driver: kafka
+ host: kafka
+ port: 9092
diff --git a/src/osm_ngsa/osm_mon/core/message_bus_client.py b/src/osm_ngsa/osm_mon/core/message_bus_client.py
new file mode 100644
index 0000000..2ae895c
--- /dev/null
+++ b/src/osm_ngsa/osm_mon/core/message_bus_client.py
@@ -0,0 +1,66 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from typing import Callable, List
+
+from osm_common import msgkafka, msglocal
+from osm_mon.core.config import Config
+
+
+class MessageBusClient:
+ def __init__(self, config: Config, loop=None):
+ if config.get("message", "driver") == "local":
+ self.msg_bus = msglocal.MsgLocal()
+ elif config.get("message", "driver") == "kafka":
+ self.msg_bus = msgkafka.MsgKafka()
+ else:
+ raise Exception(
+ "Unknown message bug driver {}".format(config.get("section", "driver"))
+ )
+ self.msg_bus.connect(config.get("message"))
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+
+ async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+ """
+ Retrieves messages continuously from bus and executes callback for each message consumed.
+ :param topics: List of message bus topics to consume from.
+ :param callback: Async callback function to be called for each message received.
+ :param kwargs: Keyword arguments to be passed to callback function.
+ :return: None
+ """
+ await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+ async def aiowrite(self, topic: str, key: str, msg: dict):
+ """
+ Writes message to bus.
+ :param topic: Topic to write to.
+ :param key: Key to write to.
+ :param msg: Dictionary containing message to be written.
+ :return: None
+ """
+ await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+ async def aioread_once(self, topic: str):
+ """
+ Retrieves last message from bus.
+ :param topic: topic to retrieve message from.
+ :return: tuple(topic, key, message)
+ """
+ result = await self.msg_bus.aioread(topic, self.loop)
+ return result