X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsgkafka.py;h=de1e764a23e6b5fc4f05dbde38262a993fbbe6d4;hb=refs%2Fchanges%2F37%2F6137%2F3;hp=7b294fc048b2ba25b1bc57d527d2bfe11807a3b2;hpb=13fd64d81fed0ccbc67bcd2cbe9d63c8b06d20eb;p=osm%2FRO.git diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 7b294fc0..de1e764a 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -1,67 +1,104 @@ +import logging +import asyncio +import yaml from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from msgbase import MsgBase, MsgException -import asyncio -import yaml #import json -class msgKafka(MsgBase): - def __init__(self): + +class MsgKafka(MsgBase): + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) self.host = None self.port = None self.consumer = None self.producer = None - # create a different file for each topic - #self.files = {} def connect(self, config): try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.topic_lst = [] self.loop = asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) except Exception as e: # TODO refine raise MsgException(str(e)) - def write(self, topic, msg, key): + def disconnect(self): + try: + self.loop.close() + except Exception as e: # TODO refine + raise MsgException(str(e)) + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic)) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, + msg=yaml.safe_dump(msg, default_flow_style=True), + loop=self.loop)) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - self.topic_lst.append(topic) + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :return: topic, key, message; or None + """ try: - return self.loop.run_until_complete(self.aioread(self.topic_lst)) + return self.loop.run_until_complete(self.aioread(topic, self.loop)) + except MsgException: + raise except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, key, msg, topic): + async def aiowrite(self, topic, key, msg, loop=None): + + if not loop: + loop = self.loop try: - self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode, + self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() - await self.producer.send(topic=topic, key=key, value=msg) + await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)) except Exception as e: - raise MsgException("Error publishing to {} topic: {}".format(topic, str(e))) + raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) finally: await self.producer.stop() - async def aioread(self, topic): - self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker) - await self.consumer.start() - self.consumer.subscribe(topic) + async def aioread(self, topic, loop=None, callback=None, *args): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :callback: callback function that will handle the message in kafka bus + :*args: optional arguments for callback function + :return: topic, key, message + """ + + if not loop: + loop = self.loop try: + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic,) + + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) + await self.consumer.start() + self.consumer.subscribe(topic_list) + async for message in self.consumer: - return yaml.load(message.key), yaml.load(message.value) + if callback: + callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + else: + return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: raise MsgException(str(e)) finally: await self.consumer.stop() -