Adding release notes and enabling import order check
[osm/common.git] / osm_common / msglocal.py
index 247de7b..2f90307 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
+from http import HTTPStatus
 import logging
 import os
 import logging
 import os
-import yaml
-import asyncio
-from osm_common.msgbase import MsgBase, MsgException
 from time import sleep
 from time import sleep
-from http import HTTPStatus
 
 
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+from osm_common.msgbase import MsgBase, MsgException
+import yaml
 
 
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 """
 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
 """
 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
@@ -34,14 +34,14 @@ One text line per message is used in yaml format.
 
 
 class MsgLocal(MsgBase):
 
 
 class MsgLocal(MsgBase):
-
-    def __init__(self, logger_name='msg'):
-        self.logger = logging.getLogger(logger_name)
+    def __init__(self, logger_name="msg", lock=False):
+        super().__init__(logger_name, lock)
         self.path = None
         # create a different file for each topic
         self.files_read = {}
         self.files_write = {}
         self.buffer = {}
         self.path = None
         # create a different file for each topic
         self.files_read = {}
         self.files_write = {}
         self.buffer = {}
+        self.loop = None
 
     def connect(self, config):
         try:
 
     def connect(self, config):
         try:
@@ -52,20 +52,24 @@ class MsgLocal(MsgBase):
                 self.path += "/"
             if not os.path.exists(self.path):
                 os.mkdir(self.path)
                 self.path += "/"
             if not os.path.exists(self.path):
                 os.mkdir(self.path)
+            self.loop = config.get("loop")
+
         except MsgException:
             raise
         except Exception as e:  # TODO refine
             raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
 
     def disconnect(self):
         except MsgException:
             raise
         except Exception as e:  # TODO refine
             raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
 
     def disconnect(self):
-        for f in self.files_read.values():
+        for topic, f in self.files_read.items():
             try:
                 f.close()
             try:
                 f.close()
+                self.files_read[topic] = None
             except Exception:  # TODO refine
                 pass
             except Exception:  # TODO refine
                 pass
-        for f in self.files_write.values():
+        for topic, f in self.files_write.items():
             try:
                 f.close()
             try:
                 f.close()
+                self.files_write[topic] = None
             except Exception:  # TODO refine
                 pass
 
             except Exception:  # TODO refine
                 pass
 
@@ -78,10 +82,16 @@ class MsgLocal(MsgBase):
         :return: None or raises and exception
         """
         try:
         :return: None or raises and exception
         """
         try:
-            if topic not in self.files_write:
-                self.files_write[topic] = open(self.path + topic, "a+")
-            yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
-            self.files_write[topic].flush()
+            with self.lock:
+                if topic not in self.files_write:
+                    self.files_write[topic] = open(self.path + topic, "a+")
+                yaml.safe_dump(
+                    {key: msg},
+                    self.files_write[topic],
+                    default_flow_style=True,
+                    width=20000,
+                )
+                self.files_write[topic].flush()
         except Exception as e:  # TODO refine
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
@@ -96,39 +106,57 @@ class MsgLocal(MsgBase):
             if isinstance(topic, (list, tuple)):
                 topic_list = topic
             else:
             if isinstance(topic, (list, tuple)):
                 topic_list = topic
             else:
-                topic_list = (topic, )
+                topic_list = (topic,)
             while True:
                 for single_topic in topic_list:
             while True:
                 for single_topic in topic_list:
-                    if single_topic not in self.files_read:
-                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
+                    with self.lock:
+                        if single_topic not in self.files_read:
+                            self.files_read[single_topic] = open(
+                                self.path + single_topic, "a+"
+                            )
+                            self.buffer[single_topic] = ""
+                        self.buffer[single_topic] += self.files_read[
+                            single_topic
+                        ].readline()
+                        if not self.buffer[single_topic].endswith("\n"):
+                            continue
+                        msg_dict = yaml.safe_load(self.buffer[single_topic])
                         self.buffer[single_topic] = ""
                         self.buffer[single_topic] = ""
-                    self.buffer[single_topic] += self.files_read[single_topic].readline()
-                    if not self.buffer[single_topic].endswith("\n"):
-                        continue
-                    msg_dict = yaml.load(self.buffer[single_topic])
-                    self.buffer[single_topic] = ""
-                    assert len(msg_dict) == 1
-                    for k, v in msg_dict.items():
-                        return single_topic, k, v
+                        assert len(msg_dict) == 1
+                        for k, v in msg_dict.items():
+                            return single_topic, k, v
                 if not blocks:
                     return None
                 sleep(2)
         except Exception as e:  # TODO refine
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
                 if not blocks:
                     return None
                 sleep(2)
         except Exception as e:  # TODO refine
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
-    async def aioread(self, topic, loop):
+    async def aioread(
+        self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+    ):
         """
         Asyncio read from one or several topics. It blocks
         :param topic: can be str: single topic; or str list: several topics
         """
         Asyncio read from one or several topics. It blocks
         :param topic: can be str: single topic; or str list: several topics
-        :param loop: asyncio loop
-        :return: topic, key, message
+        :param loop: asyncio loop. To be DEPRECATED! in near future!!!  loop must be provided inside config at connect
+        :param callback: synchronous callback function that will handle the message
+        :param aiocallback: async callback function that will handle the message
+        :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
+                         group_id provided at connect inside config), or a group_id string
+        :param kwargs: optional keyword arguments for callback function
+        :return: If no callback defined, it returns (topic, key, message)
         """
         """
+        _loop = loop or self.loop
         try:
             while True:
                 msg = self.read(topic, blocks=False)
                 if msg:
         try:
             while True:
                 msg = self.read(topic, blocks=False)
                 if msg:
-                    return msg
-                await asyncio.sleep(2, loop=loop)
+                    if callback:
+                        callback(*msg, **kwargs)
+                    elif aiocallback:
+                        await aiocallback(*msg, **kwargs)
+                    else:
+                        return msg
+                await asyncio.sleep(2, loop=_loop)
         except MsgException:
             raise
         except Exception as e:  # TODO refine
         except MsgException:
             raise
         except Exception as e:  # TODO refine