Feature 10981: add Openstack metrics collector and scale-out/in DAGs for autoscaling
[osm/NG-SA.git] / src / osm_ngsa / dags / scaleout_vdu.py
diff --git a/src/osm_ngsa/dags/scaleout_vdu.py b/src/osm_ngsa/dags/scaleout_vdu.py
new file mode 100644 (file)
index 0000000..978ab3f
--- /dev/null
@@ -0,0 +1,210 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+# Module-level logger obtained under the "airflow.task" name (presumably so
+# that messages end up in the Airflow task log -- confirm against the
+# deployment's logging configuration).
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=15),
+    },
+    description="Webhook callback for scale-out alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def scaleout_vdu():
+    """Define the DAG that reacts to scale-out alarms received via webhook.
+
+    The DAG has no schedule (``schedule_interval=None``) and contains a
+    single task, ``main_task``, which processes the alarms found in the DAG
+    run configuration.
+    """
+
+    @task(task_id="main_task")
+    def main_task():
+        """Process every alarm in ``dag_run.conf["alerts"]``.
+
+        Alarms whose ``alertname`` label does not start with ``scaleout_``
+        (or that have no such label) are skipped. For the remaining ones:
+
+        * ``firing``: the matching alert rule is looked up in the common DB
+          and marked as ``alarm``. Unless a previous scale operation for the
+          same VNF member index is found inside the rule's cooldown
+          interval, a new ``scale`` nslcmop is stored and published on the
+          message bus (topic ``ns``, key ``scale``).
+        * ``resolved``: the matching alert rule, if any, is marked as ``ok``.
+
+        Raises:
+            KeyError: if an alarm, alert rule or VNFR lacks one of the keys
+                read below (e.g. ``status``, ``labels``, ``ns_id``).
+        """
+        logger.debug("Running main task...")
+        # Read input parameters
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("Scale-out alarm:")
+            status = alarm["status"]
+            labels = alarm["labels"]
+            logger.info(f"  status: {status}")
+            logger.info(f'  annotations: {alarm["annotations"]}')
+            logger.info(f'  startsAt: {alarm["startsAt"]}')
+            logger.info(f'  endsAt: {alarm["endsAt"]}')
+            logger.info(f"  labels: {labels}")
+            # A missing alertname label means "not a scale-out alert"; without
+            # the fallback, None.startswith would raise AttributeError.
+            alertname = labels.get("alertname") or ""
+            if not alertname.startswith("scaleout_"):
+                continue
+            # scaleout_vdu alert type
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = labels["ns_id"]
+            vdu_id = labels["vdu_id"]
+            vnf_member_index = labels["vnf_member_index"]
+
+            if status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id,
+                    vnf_member_index=vnf_member_index,
+                    vdu_id=vdu_id,
+                    vdu_name=None,
+                    action_type="scale_out",
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+                continue
+
+            if status != "firing":
+                # Any other status is ignored
+                continue
+
+            # Searching alerting rule in MongoDB
+            logger.info(
+                f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
+                f"vnf_member_index {vnf_member_index}, "
+                f"vdu_id {vdu_id}, "
+            )
+            alert = common_db.get_alert(
+                nsr_id=ns_id,
+                vnf_member_index=vnf_member_index,
+                vdu_id=vdu_id,
+                vdu_name=None,
+                action_type="scale_out",
+            )
+            if not alert:
+                logger.info("No alert rule was found")
+                continue
+
+            logger.info("Found an alert rule:")
+            logger.info(alert)
+            # Update alert status
+            common_db.update_alert_status(uuid=alert["uuid"], alarm_status="alarm")
+            # Get VNFR from MongoDB
+            vnfr = common_db.get_vnfr(nsr_id=ns_id, member_index=vnf_member_index)
+            logger.info(
+                f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+            )
+
+            # Taken unconditionally: it is needed both for the cooldown window
+            # and for the nslcmop timestamps, even when the alert rule
+            # defines no cooldown-time.
+            now = time.time()
+
+            # Check cooldown-time before scale-out
+            send_lcm = True
+            if "cooldown-time" in alert["action"]:
+                # cooldown-time is multiplied by 60 and used as seconds
+                cooldown_time = alert["action"]["cooldown-time"] * 60
+                since = now - cooldown_time
+                logger.info(
+                    f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+                )
+                nslcmops = common_db.get_nslcmop(
+                    nsr_id=ns_id, operation_type="scale", since=since
+                )
+                # First scale operation that targeted this VNF member index
+                op = next(
+                    (
+                        sub
+                        for sub in nslcmops
+                        if sub["operationParams"]
+                        .get("scaleVnfData", {})
+                        .get("scaleByStepData", {})
+                        .get("member-vnf-index")
+                        == vnf_member_index
+                    ),
+                    None,
+                )
+                if op:
+                    logger.info(
+                        f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
+                    )
+                    send_lcm = False
+
+            if not send_lcm:
+                continue
+
+            # Save nslcmop object in MongoDB
+            msg_bus = MessageBusClient(config)
+            loop = asyncio.get_event_loop()
+            _id = str(uuid.uuid4())
+            projects_read = vnfr["_admin"]["projects_read"]
+            projects_write = vnfr["_admin"]["projects_write"]
+            scaling_group = alert["action"]["scaling-group"]
+            params = {
+                "scaleType": "SCALE_VNF",
+                "scaleVnfData": {
+                    "scaleVnfType": "SCALE_OUT",
+                    "scaleByStepData": {
+                        "scaling-group-descriptor": scaling_group,
+                        "member-vnf-index": vnf_member_index,
+                    },
+                },
+                "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+            }
+            nslcmop = {
+                "id": _id,
+                "_id": _id,
+                "operationState": "PROCESSING",
+                "statusEnteredTime": now,
+                "nsInstanceId": ns_id,
+                "lcmOperationType": "scale",
+                "startTime": now,
+                "isAutomaticInvocation": True,
+                "operationParams": params,
+                "isCancelPending": False,
+                "links": {
+                    "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                    "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                },
+                "_admin": {
+                    "projects_read": projects_read,
+                    "projects_write": projects_write,
+                },
+            }
+            common_db.create_nslcmop(nslcmop)
+            # Send Kafka message to LCM
+            logger.info("Sending scale-out action message:")
+            logger.info(nslcmop)
+            loop.run_until_complete(msg_bus.aiowrite("ns", "scale", nslcmop))
+
+    main_task()
+
+
+# Call the @dag-decorated function and bind the result to a module-level name
+# (presumably so Airflow can discover the DAG when it parses this file).
+dag = scaleout_vdu()