Change default error code from 500 to 503 in case message is not ready
[osm/common.git] / osm_common / msglocal.py
index c774f85..75fc717 100644 (file)
@@ -2,25 +2,27 @@ import logging
 import os
 import yaml
 import asyncio
-from msgbase import MsgBase, MsgException
+from osm_common.msgbase import MsgBase, MsgException
 from time import sleep
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 """
 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
-One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer 
+One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
 access to the same file. e.g. same volume if running with docker.
 One text line per message is used in yaml format.
 """
 
+
 class MsgLocal(MsgBase):
 
     def __init__(self, logger_name='msg'):
         self.logger = logging.getLogger(logger_name)
         self.path = None
         # create a different file for each topic
-        self.files = {}
+        self.files_read = {}
+        self.files_write = {}
         self.buffer = {}
 
     def connect(self, config):
@@ -38,10 +40,15 @@ class MsgLocal(MsgBase):
             raise MsgException(str(e))
 
     def disconnect(self):
-        for f in self.files.values():
+        for f in self.files_read.values():
+            try:
+                f.close()
+            except Exception:  # TODO refine
+                pass
+        for f in self.files_write.values():
             try:
                 f.close()
-            except Exception as e:  # TODO refine
+            except Exception:  # TODO refine
                 pass
 
     def write(self, topic, key, msg):
@@ -53,10 +60,10 @@ class MsgLocal(MsgBase):
         :return: None or raises and exception
         """
         try:
-            if topic not in self.files:
-                self.files[topic] = open(self.path + topic, "a+")
-            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
-            self.files[topic].flush()
+            if topic not in self.files_write:
+                self.files_write[topic] = open(self.path + topic, "a+")
+            yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+            self.files_write[topic].flush()
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
@@ -74,10 +81,10 @@ class MsgLocal(MsgBase):
                 topic_list = (topic, )
             while True:
                 for single_topic in topic_list:
-                    if single_topic not in self.files:
-                        self.files[single_topic] = open(self.path + single_topic, "a+")
+                    if single_topic not in self.files_read:
+                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
                         self.buffer[single_topic] = ""
-                    self.buffer[single_topic] += self.files[single_topic].readline()
+                    self.buffer[single_topic] += self.files_read[single_topic].readline()
                     if not self.buffer[single_topic].endswith("\n"):
                         continue
                     msg_dict = yaml.load(self.buffer[single_topic])
@@ -109,3 +116,13 @@ class MsgLocal(MsgBase):
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
+    async def aiowrite(self, topic, key, msg, loop=None):
+        """
+        Asyncio write. It blocks
+        :param topic: str
+        :param key: str
+        :param msg: message, can be str or yaml
+        :param loop: asyncio loop
+        :return: nothing if ok or raises an exception
+        """
+        return self.write(topic, key, msg)