X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcore%2Fagent.py;h=777723bfd25f57c6a5ad7d2d99f6e9fc2fb798b9;hb=cb5642a2af495af4319beb1dba2d417b78f3200a;hp=3f87d168cb47c8e4cb507694ea47921361d466ea;hpb=946821f0dcc61f0bf0fe6d78bc2dc7db2636296c;p=osm%2FPOL.git diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 3f87d16..777723b 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -36,7 +36,7 @@ from osm_policy_module.core.config import Config log = logging.getLogger(__name__) -ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm'] +ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm", "policy_updated", "vnf_terminated"] class PolicyModuleAgent: @@ -54,33 +54,36 @@ class PolicyModuleAgent: self.loop.run_until_complete(self.start()) async def start(self): - Path('/tmp/osm_pol_agent_health_flag').touch() - topics = [ - "ns", - "alarm_response" - ] + Path("/tmp/osm_pol_agent_health_flag").touch() + topics = ["ns", "alarm_response"] await self.msg_bus.aioread(topics, self._process_msg) log.critical("Exiting...") - if os.path.exists('/tmp/osm_pol_agent_health_flag'): - os.remove('/tmp/osm_pol_agent_health_flag') + if os.path.exists("/tmp/osm_pol_agent_health_flag"): + os.remove("/tmp/osm_pol_agent_health_flag") async def _process_msg(self, topic, key, msg): - Path('/tmp/osm_pol_agent_health_flag').touch() + Path("/tmp/osm_pol_agent_health_flag").touch() log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg) try: if key in ALLOWED_KAFKA_KEYS: - if key == 'instantiated': + if key == "instantiated": await self._handle_instantiated(msg) - if key == 'scaled': + if key == "scaled": await self._handle_scaled(msg) - if key == 'terminated': + if key == "terminated": await self._handle_terminated(msg) - if key == 'notify_alarm': + if key == "notify_alarm": await self._handle_alarm_notification(msg) + + if key == "policy_updated": + await self._handle_policy_update(msg) + + if key == "vnf_terminated": + await self._handle_vnf_terminated(msg) else: log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key) except peewee.PeeweeException: @@ -91,17 +94,20 @@ class PolicyModuleAgent: async def _handle_alarm_notification(self, content): log.debug("_handle_alarm_notification: %s", content) - alarm_uuid = content['notify_details']['alarm_uuid'] - status = content['notify_details']['status'] + alarm_uuid = content["notify_details"]["alarm_uuid"] + status = content["notify_details"]["status"] await self.autoscaling_service.handle_alarm(alarm_uuid, status) await self.alarming_service.handle_alarm(alarm_uuid, status, content) async def _handle_instantiated(self, content): log.debug("_handle_instantiated: %s", content) - nslcmop_id = content['nslcmop_id'] + nslcmop_id = content["nslcmop_id"] nslcmop = self.db_client.get_nslcmop(nslcmop_id) - if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': - nsr_id = nslcmop['nsInstanceId'] + if ( + nslcmop["operationState"] == "COMPLETED" + or nslcmop["operationState"] == "PARTIALLY_COMPLETED" + ): + nsr_id = nslcmop["nsInstanceId"] log.info("Configuring nsr_id: %s", nsr_id) await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.alarming_service.configure_vnf_alarms(nsr_id) @@ -109,14 +115,18 @@ class PolicyModuleAgent: log.info( "Network_service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - nslcmop['operationState']) + nslcmop["operationState"], + ) async def _handle_scaled(self, content): log.debug("_handle_scaled: %s", content) - nslcmop_id = content['nslcmop_id'] + nslcmop_id = content["nslcmop_id"] nslcmop = self.db_client.get_nslcmop(nslcmop_id) - if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': - nsr_id = nslcmop['nsInstanceId'] + if ( + nslcmop["operationState"] == "COMPLETED" + or nslcmop["operationState"] == "PARTIALLY_COMPLETED" + ): + nsr_id = nslcmop["nsInstanceId"] log.info("Configuring scaled service with nsr_id: %s", nsr_id) await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.autoscaling_service.delete_orphaned_alarms(nsr_id) @@ -125,15 +135,62 @@ class PolicyModuleAgent: log.debug( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - nslcmop['operationState']) + nslcmop["operationState"], + ) async def _handle_terminated(self, content): log.debug("_handle_deleted: %s", content) - nsr_id = content['nsr_id'] - if content['operationState'] == 'COMPLETED' or content['operationState'] == 'PARTIALLY_COMPLETED': - log.info("Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s", nsr_id) + nsr_id = content["nsr_id"] + if ( + content["operationState"] == "COMPLETED" + or content["operationState"] == "PARTIALLY_COMPLETED" + ): + log.info( + "Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s", + nsr_id, + ) await self.autoscaling_service.delete_scaling_groups(nsr_id) await self.alarming_service.delete_vnf_alarms(nsr_id) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + content["operationState"], + ) + + async def _handle_policy_update(self, content): + log.info("_handle_policy_update: %s", content) + nsr_id = content['nsr_id'] + vnf_member_index = content['vnf_member_index'] + if ( + content["operationState"] == "COMPLETED" + or content["operationState"] == "PARTIALLY_COMPLETED" + ): + log.info( + "Updating policies of VNF with nsr_id: %s and vnf-member-index: %s" + % (nsr_id, vnf_member_index)) + await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index) + await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index) + await self.autoscaling_service.configure_scaling_groups(nsr_id, vnf_member_index) + await self.alarming_service.configure_vnf_alarms(nsr_id, vnf_member_index) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + content['operationState']) + + async def _handle_vnf_terminated(self, content): + nsr_id = content['nsr_id'] + vnf_member_index = content['vnf_member_index'] + if ( + content["operationState"] == "COMPLETED" + or content["operationState"] == "PARTIALLY_COMPLETED" + ): + log.info( + "Deleting policies of VNF with nsr_id: %s and vnf-member-index: %s" + % (nsr_id, vnf_member_index)) + await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index) + await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index) else: log.info( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "