X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcore%2Fagent.py;h=4a2052760db192a2bc17fb7235359af8d68cc2f6;hb=055c4ee00f83647e7e807aa3d44c3384d9d79aa2;hp=a73cd5b6a560a47650c4255701050b029b4d050b;hpb=1d7fa33ffdd43fe36f8a83088b5b51bc0d875925;p=osm%2FPOL.git diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index a73cd5b..4a20527 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -30,28 +30,34 @@ import peewee from osm_policy_module.alarming.service import AlarmingService from osm_policy_module.autoscaling.service import AutoscalingService +from osm_policy_module.healing.service import HealingService from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.message_bus_client import MessageBusClient from osm_policy_module.core.config import Config log = logging.getLogger(__name__) -ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm", "vnf_terminated"] +ALLOWED_KAFKA_KEYS = [ + "instantiated", + "scaled", + "terminated", + "notify_alarm", + "policy_updated", + "vnf_terminated", +] class PolicyModuleAgent: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.conf = config - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop self.msg_bus = MessageBusClient(config) self.db_client = CommonDbClient(config) - self.autoscaling_service = AutoscalingService(config, loop) - self.alarming_service = AlarmingService(config, loop) + self.autoscaling_service = AutoscalingService(config) + self.alarming_service = AlarmingService(config) + self.healing_service = HealingService(config) def run(self): - self.loop.run_until_complete(self.start()) + asyncio.run(self.start()) async def start(self): Path("/tmp/osm_pol_agent_health_flag").touch() @@ -66,7 +72,6 @@ class PolicyModuleAgent: log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg) try: if key in ALLOWED_KAFKA_KEYS: - if key == "instantiated": await self._handle_instantiated(msg) @@ -79,6 +84,9 @@ class PolicyModuleAgent: if key == "notify_alarm": await self._handle_alarm_notification(msg) + if key == "policy_updated": + await self._handle_policy_update(msg) + if key == "vnf_terminated": await self._handle_vnf_terminated(msg) else: @@ -95,6 +103,7 @@ class PolicyModuleAgent: status = content["notify_details"]["status"] await self.autoscaling_service.handle_alarm(alarm_uuid, status) await self.alarming_service.handle_alarm(alarm_uuid, status, content) + await self.healing_service.handle_alarm(alarm_uuid, status) async def _handle_instantiated(self, content): log.debug("_handle_instantiated: %s", content) @@ -108,6 +117,7 @@ class PolicyModuleAgent: log.info("Configuring nsr_id: %s", nsr_id) await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.alarming_service.configure_vnf_alarms(nsr_id) + await self.healing_service.configure_healing_alarms(nsr_id) else: log.info( "Network_service is not in COMPLETED or PARTIALLY_COMPLETED state. " @@ -128,6 +138,8 @@ class PolicyModuleAgent: await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.autoscaling_service.delete_orphaned_alarms(nsr_id) await self.alarming_service.configure_vnf_alarms(nsr_id) + await self.healing_service.configure_healing_alarms(nsr_id) + await self.healing_service.delete_orphaned_healing_alarms(nsr_id) else: log.debug( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " @@ -148,6 +160,38 @@ class PolicyModuleAgent: ) await self.autoscaling_service.delete_scaling_groups(nsr_id) await self.alarming_service.delete_vnf_alarms(nsr_id) + await self.healing_service.delete_healing_alarms(nsr_id) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + content["operationState"], + ) + + async def _handle_policy_update(self, content): + log.info("_handle_policy_update: %s", content) + nsr_id = content["nsr_id"] + vnf_member_index = content["vnf_member_index"] + if ( + content["operationState"] == "COMPLETED" + or content["operationState"] == "PARTIALLY_COMPLETED" + ): + log.info( + "Updating policies of VNF with nsr_id: %s and vnf-member-index: %s" + % (nsr_id, vnf_member_index) + ) + await self.autoscaling_service.delete_scaling_groups( + nsr_id, vnf_member_index + ) + await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index) + await self.healing_service.delete_healing_alarms(nsr_id, vnf_member_index) + await self.autoscaling_service.configure_scaling_groups( + nsr_id, vnf_member_index + ) + await self.alarming_service.configure_vnf_alarms(nsr_id, vnf_member_index) + await self.healing_service.configure_healing_alarms( + nsr_id, vnf_member_index + ) else: log.info( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " @@ -156,19 +200,23 @@ class PolicyModuleAgent: ) async def _handle_vnf_terminated(self, content): - nsr_id = content['nsr_id'] - vnf_member_index = content['vnf_member_index'] + nsr_id = content["nsr_id"] + vnf_member_index = content["vnf_member_index"] if ( content["operationState"] == "COMPLETED" or content["operationState"] == "PARTIALLY_COMPLETED" ): log.info( "Deleting policies of VNF with nsr_id: %s and vnf-member-index: %s" - % (nsr_id, vnf_member_index)) - await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index) + % (nsr_id, vnf_member_index) + ) + await self.autoscaling_service.delete_scaling_groups( + nsr_id, vnf_member_index + ) await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index) else: log.info( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - content['operationState']) + content["operationState"], + )