X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fmsgkafka.py;h=96456afcf7283be1872275136b589986b29fd436;hp=90c9c7ff9992ef21f1ceb7e283e66ef32e073591;hb=80d61ec5bbbdd0c45602a9e53c41fe3dff9b50be;hpb=c94c3df90aa64298a7935a80b221f80f3c043260 diff --git a/osm_nbi/msgkafka.py b/osm_nbi/msgkafka.py index 90c9c7f..96456af 100644 --- a/osm_nbi/msgkafka.py +++ b/osm_nbi/msgkafka.py @@ -31,24 +31,36 @@ class MsgKafka(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e)) + def disconnect(self): + try: + self.loop.close() + except Exception as e: # TODO refine + raise MsgException(str(e)) + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True))) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, + msg=yaml.safe_dump(msg, default_flow_style=True), + loop=self.loop)) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - #self.topic_lst.append(topic) + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :return: topic, key, message; or None + """ try: - return self.loop.run_until_complete(self.aioread(topic)) + return self.loop.run_until_complete(self.aioread(topic, self.loop)) + except MsgException: + raise except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, topic, key, msg, loop=None): + async def aiowrite(self, topic, key, msg, loop): try: - if not loop: - loop = self.loop self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() @@ -58,15 +70,24 @@ class MsgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic, loop=None): - if not loop: - loop = self.loop - self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) - await self.consumer.start() - self.consumer.subscribe([topic]) + async def aioread(self, topic, loop): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :return: topic, key, message + """ try: + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic,) + + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) + await self.consumer.start() + self.consumer.subscribe(topic_list) async for message in self.consumer: - return yaml.load(message.key), yaml.load(message.value) + return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: raise MsgException(str(e)) finally: