Replaces direct use of aiokafka with osm_common message bus in agent and
[osm/POL.git] / osm_policy_module / common / lcm_client.py
index 05378d7..2efa241 100644 (file)
@@ -28,22 +28,17 @@ import logging
 import time
 import uuid
 
-from aiokafka import AIOKafkaProducer
-from osm_common import dbmongo
-
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
-                                   'name': 'osm'})
+    def __init__(self, config: Config, loop=None):
+        self.db_client = CommonDbClient(config)
+        self.msg_bus = MessageBusClient(config)
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
@@ -51,19 +46,9 @@ class LcmClient:
     async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
-        self.common_db.create("nslcmops", nslcmop)
+        self.db_client.create_nslcmop(nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        producer = AIOKafkaProducer(loop=self.loop,
-                                    bootstrap_servers=self.kafka_server,
-                                    key_serializer=str.encode,
-                                    value_serializer=str.encode)
-        await producer.start()
-        try:
-            # Produce message
-            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
-        finally:
-            # Wait for all pending messages to be delivered or expire.
-            await producer.stop()
+        await self.msg_bus.aiowrite("ns", "scale", nslcmop)
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)