Reformat common to standardized format
[osm/common.git] / osm_common / msgkafka.py
index 4d02024..5caa5b1 100644 (file)
@@ -21,12 +21,14 @@ from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from osm_common.msgbase import MsgBase, MsgException
 
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
-             "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+__author__ = (
+    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
+    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+)
 
 
 class MsgKafka(MsgBase):
-    def __init__(self, logger_name='msg', lock=False):
+    def __init__(self, logger_name="msg", lock=False):
         super().__init__(logger_name, lock)
         self.host = None
         self.port = None
@@ -64,15 +66,19 @@ class MsgKafka(MsgBase):
         :param msg: message content, can be string or dictionary
         :return: None or raises MsgException on failing
         """
-        retry = 2   # Try two times
+        retry = 2  # Try two times
         while retry:
             try:
-                self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
+                self.loop.run_until_complete(
+                    self.aiowrite(topic=topic, key=key, msg=msg)
+                )
                 break
             except Exception as e:
                 retry -= 1
                 if retry == 0:
-                    raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+                    raise MsgException(
+                        "Error writing {} topic: {}".format(topic, str(e))
+                    )
 
     def read(self, topic):
         """
@@ -100,17 +106,33 @@ class MsgKafka(MsgBase):
         if not loop:
             loop = self.loop
         try:
-            self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
-                                             bootstrap_servers=self.broker)
+            self.producer = AIOKafkaProducer(
+                loop=loop,
+                key_serializer=str.encode,
+                value_serializer=str.encode,
+                bootstrap_servers=self.broker,
+            )
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
+            await self.producer.send(
+                topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)
+            )
         except Exception as e:
-            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
+            raise MsgException(
+                "Error publishing topic '{}', key '{}': {}".format(topic, key, e)
+            )
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, from_beginning=None,
-                      **kwargs):
+    async def aioread(
+        self,
+        topic,
+        loop=None,
+        callback=None,
+        aiocallback=None,
+        group_id=None,
+        from_beginning=None,
+        **kwargs
+    ):
         """
         Asyncio read from one or several topics.
         :param topic: can be str: single topic; or str list: several topics
@@ -137,19 +159,36 @@ class MsgKafka(MsgBase):
                 topic_list = topic
             else:
                 topic_list = (topic,)
-            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id,
-                                             auto_offset_reset="earliest" if from_beginning else "latest")
+            self.consumer = AIOKafkaConsumer(
+                loop=loop,
+                bootstrap_servers=self.broker,
+                group_id=group_id,
+                auto_offset_reset="earliest" if from_beginning else "latest",
+            )
             await self.consumer.start()
             self.consumer.subscribe(topic_list)
 
             async for message in self.consumer:
                 if callback:
-                    callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
+                    callback(
+                        message.topic,
+                        yaml.safe_load(message.key),
+                        yaml.safe_load(message.value),
+                        **kwargs
+                    )
                 elif aiocallback:
-                    await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
-                                      **kwargs)
+                    await aiocallback(
+                        message.topic,
+                        yaml.safe_load(message.key),
+                        yaml.safe_load(message.value),
+                        **kwargs
+                    )
                 else:
-                    return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
+                    return (
+                        message.topic,
+                        yaml.safe_load(message.key),
+                        yaml.safe_load(message.value),
+                    )
         except KafkaError as e:
             raise MsgException(str(e))
         finally: