lightweight build structure
[osm/RO.git] / lcm / osm_common / msglocal.py
1 import os
2 import yaml
3 import asyncio
4 from msgbase import MsgBase, MsgException
5
6
class msgLocal(MsgBase):
    """Message bus backed by local files: one YAML file per topic.

    Each topic maps to a file named after the topic inside ``self.path``.
    Messages are single-entry ``{key: msg}`` mappings serialized with
    ``yaml.safe_dump`` in flow style.

    NOTE(review): ``write``, ``read`` and ``aioread`` share one cached file
    object per topic (``self.files``); whichever method touches a topic first
    decides the mode the file is opened with.
    """

    def __init__(self):
        # Directory that holds the topic files; set by connect()
        self.path = None
        # create a different file for each topic (topic name -> open file)
        self.files = {}

    @staticmethod
    def _decode(msg):
        """Parse one serialized message and return its ``(key, value)`` pair.

        :param msg: YAML text holding a mapping with exactly one entry
        :return: tuple ``(key, value)``
        :raises MsgException: if the text is not a single-entry mapping
        """
        # safe_load is the counterpart of the safe_dump used in write();
        # yaml.load without an explicit Loader is unsafe and is rejected by
        # recent PyYAML releases.
        msg_dict = yaml.safe_load(msg)
        # Explicit check instead of `assert`, which is stripped under -O
        if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
            raise MsgException("invalid message content: expected a single {key: value} mapping")
        return next(iter(msg_dict.items()))

    def connect(self, config):
        """
        Set the working directory, creating it if it does not exist
        :param config: dictionary with a "path" entry, the directory where topic files are stored
        :return: None or raises an exception
        """
        try:
            self.path = config["path"]
            # topic file names are appended directly to the path
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                # makedirs also creates missing parent directories; exist_ok
                # tolerates the directory appearing after the check above
                os.makedirs(self.path, exist_ok=True)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Close every cached topic file and forget it
        :return: None
        """
        for f in self.files.values():
            try:
                f.close()
            except Exception:  # TODO refine
                # best effort: a failure closing one file must not prevent
                # closing the others
                pass
        # drop the closed handles so a later read/write reopens the file
        # instead of failing on a closed file object
        self.files.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted
        :return: None or raises an exception
        """
        try:
            if topic not in self.files:
                # "w+" truncates any previous content of the topic file
                self.files[topic] = open(self.path + topic, "w+")
            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True)
            # make the message visible to readers right away
            self.files[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def read(self, topic):
        """
        Read the pending content of a topic as one message
        :param topic: topic
        :return: tuple (key, msg), or raises an exception if the topic file
            cannot be opened or the content is not a single {key: msg} mapping
        """
        try:
            if topic not in self.files:
                self.files[topic] = open(self.path + topic, "r+")
            # reads from the current file position up to the end of file
            msg = self.files[topic].read()
            return self._decode(msg)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aioread(self, loop, topic):
        """
        Wait until new content is appended to a topic and return it as one message
        :param loop: kept for backward compatibility and ignored; asyncio.sleep
            no longer accepts a loop argument and uses the running event loop
        :param topic: topic
        :return: tuple (key, msg), or raises an exception
        """
        try:
            if topic not in self.files:
                self.files[topic] = open(self.path + topic, "r+")
                # ignore previous content: start reading at the end of file
                self.files[topic].seek(0, os.SEEK_END)
            while True:
                msg = self.files[topic].read()
                if msg:
                    break
                # nothing new yet; poll again in 2 seconds
                await asyncio.sleep(2)
            return self._decode(msg)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e