fix(aiokafka): version now fixed to 0.6.0 (this is the latest at the moment)
[osm/POL.git] / osm_policy_module / core / agent.py
index 350f383..3f87d16 100644 (file)
 ##
 import asyncio
 import logging
+from pathlib import Path
+import os
 
 import peewee
 
 from osm_policy_module.alarming.service import AlarmingService
 from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
@@ -43,6 +46,7 @@ class PolicyModuleAgent:
             loop = asyncio.get_event_loop()
         self.loop = loop
         self.msg_bus = MessageBusClient(config)
+        self.db_client = CommonDbClient(config)
         self.autoscaling_service = AutoscalingService(config, loop)
         self.alarming_service = AlarmingService(config, loop)
 
@@ -50,14 +54,18 @@ class PolicyModuleAgent:
         self.loop.run_until_complete(self.start())
 
     async def start(self):
+        Path('/tmp/osm_pol_agent_health_flag').touch()
         topics = [
             "ns",
             "alarm_response"
         ]
         await self.msg_bus.aioread(topics, self._process_msg)
         log.critical("Exiting...")
+        if os.path.exists('/tmp/osm_pol_agent_health_flag'):
+            os.remove('/tmp/osm_pol_agent_health_flag')
 
     async def _process_msg(self, topic, key, msg):
+        Path('/tmp/osm_pol_agent_health_flag').touch()
         log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
         try:
             if key in ALLOWED_KAFKA_KEYS:
@@ -91,7 +99,7 @@ class PolicyModuleAgent:
     async def _handle_instantiated(self, content):
         log.debug("_handle_instantiated: %s", content)
         nslcmop_id = content['nslcmop_id']
-        nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring nsr_id: %s", nsr_id)
@@ -106,7 +114,7 @@ class PolicyModuleAgent:
     async def _handle_scaled(self, content):
         log.debug("_handle_scaled: %s", content)
         nslcmop_id = content['nslcmop_id']
-        nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring scaled service with nsr_id: %s", nsr_id)