lightweight exception capturing, logging
[osm/RO.git] / lcm / osm_common / msgkafka.py
index 7b294fc..90c9c7f 100644 (file)
@@ -1,13 +1,16 @@
+import logging
+import asyncio
+import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
-import asyncio
-import yaml
 #import json
 
 #import json
 
-class msgKafka(MsgBase):
-    def __init__(self):
+
+class MsgKafka(MsgBase):
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
         self.host = None
         self.port = None
         self.consumer = None
         self.host = None
         self.port = None
         self.consumer = None
@@ -17,6 +20,8 @@ class msgKafka(MsgBase):
 
     def connect(self, config):
         try:
 
     def connect(self, config):
         try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
             self.topic_lst = []
             self.host = config["host"]
             self.port = config["port"]
             self.topic_lst = []
@@ -26,24 +31,25 @@ class msgKafka(MsgBase):
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
-    def write(self, topic, msg, key):
-
+    def write(self, topic, key, msg):
         try:
         try:
-            self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True)))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
-        self.topic_lst.append(topic)
+        #self.topic_lst.append(topic)
         try:
         try:
-            return self.loop.run_until_complete(self.aioread(self.topic_lst))
+            return self.loop.run_until_complete(self.aioread(topic))
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, key, msg, topic):
+    async def aiowrite(self, topic, key, msg, loop=None):
         try:
         try:
-            self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode,
+            if not loop:
+                loop = self.loop
+            self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             await self.producer.send(topic=topic, key=key, value=msg)
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             await self.producer.send(topic=topic, key=key, value=msg)
@@ -52,10 +58,12 @@ class msgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic):
-        self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker)
+    async def aioread(self, topic, loop=None):
+        if not loop:
+            loop = self.loop
+        self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
         await self.consumer.start()
         await self.consumer.start()
-        self.consumer.subscribe(topic)
+        self.consumer.subscribe([topic])
         try:
             async for message in self.consumer:
                 return yaml.load(message.key), yaml.load(message.value)
         try:
             async for message in self.consumer:
                 return yaml.load(message.key), yaml.load(message.value)