# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import datetime
import json
import logging
import time
import uuid
-from kafka import KafkaProducer
+from aiokafka import AIOKafkaProducer
from osm_common import dbmongo
from osm_policy_module.core.config import Config
class LcmClient:
- def __init__(self):
+ def __init__(self, loop=None):
cfg = Config.instance()
self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
cfg.OSMPOL_MESSAGE_PORT)
- self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
self.common_db = dbmongo.DbMongo()
self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
'port': int(cfg.OSMPOL_DATABASE_PORT),
'name': 'osm'})
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
- def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+ async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
self.common_db.create("nslcmops", nslcmop)
log.info("Sending scale action message: %s", json.dumps(nslcmop))
- self.producer.send(topic='ns', key='scale', value=json.dumps(nslcmop))
- self.producer.flush()
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ try:
+ # Produce message
+ await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
+ finally:
+ # Wait for all pending messages to be delivered or expire.
+ await producer.stop()
def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)