sol005 packages upload implementation
[osm/NBI.git] / osm_nbi / msglocal.py
1 import logging
2 import os
3 import yaml
4 import asyncio
5 from msgbase import MsgBase, MsgException
6
7 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
8
9
10 class MsgLocal(MsgBase):
11
12 def __init__(self, logger_name='msg'):
13 self.logger = logging.getLogger(logger_name)
14 self.path = None
15 # create a different file for each topic
16 self.files = {}
17
18 def connect(self, config):
19 try:
20 if "logger_name" in config:
21 self.logger = logging.getLogger(config["logger_name"])
22 self.path = config["path"]
23 if not self.path.endswith("/"):
24 self.path += "/"
25 if not os.path.exists(self.path):
26 os.mkdir(self.path)
27 except MsgException:
28 raise
29 except Exception as e: # TODO refine
30 raise MsgException(str(e))
31
32 def disconnect(self):
33 for f in self.files.values():
34 try:
35 f.close()
36 except Exception as e: # TODO refine
37 pass
38
39 def write(self, topic, key, msg):
40 """
41 Insert a message into topic
42 :param topic: topic
43 :param key: key text to be inserted
44 :param msg: value object to be inserted
45 :return: None or raises and exception
46 """
47 try:
48 if topic not in self.files:
49 self.files[topic] = open(self.path + topic, "a+")
50 yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True)
51 self.files[topic].flush()
52 except Exception as e: # TODO refine
53 raise MsgException(str(e))
54
55 def read(self, topic):
56 try:
57 msg = ""
58 if topic not in self.files:
59 self.files[topic] = open(self.path + topic, "a+")
60 # ignore previous content
61 for line in self.files[topic]:
62 if not line.endswith("\n"):
63 msg = line
64 msg += self.files[topic].readline()
65 if not msg.endswith("\n"):
66 return None
67 msg_dict = yaml.load(msg)
68 assert len(msg_dict) == 1
69 for k, v in msg_dict.items():
70 return k, v
71 except Exception as e: # TODO refine
72 raise MsgException(str(e))
73
74 async def aioread(self, topic, loop=None):
75 try:
76 msg = ""
77 if not loop:
78 loop = asyncio.get_event_loop()
79 if topic not in self.files:
80 self.files[topic] = open(self.path + topic, "a+")
81 # ignore previous content
82 for line in self.files[topic]:
83 if not line.endswith("\n"):
84 msg = line
85 while True:
86 msg += self.files[topic].readline()
87 if msg.endswith("\n"):
88 break
89 await asyncio.sleep(2, loop=loop)
90 msg_dict = yaml.load(msg)
91 assert len(msg_dict) == 1
92 for k, v in msg_dict.items():
93 return k, v
94 except Exception as e: # TODO refine
95 raise MsgException(str(e))