a380e618da6565f5844a5020acc2080120cb025a
[osm/RO.git] / lcm / osm_common / msglocal.py
1 import os
2 import yaml
3 import asyncio
4 from msgbase import MsgBase, MsgException
5
6
7 class msgLocal(MsgBase):
8
9 def __init__(self):
10 self.path = None
11 # create a different file for each topic
12 self.files = {}
13
14 def connect(self, config):
15 try:
16 self.path = config["path"]
17 if not self.path.endswith("/"):
18 self.path += "/"
19 if not os.path.exists(self.path):
20 os.mkdir(self.path)
21 except MsgException:
22 raise
23 except Exception as e: # TODO refine
24 raise MsgException(str(e))
25
26 def disconnect(self):
27 for f in self.files.values():
28 try:
29 f.close()
30 except Exception as e: # TODO refine
31 pass
32
33 def write(self, topic, key, msg):
34 """
35 Insert a message into topic
36 :param topic: topic
37 :param key: key text to be inserted
38 :param msg: value object to be inserted
39 :return: None or raises and exception
40 """
41 try:
42 if topic not in self.files:
43 self.files[topic] = open(self.path + topic, "w+")
44 yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True)
45 self.files[topic].flush()
46 except Exception as e: # TODO refine
47 raise MsgException(str(e))
48
49 def read(self, topic):
50 try:
51 if topic not in self.files:
52 self.files[topic] = open(self.path + topic, "r+")
53 msg = self.files[topic].read()
54 msg_dict = yaml.load(msg)
55 assert len(msg_dict) == 1
56 for k, v in msg_dict.items():
57 return k, v
58 except Exception as e: # TODO refine
59 raise MsgException(str(e))
60
61 async def aioread(self, topic, loop=None):
62 try:
63 if not loop:
64 loop = asyncio.get_event_loop()
65 if topic not in self.files:
66 self.files[topic] = open(self.path + topic, "r+")
67 # ignore previous content
68 while self.files[topic].read():
69 pass
70 while True:
71 msg = self.files[topic].read()
72 if msg:
73 break
74 await asyncio.sleep(2, loop=loop)
75 msg_dict = yaml.load(msg)
76 assert len(msg_dict) == 1
77 for k, v in msg_dict.items():
78 return k, v
79 except Exception as e: # TODO refine
80 raise MsgException(str(e))