projects
/
osm
/
common.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Increase Logging, fix directory delete
[osm/common.git]
/
osm_common
/
msglocal.py
diff --git
a/osm_common/msglocal.py
b/osm_common/msglocal.py
index
965cb26
..
c10ff17
100644
(file)
--- a/
osm_common/msglocal.py
+++ b/
osm_common/msglocal.py
@@
-34,8
+34,7
@@
One text line per message is used in yaml format.
class MsgLocal(MsgBase):
class MsgLocal(MsgBase):
-
- def __init__(self, logger_name='msg', lock=False):
+ def __init__(self, logger_name="msg", lock=False):
super().__init__(logger_name, lock)
self.path = None
# create a different file for each topic
super().__init__(logger_name, lock)
self.path = None
# create a different file for each topic
@@
-86,7
+85,12
@@
class MsgLocal(MsgBase):
with self.lock:
if topic not in self.files_write:
self.files_write[topic] = open(self.path + topic, "a+")
with self.lock:
if topic not in self.files_write:
self.files_write[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+ yaml.safe_dump(
+ {key: msg},
+ self.files_write[topic],
+ default_flow_style=True,
+ width=20000,
+ )
self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
@@
-102,17
+106,21
@@
class MsgLocal(MsgBase):
if isinstance(topic, (list, tuple)):
topic_list = topic
else:
if isinstance(topic, (list, tuple)):
topic_list = topic
else:
- topic_list = (topic,
)
+ topic_list = (topic,)
while True:
for single_topic in topic_list:
with self.lock:
if single_topic not in self.files_read:
while True:
for single_topic in topic_list:
with self.lock:
if single_topic not in self.files_read:
- self.files_read[single_topic] = open(self.path + single_topic, "a+")
+ self.files_read[single_topic] = open(
+ self.path + single_topic, "a+"
+ )
self.buffer[single_topic] = ""
self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files_read[single_topic].readline()
+ self.buffer[single_topic] += self.files_read[
+ single_topic
+ ].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
if not self.buffer[single_topic].endswith("\n"):
continue
- msg_dict = yaml.load(self.buffer[single_topic])
+ msg_dict = yaml.
safe_
load(self.buffer[single_topic])
self.buffer[single_topic] = ""
assert len(msg_dict) == 1
for k, v in msg_dict.items():
self.buffer[single_topic] = ""
assert len(msg_dict) == 1
for k, v in msg_dict.items():
@@
-123,7
+131,9
@@
class MsgLocal(MsgBase):
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
+ async def aioread(
+ self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ ):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics