X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsglocal.py;h=843b3766a9dacf475294f11757b094282a681ea9;hb=b95cca6f0bd7e6c20e5ec945def54565de6ac02f;hp=1e8e089b22913215ce7835cdab383ccd2a3dce51;hpb=1e9a329ca0085be33665e35d123394905bc46d74;p=osm%2Fcommon.git diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index 1e8e089..843b376 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -42,6 +42,7 @@ class MsgLocal(MsgBase): self.files_read = {} self.files_write = {} self.buffer = {} + self.loop = None def connect(self, config): try: @@ -52,6 +53,8 @@ class MsgLocal(MsgBase): self.path += "/" if not os.path.exists(self.path): os.mkdir(self.path) + self.loop = config.get("loop") + except MsgException: raise except Exception as e: # TODO refine @@ -109,7 +112,7 @@ class MsgLocal(MsgBase): self.buffer[single_topic] += self.files_read[single_topic].readline() if not self.buffer[single_topic].endswith("\n"): continue - msg_dict = yaml.load(self.buffer[single_topic]) + msg_dict = yaml.safe_load(self.buffer[single_topic]) self.buffer[single_topic] = "" assert len(msg_dict) == 1 for k, v in msg_dict.items(): @@ -120,13 +123,19 @@ class MsgLocal(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop - :return: topic, key, message + :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect + :param callback: synchronous callback function that will handle the message + :param aiocallback: async callback function that will handle the message + :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general + group_id provided at connect inside config), or a group_id string + :param kwargs: optional keyword arguments for callback function + :return: If no callback defined, it returns (topic, key, message) """ + _loop = loop or self.loop try: while True: msg = self.read(topic, blocks=False) @@ -137,7 +146,7 @@ class MsgLocal(MsgBase): await aiocallback(*msg, **kwargs) else: return msg - await asyncio.sleep(2, loop=loop) + await asyncio.sleep(2, loop=_loop) except MsgException: raise except Exception as e: # TODO refine