Replaces direct use of aiokafka with osm_common message bus in agent and
lcmclient

Changes config handling to comply with the way it is handled in other modules,
by using a config file and overriding it with env vars.

Adds unit tests for message_bus_client.

Mon client remains using aiokafka directly, as there is no support yet for
auto_offset_reset configuration in osm_common.

Change-Id: I99615287cc934ce310105e86544a6bfe26bc0673
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py
index 05378d7..2efa241 100644
--- a/osm_policy_module/common/lcm_client.py
+++ b/osm_policy_module/common/lcm_client.py
@@ -28,22 +28,17 @@
 import time
 import uuid
 
-from aiokafka import AIOKafkaProducer
-from osm_common import dbmongo
-
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
-                                   'name': 'osm'})
+    def __init__(self, config: Config, loop=None):
+        self.db_client = CommonDbClient(config)
+        self.msg_bus = MessageBusClient(config)
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
@@ -51,19 +46,9 @@
     async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
-        self.common_db.create("nslcmops", nslcmop)
+        self.db_client.create_nslcmop(nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        producer = AIOKafkaProducer(loop=self.loop,
-                                    bootstrap_servers=self.kafka_server,
-                                    key_serializer=str.encode,
-                                    value_serializer=str.encode)
-        await producer.start()
-        try:
-            # Produce message
-            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
-        finally:
-            # Wait for all pending messages to be delivered or expire.
-            await producer.stop()
+        await self.msg_bus.aiowrite("ns", "scale", nslcmop)
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)