a380e618da6565f5844a5020acc2080120cb025a
4 from msgbase
import MsgBase
, MsgException
7 class msgLocal(MsgBase
):
11 # create a different file for each topic
14 def connect(self
, config
):
16 self
.path
= config
["path"]
17 if not self
.path
.endswith("/"):
19 if not os
.path
.exists(self
.path
):
23 except Exception as e
: # TODO refine
24 raise MsgException(str(e
))
27 for f
in self
.files
.values():
30 except Exception as e
: # TODO refine
33 def write(self
, topic
, key
, msg
):
35 Insert a message into topic
37 :param key: key text to be inserted
38 :param msg: value object to be inserted
39 :return: None or raises and exception
42 if topic
not in self
.files
:
43 self
.files
[topic
] = open(self
.path
+ topic
, "w+")
44 yaml
.safe_dump({key
: msg
}, self
.files
[topic
], default_flow_style
=True)
45 self
.files
[topic
].flush()
46 except Exception as e
: # TODO refine
47 raise MsgException(str(e
))
49 def read(self
, topic
):
51 if topic
not in self
.files
:
52 self
.files
[topic
] = open(self
.path
+ topic
, "r+")
53 msg
= self
.files
[topic
].read()
54 msg_dict
= yaml
.load(msg
)
55 assert len(msg_dict
) == 1
56 for k
, v
in msg_dict
.items():
58 except Exception as e
: # TODO refine
59 raise MsgException(str(e
))
61 async def aioread(self
, topic
, loop
=None):
64 loop
= asyncio
.get_event_loop()
65 if topic
not in self
.files
:
66 self
.files
[topic
] = open(self
.path
+ topic
, "r+")
67 # ignore previous content
68 while self
.files
[topic
].read():
71 msg
= self
.files
[topic
].read()
74 await asyncio
.sleep(2, loop
=loop
)
75 msg_dict
= yaml
.load(msg
)
76 assert len(msg_dict
) == 1
77 for k
, v
in msg_dict
.items():
79 except Exception as e
: # TODO refine
80 raise MsgException(str(e
))