projects
/
osm
/
RO.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Add juju support to lcm
[osm/RO.git]
/
lcm
/
osm_common
/
msgkafka.py
diff --git
a/lcm/osm_common/msgkafka.py
b/lcm/osm_common/msgkafka.py
index
7b294fc
..
f350745
100644
(file)
--- a/
lcm/osm_common/msgkafka.py
+++ b/
lcm/osm_common/msgkafka.py
@@
-6,7
+6,7
@@
import asyncio
import yaml
#import json
import yaml
#import json
-class
m
sgKafka(MsgBase):
+class
M
sgKafka(MsgBase):
def __init__(self):
self.host = None
self.port = None
def __init__(self):
self.host = None
self.port = None
@@
-26,24
+26,25
@@
class msgKafka(MsgBase):
except Exception as e: # TODO refine
raise MsgException(str(e))
except Exception as e: # TODO refine
raise MsgException(str(e))
- def write(self, topic, msg, key):
-
+ def write(self, topic, key, msg):
try:
try:
- self.loop.run_until_complete(self.aiowrite(
key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic
))
+ self.loop.run_until_complete(self.aiowrite(
topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True)
))
except Exception as e:
raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
except Exception as e:
raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
- self.topic_lst.append(topic)
+
#
self.topic_lst.append(topic)
try:
try:
- return self.loop.run_until_complete(self.aioread(
self.topic_lst
))
+ return self.loop.run_until_complete(self.aioread(
topic
))
except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
- async def aiowrite(self,
key, msg, topic
):
+ async def aiowrite(self,
topic, key, msg, loop=None
):
try:
try:
- self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode,
+ if not loop:
+ loop = self.loop
+ self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
await self.producer.send(topic=topic, key=key, value=msg)
bootstrap_servers=self.broker)
await self.producer.start()
await self.producer.send(topic=topic, key=key, value=msg)
@@
-52,10
+53,12
@@
class msgKafka(MsgBase):
finally:
await self.producer.stop()
finally:
await self.producer.stop()
- async def aioread(self, topic):
- self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker)
+ async def aioread(self, topic, loop=None):
+ if not loop:
+ loop = self.loop
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
await self.consumer.start()
await self.consumer.start()
- self.consumer.subscribe(
topic
)
+ self.consumer.subscribe(
[topic]
)
try:
async for message in self.consumer:
return yaml.load(message.key), yaml.load(message.value)
try:
async for message in self.consumer:
return yaml.load(message.key), yaml.load(message.value)