From 13fd64d81fed0ccbc67bcd2cbe9d63c8b06d20eb Mon Sep 17 00:00:00 2001 From: gcalvino Date: Fri, 2 Feb 2018 10:53:30 +0100 Subject: [PATCH] lightweight kafka support Change-Id: I1c124d7fbb64dd16990f18f677c2e61d18aa767e Signed-off-by: gcalvino --- lcm/osm_common/msgkafka.py | 67 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 lcm/osm_common/msgkafka.py diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py new file mode 100644 index 00000000..7b294fc0 --- /dev/null +++ b/lcm/osm_common/msgkafka.py @@ -0,0 +1,67 @@ +from aiokafka import AIOKafkaConsumer +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError +from msgbase import MsgBase, MsgException +import asyncio +import yaml +#import json + +class msgKafka(MsgBase): + def __init__(self): + self.host = None + self.port = None + self.consumer = None + self.producer = None + # create a different file for each topic + #self.files = {} + + def connect(self, config): + try: + self.host = config["host"] + self.port = config["port"] + self.topic_lst = [] + self.loop = asyncio.get_event_loop() + self.broker = str(self.host) + ":" + str(self.port) + + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def write(self, topic, msg, key): + + try: + self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic)) + + except Exception as e: + raise MsgException("Error writing {} topic: {}".format(topic, str(e))) + + def read(self, topic): + self.topic_lst.append(topic) + try: + return self.loop.run_until_complete(self.aioread(self.topic_lst)) + except Exception as e: + raise MsgException("Error reading {} topic: {}".format(topic, str(e))) + + async def aiowrite(self, key, msg, topic): + try: + self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode, + bootstrap_servers=self.broker) + await self.producer.start() + await self.producer.send(topic=topic, key=key, value=msg) + except Exception as e: + raise MsgException("Error publishing to {} topic: {}".format(topic, str(e))) + finally: + await self.producer.stop() + + async def aioread(self, topic): + self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker) + await self.consumer.start() + self.consumer.subscribe(topic) + try: + async for message in self.consumer: + return yaml.load(message.key), yaml.load(message.value) + except KafkaError as e: + raise MsgException(str(e)) + finally: + await self.consumer.stop() + + -- 2.25.1