projects
/
osm
/
common.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
msgkafka, provide loop on config
[osm/common.git]
/
osm_common
/
msglocal.py
diff --git
a/osm_common/msglocal.py
b/osm_common/msglocal.py
index
1e8e089
..
eba4b97
100644
(file)
--- a/
osm_common/msglocal.py
+++ b/
osm_common/msglocal.py
@@
-42,6
+42,7
@@
class MsgLocal(MsgBase):
self.files_read = {}
self.files_write = {}
self.buffer = {}
self.files_read = {}
self.files_write = {}
self.buffer = {}
+ self.loop = None
def connect(self, config):
try:
def connect(self, config):
try:
@@
-52,6
+53,8
@@
class MsgLocal(MsgBase):
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
+ self.loop = config.get("loop")
+
except MsgException:
raise
except Exception as e: # TODO refine
except MsgException:
raise
except Exception as e: # TODO refine
@@
-127,6
+130,7
@@
class MsgLocal(MsgBase):
:param loop: asyncio loop
:return: topic, key, message
"""
:param loop: asyncio loop
:return: topic, key, message
"""
+ _loop = loop or self.loop
try:
while True:
msg = self.read(topic, blocks=False)
try:
while True:
msg = self.read(topic, blocks=False)
@@
-137,7
+141,7
@@
class MsgLocal(MsgBase):
await aiocallback(*msg, **kwargs)
else:
return msg
await aiocallback(*msg, **kwargs)
else:
return msg
- await asyncio.sleep(2, loop=loop)
+ await asyncio.sleep(2, loop=
_
loop)
except MsgException:
raise
except Exception as e: # TODO refine
except MsgException:
raise
except Exception as e: # TODO refine