Initial commit for NBI
[osm/NBI.git] / osm_nbi / msglocal.py
diff --git a/osm_nbi/msglocal.py b/osm_nbi/msglocal.py
new file mode 100644 (file)
index 0000000..a19c5c3
--- /dev/null
@@ -0,0 +1,95 @@
+import logging
+import os
+import yaml
+import asyncio
+from msgbase import MsgBase, MsgException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class MsgLocal(MsgBase):
+
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None
+        # create a different file for each topic
+        self.files = {}
+
+    def connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                os.mkdir(self.path)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        for f in self.files.values():
+            try:
+                f.close()
+            except Exception as e:  # TODO refine
+                pass
+
+    def write(self, topic, key, msg):
+        """
+        Insert a message into topic
+        :param topic: topic
+        :param key: key text to be inserted
+        :param msg: value object to be inserted
+        :return: None or raises and exception
+        """
+        try:
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True)
+            self.files[topic].flush()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def read(self, topic):
+        try:
+            msg = ""
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+                # ignore previous content
+                for line in self.files[topic]:
+                    if not line.endswith("\n"):
+                        msg = line
+            msg += self.files[topic].readline()
+            if not msg.endswith("\n"):
+                return None
+            msg_dict = yaml.load(msg)
+            assert len(msg_dict) == 1
+            for k, v in msg_dict.items():
+                return k, v
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    async def aioread(self, topic, loop=None):
+        try:
+            msg = ""
+            if not loop:
+                loop = asyncio.get_event_loop()
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+                # ignore previous content
+                for line in self.files[topic]:
+                    if not line.endswith("\n"):
+                        msg = line
+            while True:
+                msg += self.files[topic].readline()
+                if msg.endswith("\n"):
+                    break
+                await asyncio.sleep(2, loop=loop)
+            msg_dict = yaml.load(msg)
+            assert len(msg_dict) == 1
+            for k, v in msg_dict.items():
+                return k, v
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))