X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsgkafka.py;h=f3507452802b5cc0b63766cfa5d57e64c6b2b887;hb=c887cc2fdd02c898ecd1f08d6927043027fd07eb;hp=7b294fc048b2ba25b1bc57d527d2bfe11807a3b2;hpb=13fd64d81fed0ccbc67bcd2cbe9d63c8b06d20eb;p=osm%2FRO.git diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 7b294fc0..f3507452 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -6,7 +6,7 @@ import asyncio import yaml #import json -class msgKafka(MsgBase): +class MsgKafka(MsgBase): def __init__(self): self.host = None self.port = None @@ -26,24 +26,25 @@ class msgKafka(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e)) - def write(self, topic, msg, key): - + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic)) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True))) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - self.topic_lst.append(topic) + #self.topic_lst.append(topic) try: - return self.loop.run_until_complete(self.aioread(self.topic_lst)) + return self.loop.run_until_complete(self.aioread(topic)) except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, key, msg, topic): + async def aiowrite(self, topic, key, msg, loop=None): try: - self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode, + if not loop: + loop = self.loop + self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() await self.producer.send(topic=topic, key=key, value=msg) @@ -52,10 +53,12 @@ class msgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic): - self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker) + async def aioread(self, topic, loop=None): + if not loop: + loop = self.loop + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) await self.consumer.start() - self.consumer.subscribe(topic) + self.consumer.subscribe([topic]) try: async for message in self.consumer: return yaml.load(message.key), yaml.load(message.value)