X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsgkafka.py;h=459513d8b407e04773aa89050ef71ff8a67f917d;hb=refs%2Fchanges%2F39%2F5839%2F5;hp=96456afcf7283be1872275136b589986b29fd436;hpb=f3a5443bfaf65f22540eff98a49be087f0479424;p=osm%2FRO.git diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 96456afc..459513d8 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -15,8 +15,6 @@ class MsgKafka(MsgBase): self.port = None self.consumer = None self.producer = None - # create a different file for each topic - #self.files = {} def connect(self, config): try: @@ -24,7 +22,6 @@ class MsgKafka(MsgBase): self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.topic_lst = [] self.loop = asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) @@ -60,6 +57,9 @@ class MsgKafka(MsgBase): raise MsgException("Error reading {} topic: {}".format(topic, str(e))) async def aiowrite(self, topic, key, msg, loop): + + if not loop: + loop = self.loop try: self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) @@ -70,13 +70,18 @@ class MsgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic, loop): + async def aioread(self, topic, loop=None, callback=None, *args): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics :param loop: asyncio loop + :callback: callback function that will handle the message in kafka bus + :*args: optional arguments for callback function :return: topic, key, message """ + + if not loop: + loop = self.loop try: if isinstance(topic, (list, tuple)): topic_list = topic @@ -86,11 +91,14 @@ class MsgKafka(MsgBase): self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) await self.consumer.start() self.consumer.subscribe(topic_list) + async for message in self.consumer: - return message.topic, yaml.load(message.key), yaml.load(message.value) + if callback: + callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + else: + return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: raise MsgException(str(e)) finally: await self.consumer.stop() -