projects
/
osm
/
RO.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
lightweight kafka support with callback
[osm/RO.git]
/
lcm
/
osm_common
/
msgkafka.py
diff --git
a/lcm/osm_common/msgkafka.py
b/lcm/osm_common/msgkafka.py
index
96456af
..
459513d
100644
(file)
--- a/
lcm/osm_common/msgkafka.py
+++ b/
lcm/osm_common/msgkafka.py
@@
-15,8
+15,6
@@
class MsgKafka(MsgBase):
self.port = None
self.consumer = None
self.producer = None
self.port = None
self.consumer = None
self.producer = None
- # create a different file for each topic
- #self.files = {}
def connect(self, config):
try:
def connect(self, config):
try:
@@
-24,7
+22,6
@@
class MsgKafka(MsgBase):
self.logger = logging.getLogger(config["logger_name"])
self.host = config["host"]
self.port = config["port"]
self.logger = logging.getLogger(config["logger_name"])
self.host = config["host"]
self.port = config["port"]
- self.topic_lst = []
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
@@
-60,6
+57,9
@@
class MsgKafka(MsgBase):
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
async def aiowrite(self, topic, key, msg, loop):
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
async def aiowrite(self, topic, key, msg, loop):
+
+ if not loop:
+ loop = self.loop
try:
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
try:
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
@@
-70,13
+70,18
@@
class MsgKafka(MsgBase):
finally:
await self.producer.stop()
finally:
await self.producer.stop()
- async def aioread(self, topic, loop):
+ async def aioread(self, topic, loop
=None, callback=None, *args
):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
:param loop: asyncio loop
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
:param loop: asyncio loop
+ :callback: callback function that will handle the message in kafka bus
+ :*args: optional arguments for callback function
:return: topic, key, message
"""
:return: topic, key, message
"""
+
+ if not loop:
+ loop = self.loop
try:
if isinstance(topic, (list, tuple)):
topic_list = topic
try:
if isinstance(topic, (list, tuple)):
topic_list = topic
@@
-86,11
+91,14
@@
class MsgKafka(MsgBase):
self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
await self.consumer.start()
self.consumer.subscribe(topic_list)
self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
await self.consumer.start()
self.consumer.subscribe(topic_list)
+
async for message in self.consumer:
async for message in self.consumer:
- return message.topic, yaml.load(message.key), yaml.load(message.value)
+ if callback:
+ callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+ else:
+ return message.topic, yaml.load(message.key), yaml.load(message.value)
except KafkaError as e:
raise MsgException(str(e))
finally:
await self.consumer.stop()
except KafkaError as e:
raise MsgException(str(e))
finally:
await self.consumer.stop()
-