Implements aiokafka and modifies code to support asyncio
[osm/POL.git] / osm_policy_module / common / lcm_client.py
index 34e212f..3d8012f 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import datetime
 import json
 import logging
 import time
 import uuid
 
-from kafka import KafkaProducer
+from aiokafka import AIOKafkaProducer
 from osm_common import dbmongo
 
 from osm_policy_module.core.config import Config
@@ -36,25 +37,34 @@ log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self):
+    def __init__(self, loop=None):
         cfg = Config.instance()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
-        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
-                                      key_serializer=str.encode,
-                                      value_serializer=str.encode)
         self.common_db = dbmongo.DbMongo()
         self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
                                    'port': int(cfg.OSMPOL_DATABASE_PORT),
                                    'name': 'osm'})
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
 
-    def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+    async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
         self.common_db.create("nslcmops", nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        self.producer.send(topic='ns', key='scale', value=json.dumps(nslcmop))
-        self.producer.flush()
+        producer = AIOKafkaProducer(loop=self.loop,
+                                    bootstrap_servers=self.kafka_server,
+                                    key_serializer=str.encode,
+                                    value_serializer=str.encode)
+        await producer.start()
+        try:
+            # Produce message
+            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
+        finally:
+            # Wait for all pending messages to be delivered or expire.
+            await producer.stop()
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)