Bug Fix - 2305: Automated scaling of Vnf is not happening through metrics collected...
[osm/POL.git] / osm_policy_module / common / mon_client.py
index b8c3779..e9216aa 100644 (file)
@@ -21,7 +21,6 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import asyncio
 import json
 import logging
 import random
@@ -36,13 +35,10 @@ log = logging.getLogger(__name__)
 
 
 class MonClient:
-    def __init__(self, config: Config, loop=None):
+    def __init__(self, config: Config):
         self.kafka_server = "{}:{}".format(
             config.get("message", "host"), config.get("message", "port")
         )
-        if not loop:
-            loop = asyncio.get_event_loop()
-        self.loop = loop
 
     async def create_alarm(
         self,
@@ -53,7 +49,9 @@ class MonClient:
         threshold: int,
         operation: str,
         statistic: str = "AVERAGE",
-        action: str = '',
+        action: str = "",
+        vnfr: object = None,
+        vnfd: object = None,
     ):
         cor_id = random.randint(1, 10e7)
         msg = self._build_create_alarm_payload(
@@ -66,10 +64,11 @@ class MonClient:
             statistic,
             operation,
             action,
+            vnfr,
+            vnfd,
         )
         log.debug("Sending create_alarm_request %s", msg)
         producer = AIOKafkaProducer(
-            loop=self.loop,
             bootstrap_servers=self.kafka_server,
             key_serializer=str.encode,
             value_serializer=str.encode,
@@ -84,7 +83,6 @@ class MonClient:
         log.debug("Waiting for create_alarm_response...")
         consumer = AIOKafkaConsumer(
             "alarm_response_" + str(cor_id),
-            loop=self.loop,
             bootstrap_servers=self.kafka_server,
             key_deserializer=bytes.decode,
             value_deserializer=bytes.decode,
@@ -119,7 +117,6 @@ class MonClient:
         )
         log.debug("Sending delete_alarm_request %s", msg)
         producer = AIOKafkaProducer(
-            loop=self.loop,
             bootstrap_servers=self.kafka_server,
             key_serializer=str.encode,
             value_serializer=str.encode,
@@ -134,7 +131,6 @@ class MonClient:
         log.debug("Waiting for delete_alarm_response...")
         consumer = AIOKafkaConsumer(
             "alarm_response_" + str(cor_id),
-            loop=self.loop,
             bootstrap_servers=self.kafka_server,
             key_deserializer=bytes.decode,
             value_deserializer=bytes.decode,
@@ -173,8 +169,26 @@ class MonClient:
         statistic: str,
         operation: str,
         action: str,
+        vnfr=None,
+        vnfd=None,
     ):
-
+        tags = {
+            "ns_id": ns_id,
+            "vdu_name": vdu_name,
+            "vnf_member_index": vnf_member_index,
+        }
+        if vnfr and vnfd:
+            # TODO: Change for multiple DF support
+            df = vnfd.get("df", [{}])[0]
+            exporters_endpoints = df.get("exporters-endpoints", {})
+            metric_port = exporters_endpoints.get("metric-port", 9100)
+            if metric_name.startswith("kpi_"):
+                metric_name = metric_name.replace("kpi_", "")
+                metric_name = metric_name.strip()
+                for vdu in vnfr["vdur"]:
+                    if vdu["name"] == vdu_name:
+                        vdu_ip = vdu["ip-address"]
+                        tags = {"instance": vdu_ip + ":" + str(metric_port)}
         alarm_create_request = {
             "correlation_id": cor_id,
             "alarm_name": "osm_alarm_{}_{}_{}_{}".format(
@@ -186,11 +200,7 @@ class MonClient:
             "threshold_value": threshold,
             "statistic": statistic,
             "action": action,
-            "tags": {
-                "ns_id": ns_id,
-                "vdu_name": vdu_name,
-                "vnf_member_index": vnf_member_index,
-            },
+            "tags": tags,
         }
         msg = {
             "alarm_create_request": alarm_create_request,