projects
/
osm
/
common.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Jenkins refresh
[osm/common.git]
/
osm_common
/
msglocal.py
diff --git
a/osm_common/msglocal.py
b/osm_common/msglocal.py
index
843b376
..
2f90307
100644
(file)
--- a/
osm_common/msglocal.py
+++ b/
osm_common/msglocal.py
@@
-15,16
+15,16
@@
# See the License for the specific language governing permissions and
# limitations under the License.
# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
+from http import HTTPStatus
import logging
import os
import logging
import os
-import yaml
-import asyncio
-from osm_common.msgbase import MsgBase, MsgException
from time import sleep
from time import sleep
-from http import HTTPStatus
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+from osm_common.msgbase import MsgBase, MsgException
+import yaml
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"""
This emulated kafka bus by just using a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
"""
This emulated kafka bus by just using a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
@@
-34,8
+34,7
@@
One text line per message is used in yaml format.
class MsgLocal(MsgBase):
class MsgLocal(MsgBase):
-
- def __init__(self, logger_name='msg', lock=False):
+ def __init__(self, logger_name="msg", lock=False):
super().__init__(logger_name, lock)
self.path = None
# create a different file for each topic
super().__init__(logger_name, lock)
self.path = None
# create a different file for each topic
@@
-86,7
+85,12
@@
class MsgLocal(MsgBase):
with self.lock:
if topic not in self.files_write:
self.files_write[topic] = open(self.path + topic, "a+")
with self.lock:
if topic not in self.files_write:
self.files_write[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+ yaml.safe_dump(
+ {key: msg},
+ self.files_write[topic],
+ default_flow_style=True,
+ width=20000,
+ )
self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
@@
-102,14
+106,18
@@
class MsgLocal(MsgBase):
if isinstance(topic, (list, tuple)):
topic_list = topic
else:
if isinstance(topic, (list, tuple)):
topic_list = topic
else:
- topic_list = (topic,
)
+ topic_list = (topic,)
while True:
for single_topic in topic_list:
with self.lock:
if single_topic not in self.files_read:
while True:
for single_topic in topic_list:
with self.lock:
if single_topic not in self.files_read:
- self.files_read[single_topic] = open(self.path + single_topic, "a+")
+ self.files_read[single_topic] = open(
+ self.path + single_topic, "a+"
+ )
self.buffer[single_topic] = ""
self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files_read[single_topic].readline()
+ self.buffer[single_topic] += self.files_read[
+ single_topic
+ ].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
msg_dict = yaml.safe_load(self.buffer[single_topic])
if not self.buffer[single_topic].endswith("\n"):
continue
msg_dict = yaml.safe_load(self.buffer[single_topic])
@@
-123,7
+131,9
@@
class MsgLocal(MsgBase):
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
+ async def aioread(
+ self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ ):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics