X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsgkafka.py;h=de1e764a23e6b5fc4f05dbde38262a993fbbe6d4;hb=40d97800f7b48f533abfcdef98ee992daf9acb37;hp=f3507452802b5cc0b63766cfa5d57e64c6b2b887;hpb=f3c4dbc42e206bcc0d4d3369f6d0d156d7ffe669;p=osm%2FRO.git diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index f3507452..de1e764a 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -1,70 +1,104 @@ +import logging +import asyncio +import yaml from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from msgbase import MsgBase, MsgException -import asyncio -import yaml #import json + class MsgKafka(MsgBase): - def __init__(self): + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) self.host = None self.port = None self.consumer = None self.producer = None - # create a different file for each topic - #self.files = {} def connect(self, config): try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.topic_lst = [] self.loop = asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) except Exception as e: # TODO refine raise MsgException(str(e)) + def disconnect(self): + try: + self.loop.close() + except Exception as e: # TODO refine + raise MsgException(str(e)) + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True))) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, + msg=yaml.safe_dump(msg, default_flow_style=True), + loop=self.loop)) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - #self.topic_lst.append(topic) + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :return: topic, key, message; or None + """ try: - return self.loop.run_until_complete(self.aioread(topic)) + return self.loop.run_until_complete(self.aioread(topic, self.loop)) + except MsgException: + raise except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) async def aiowrite(self, topic, key, msg, loop=None): + + if not loop: + loop = self.loop try: - if not loop: - loop = self.loop self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() - await self.producer.send(topic=topic, key=key, value=msg) + await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)) except Exception as e: - raise MsgException("Error publishing to {} topic: {}".format(topic, str(e))) + raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) finally: await self.producer.stop() - async def aioread(self, topic, loop=None): + async def aioread(self, topic, loop=None, callback=None, *args): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :callback: callback function that will handle the message in kafka bus + :*args: optional arguments for callback function + :return: topic, key, message + """ + if not loop: loop = self.loop - self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) - await self.consumer.start() - self.consumer.subscribe([topic]) try: + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic,) + + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) + await self.consumer.start() + self.consumer.subscribe(topic_list) + async for message in self.consumer: - return yaml.load(message.key), yaml.load(message.value) + if callback: + callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + else: + return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: raise MsgException(str(e)) finally: await self.consumer.stop() -