Exits process when encountered by a database exception
[osm/POL.git] / osm_policy_module / core / agent.py
index 3e64652..205b98c 100644 (file)
@@ -27,6 +27,7 @@ import json
 import logging
 from json import JSONDecodeError
 
+import peewee
 import yaml
 from aiokafka import AIOKafkaConsumer
 
@@ -36,6 +37,7 @@ from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, DatabaseManager
+from osm_policy_module.core.exceptions import VdurNotFound
 from osm_policy_module.utils.vnfd import VnfdUtils
 
 log = logging.getLogger(__name__)
@@ -76,6 +78,7 @@ class PolicyModuleAgent:
                 await self._process_msg(msg.topic, msg.key, msg.value)
         finally:
             await consumer.stop()
+        log.critical("Exiting...")
 
     async def _process_msg(self, topic, key, msg):
         log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
@@ -96,6 +99,9 @@ class PolicyModuleAgent:
                     await self._handle_alarm_notification(content)
             else:
                 log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
+        except peewee.PeeweeException:
+            log.exception("Database error consuming message: ")
+            raise
         except Exception:
             log.exception("Error consuming message: ")
 
@@ -144,6 +150,8 @@ class PolicyModuleAgent:
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
             await self._configure_scaling_groups(nsr_id)
+            log.info("Checking for orphaned alarms to be deleted for network service with nsr_id: %s", nsr_id)
+            await self._delete_orphaned_alarms(nsr_id)
         else:
             log.info(
                 "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
@@ -363,3 +371,29 @@ class PolicyModuleAgent:
                 log.exception("Error deleting scaling groups and alarms:")
                 tx.rollback()
                 raise e
+
+    async def _delete_orphaned_alarms(self, nsr_id):
+        with database.db.atomic() as tx:
+            try:
+                for scaling_group in ScalingGroup.select().where(ScalingGroup.nsr_id == nsr_id):
+                    for scaling_policy in scaling_group.scaling_policies:
+                        for scaling_criteria in scaling_policy.scaling_criterias:
+                            for alarm in scaling_criteria.scaling_alarms:
+                                try:
+                                    self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
+                                except VdurNotFound:
+                                    log.info("Deleting orphaned alarm %s", alarm.alarm_uuid)
+                                    try:
+                                        await self.mon_client.delete_alarm(
+                                            alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+                                            alarm.vnf_member_index,
+                                            alarm.vdu_name,
+                                            alarm.alarm_uuid)
+                                    except ValueError:
+                                        log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+                                    alarm.delete_instance()
+
+            except Exception as e:
+                log.exception("Error deleting orphaned alarms:")
+                tx.rollback()
+                raise e