X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsglocal.py;h=c10ff17b0808c6e649755f3e7886b33db8d4ecf3;hb=3d82ba2cc4c3ebb340a88400aaa4a8d3683806a1;hp=eba4b97f7928a52773997a6b9f6a75810cbdc8b2;hpb=05ede8f86c71ae21a331961b7795c0bd21ef09df;p=osm%2Fcommon.git diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index eba4b97..c10ff17 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -34,8 +34,7 @@ One text line per message is used in yaml format. class MsgLocal(MsgBase): - - def __init__(self, logger_name='msg', lock=False): + def __init__(self, logger_name="msg", lock=False): super().__init__(logger_name, lock) self.path = None # create a different file for each topic @@ -86,7 +85,12 @@ class MsgLocal(MsgBase): with self.lock: if topic not in self.files_write: self.files_write[topic] = open(self.path + topic, "a+") - yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000) + yaml.safe_dump( + {key: msg}, + self.files_write[topic], + default_flow_style=True, + width=20000, + ) self.files_write[topic].flush() except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) @@ -102,17 +106,21 @@ class MsgLocal(MsgBase): if isinstance(topic, (list, tuple)): topic_list = topic else: - topic_list = (topic, ) + topic_list = (topic,) while True: for single_topic in topic_list: with self.lock: if single_topic not in self.files_read: - self.files_read[single_topic] = open(self.path + single_topic, "a+") + self.files_read[single_topic] = open( + self.path + single_topic, "a+" + ) self.buffer[single_topic] = "" - self.buffer[single_topic] += self.files_read[single_topic].readline() + self.buffer[single_topic] += self.files_read[ + single_topic + ].readline() if not self.buffer[single_topic].endswith("\n"): continue - msg_dict = yaml.load(self.buffer[single_topic]) + msg_dict = yaml.safe_load(self.buffer[single_topic]) self.buffer[single_topic] = "" assert len(msg_dict) == 1 for k, v in msg_dict.items(): @@ -123,12 +131,19 @@ class MsgLocal(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): + async def aioread( + self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs + ): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop - :return: topic, key, message + :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect + :param callback: synchronous callback function that will handle the message + :param aiocallback: async callback function that will handle the message + :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general + group_id provided at connect inside config), or a group_id string + :param kwargs: optional keyword arguments for callback function + :return: If no callback defined, it returns (topic, key, message) """ _loop = loop or self.loop try: