X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fcommon.git;a=blobdiff_plain;f=osm_common%2Fmsglocal.py;fp=osm_common%2Fmsglocal.py;h=0c3b216e5349ac7bf4cb5a914adf749c8322e27a;hp=6d4cb58fed1200c4ea78fdf640358be181d62a55;hb=a06b854f2b278aaee015fc1f76015895f8cf50c1;hpb=b2d732a70efa33e4bc478d351d64bc4adb4ea332 diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index 6d4cb58..0c3b216 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -41,7 +41,6 @@ class MsgLocal(MsgBase): self.files_read = {} self.files_write = {} self.buffer = {} - self.loop = None def connect(self, config): try: @@ -52,7 +51,6 @@ class MsgLocal(MsgBase): self.path += "/" if not os.path.exists(self.path): os.mkdir(self.path) - self.loop = config.get("loop") except MsgException: raise @@ -158,12 +156,11 @@ class MsgLocal(MsgBase): raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) async def aioread( - self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs + self, topic, callback=None, aiocallback=None, group_id=None, **kwargs ): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect :param callback: synchronous callback function that will handle the message :param aiocallback: async callback function that will handle the message :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general @@ -171,7 +168,6 @@ class MsgLocal(MsgBase): :param kwargs: optional keyword arguments for callback function :return: If no callback defined, it returns (topic, key, message) """ - _loop = loop or self.loop try: while True: msg = self.read(topic, blocks=False) @@ -182,19 +178,18 @@ class MsgLocal(MsgBase): await aiocallback(*msg, **kwargs) else: return msg - await asyncio.sleep(2, loop=_loop) + await asyncio.sleep(2) except MsgException: raise except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aiowrite(self, topic, key, msg, loop=None): + async def aiowrite(self, topic, key, msg): """ Asyncio write. It blocks :param topic: str :param key: str :param msg: message, can be str or yaml - :param loop: asyncio loop :return: nothing if ok or raises an exception """ return self.write(topic, key, msg)