Fixes discrepancies with IM related to monitoring params
[osm/POL.git] / osm_policy_module / core / agent.py
index ba35391..24f1004 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import datetime
 import json
 import logging
-import threading
 from json import JSONDecodeError
 
 import yaml
-from kafka import KafkaConsumer
+from aiokafka import AIOKafkaConsumer
 
-from osm_policy_module.common.db_client import DbClient
+from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.lcm_client import LcmClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
@@ -43,26 +43,38 @@ ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
 
 
 class PolicyModuleAgent:
-    def __init__(self):
+    def __init__(self, loop=None):
         cfg = Config.instance()
-        self.db_client = DbClient()
-        self.mon_client = MonClient()
-        self.lcm_client = LcmClient()
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+        self.db_client = CommonDbClient()
+        self.mon_client = MonClient(loop=self.loop)
+        self.lcm_client = LcmClient(loop=self.loop)
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
 
     def run(self):
-        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
-                                 key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode,
-                                 group_id='pol-consumer')
-        consumer.subscribe(["ns", "alarm_response"])
+        self.loop.run_until_complete(self.start())
 
-        for message in consumer:
-            t = threading.Thread(target=self._process_msg, args=(message.topic, message.key, message.value,))
-            t.start()
+    async def start(self):
+        consumer = AIOKafkaConsumer(
+            "ns",
+            "alarm_response",
+            loop=self.loop,
+            bootstrap_servers=self.kafka_server,
+            group_id="pol-consumer",
+            key_deserializer=bytes.decode,
+            value_deserializer=bytes.decode,
+        )
+        await consumer.start()
+        try:
+            async for msg in consumer:
+                await self._process_msg(msg.topic, msg.key, msg.value)
+        finally:
+            await consumer.stop()
 
-    def _process_msg(self, topic, key, msg):
+    async def _process_msg(self, topic, key, msg):
         log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
         try:
             if key in ALLOWED_KAFKA_KEYS:
@@ -72,16 +84,16 @@ class PolicyModuleAgent:
                     content = yaml.safe_load(msg)
 
                 if key == 'instantiated' or key == 'scaled':
-                    self._handle_instantiated_or_scaled(content)
+                    await self._handle_instantiated_or_scaled(content)
 
                 if key == 'notify_alarm':
-                    self._handle_alarm_notification(content)
+                    await self._handle_alarm_notification(content)
             else:
                 log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
         except Exception:
             log.exception("Error consuming message: ")
 
-    def _handle_alarm_notification(self, content):
+    async def _handle_alarm_notification(self, content):
         log.debug("_handle_alarm_notification: %s", content)
         alarm_id = content['notify_details']['alarm_uuid']
         metric_name = content['notify_details']['metric_name']
@@ -109,30 +121,30 @@ class PolicyModuleAgent:
                 log.info("Time between last scale and now is less than cooldown time. Skipping.")
                 return
             log.info("Sending scaling action message for ns: %s", alarm_id)
-            self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
-                                  alarm.scaling_criteria.scaling_policy.scaling_group.name,
-                                  alarm.vnf_member_index,
-                                  alarm.action)
+            await self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+                                        alarm.scaling_criteria.scaling_policy.scaling_group.name,
+                                        alarm.vnf_member_index,
+                                        alarm.action)
             alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
             alarm.scaling_criteria.scaling_policy.save()
         except ScalingAlarm.DoesNotExist:
             log.info("There is no action configured for alarm %s.", alarm_id)
 
-    def _handle_instantiated_or_scaled(self, content):
+    async def _handle_instantiated_or_scaled(self, content):
         log.debug("_handle_instantiated_or_scaled: %s", content)
         nslcmop_id = content['nslcmop_id']
         nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
-            self._configure_scaling_groups(nsr_id)
+            await self._configure_scaling_groups(nsr_id)
         else:
             log.info(
                 "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                 "Current state is %s. Skipping...",
                 nslcmop['operationState'])
 
-    def _configure_scaling_groups(self, nsr_id: str):
+    async def _configure_scaling_groups(self, nsr_id: str):
         log.debug("_configure_scaling_groups: %s", nsr_id)
         # TODO: Add support for non-nfvi metrics
         alarms_created = []
@@ -209,9 +221,15 @@ class PolicyModuleAgent:
 
                                     for vdu_ref in scaling_group['vdu']:
                                         vnf_monitoring_param = next(
-                                            filter(lambda param: param['id'] == scaling_criteria[
-                                                'vnf-monitoring-param-ref'], vnf_monitoring_params))
-                                        if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                            filter(
+                                                lambda param: param['id'] == scaling_criteria[
+                                                    'vnf-monitoring-param-ref'
+                                                ],
+                                                vnf_monitoring_params)
+                                        )
+                                        if vdu_ref['vdu-id-ref'] != vnf_monitoring_param['vdu-monitoring-param'][
+                                            'vdu-ref'
+                                        ]:
                                             continue
                                         vdu = next(
                                             filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
@@ -220,22 +238,34 @@ class PolicyModuleAgent:
                                         vdu_monitoring_param = next(
                                             filter(
                                                 lambda param: param['id'] == vnf_monitoring_param[
-                                                    'vdu-monitoring-param-ref'],
+                                                    'vdu-monitoring-param'
+                                                ][
+                                                    'vdu-monitoring-param-ref'
+                                                ],
                                                 vdu_monitoring_params))
                                         vdurs = list(
-                                            filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
-                                                   vnfr['vdur']))
+                                            filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param[
+                                                'vdu-monitoring-param'
+                                            ][
+                                                'vdu-ref'
+                                            ], vnfr['vdur']))
                                         for vdur in vdurs:
                                             try:
-                                                ScalingAlarm.select().join(ScalingCriteria).where(
+                                                (ScalingAlarm.select()
+                                                 .join(ScalingCriteria)
+                                                 .join(ScalingPolicy)
+                                                 .join(ScalingGroup)
+                                                 .where(
                                                     ScalingAlarm.vdu_name == vdur['name'],
-                                                    ScalingCriteria.name == scaling_criteria['name']
-                                                ).get()
+                                                    ScalingCriteria.name == scaling_criteria['name'],
+                                                    ScalingPolicy.name == scaling_policy['name'],
+                                                    ScalingGroup.nsr_id == nsr_id
+                                                ).get())
                                                 log.debug("vdu %s already has an alarm configured", vdur['name'])
                                                 continue
                                             except ScalingAlarm.DoesNotExist:
                                                 pass
-                                            alarm_uuid = self.mon_client.create_alarm(
+                                            alarm_uuid = await self.mon_client.create_alarm(
                                                 metric_name=vdu_monitoring_param['nfvi-metric'],
                                                 ns_id=nsr_id,
                                                 vdu_name=vdur['name'],
@@ -251,7 +281,7 @@ class PolicyModuleAgent:
                                                 vdu_name=vdur['name'],
                                                 scaling_criteria=scaling_criteria_record
                                             )
-                                            alarm_uuid = self.mon_client.create_alarm(
+                                            alarm_uuid = await self.mon_client.create_alarm(
                                                 metric_name=vdu_monitoring_param['nfvi-metric'],
                                                 ns_id=nsr_id,
                                                 vdu_name=vdur['name'],
@@ -273,5 +303,5 @@ class PolicyModuleAgent:
                 if len(alarms_created) > 0:
                     log.info("Cleaning alarm resources in MON")
                     for alarm in alarms_created:
-                        self.mon_client.delete_alarm(*alarm)
+                        await self.mon_client.delete_alarm(*alarm)
                 raise e