Make common methods threading safe. pytest enhancements
[osm/common.git] / osm_common / msgkafka.py
index c819c81..b782685 100644 (file)
@@ -1,23 +1,40 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import logging
 import asyncio
 import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 import logging
 import asyncio
 import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
+from osm_common.msgbase import MsgBase, MsgException
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
 class MsgKafka(MsgBase):
 class MsgKafka(MsgBase):
-    def __init__(self, logger_name='msg'):
-        self.logger = logging.getLogger(logger_name)
+    def __init__(self, logger_name='msg', lock=False):
+        super().__init__(logger_name, lock)
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
         self.loop = None
         self.broker = None
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
         self.loop = None
         self.broker = None
+        self.group_id = None
 
     def connect(self, config):
         try:
 
     def connect(self, config):
         try:
@@ -27,28 +44,35 @@ class MsgKafka(MsgBase):
             self.port = config["port"]
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
             self.port = config["port"]
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
+            self.group_id = config.get("group_id")
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
     def disconnect(self):
         try:
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
     def disconnect(self):
         try:
-            self.loop.close()
+            pass
+            # self.loop.close()
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
     def write(self, topic, key, msg):
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
     def write(self, topic, key, msg):
+        """
+        Write a message at kafka bus
+        :param topic: message topic, must be string
+        :param key: message key, must be string
+        :param msg: message content, can be string or dictionary
+        :return: None or raises MsgException on failing
+        """
         try:
         try:
-            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
-                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
-                                                       loop=self.loop))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
         """
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
         """
-        Read from one or several topics. it is non blocking returning None if nothing is available
+        Read from one or several topics.
         :param topic: can be str: single topic; or str list: several topics
         :return: topic, key, message; or None
         """
         :param topic: can be str: single topic; or str list: several topics
         :return: topic, key, message; or None
         """
@@ -67,20 +91,21 @@ class MsgKafka(MsgBase):
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=msg)
+            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
         except Exception as e:
         except Exception as e:
-            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
         finally:
             await self.producer.stop()
 
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None, callback=None, *args):
+    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
         """
         """
-        Asyncio read from one or several topics. It blocks
+        Asyncio read from one or several topics. It blocks.
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
-        :callback: callback function that will handle the message in kafka bus
-        :*args: optional arguments for callback function
-        :return: topic, key, message
+        :param callback: synchronous callback function that will handle the message in kafka bus
+        :param aiocallback: async callback function that will handle the message in kafka bus
+        :param kwargs: optional keyword arguments for callback function
+        :return: If no callback defined, it returns (topic, key, message)
         """
 
         if not loop:
         """
 
         if not loop:
@@ -91,17 +116,18 @@ class MsgKafka(MsgBase):
             else:
                 topic_list = (topic,)
 
             else:
                 topic_list = (topic,)
 
-            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
             await self.consumer.start()
             self.consumer.subscribe(topic_list)
 
             async for message in self.consumer:
                 if callback:
             await self.consumer.start()
             self.consumer.subscribe(topic_list)
 
             async for message in self.consumer:
                 if callback:
-                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+                elif aiocallback:
+                    await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
                 else:
                     return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
                 else:
                     return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
-