X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsgkafka.py;h=90c9c7ff9992ef21f1ceb7e283e66ef32e073591;hb=ae501920e1c0e03c8571bece610dd5518e6e86b9;hp=7b294fc048b2ba25b1bc57d527d2bfe11807a3b2;hpb=13fd64d81fed0ccbc67bcd2cbe9d63c8b06d20eb;p=osm%2FRO.git diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 7b294fc0..90c9c7ff 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -1,13 +1,16 @@ +import logging +import asyncio +import yaml from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from msgbase import MsgBase, MsgException -import asyncio -import yaml #import json -class msgKafka(MsgBase): - def __init__(self): + +class MsgKafka(MsgBase): + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) self.host = None self.port = None self.consumer = None @@ -17,6 +20,8 @@ class msgKafka(MsgBase): def connect(self, config): try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] self.topic_lst = [] @@ -26,24 +31,25 @@ class msgKafka(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e)) - def write(self, topic, msg, key): - + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic)) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True))) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - self.topic_lst.append(topic) + #self.topic_lst.append(topic) try: - return self.loop.run_until_complete(self.aioread(self.topic_lst)) + return self.loop.run_until_complete(self.aioread(topic)) except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, key, msg, topic): + async def aiowrite(self, topic, key, msg, loop=None): try: - self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode, + if not loop: + loop = self.loop + self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() await self.producer.send(topic=topic, key=key, value=msg) @@ -52,10 +58,12 @@ class msgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic): - self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker) + async def aioread(self, topic, loop=None): + if not loop: + loop = self.loop + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) await self.consumer.start() - self.consumer.subscribe(topic) + self.consumer.subscribe([topic]) try: async for message in self.consumer: return yaml.load(message.key), yaml.load(message.value)