X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsglocal.py;h=c10ff17b0808c6e649755f3e7886b33db8d4ecf3;hb=HEAD;hp=b0abb895ce623291af5ff4ab4e13cbbb413d3bd8;hpb=145218304679a552a3d34c9b5d0cb7334c6a9586;p=osm%2Fcommon.git diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index b0abb89..0c3b216 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -15,16 +15,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +from http import HTTPStatus import logging import os -import yaml -import asyncio -from osm_common.msgbase import MsgBase, MsgException from time import sleep -from http import HTTPStatus -__author__ = "Alfonso Tierno " +from osm_common.msgbase import MsgBase, MsgException +import yaml +__author__ = "Alfonso Tierno " """ This emulated kafka bus by just using a shared file system. Useful for testing or devops. One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer @@ -34,9 +34,8 @@ One text line per message is used in yaml format. class MsgLocal(MsgBase): - - def __init__(self, logger_name='msg'): - self.logger = logging.getLogger(logger_name) + def __init__(self, logger_name="msg", lock=False): + super().__init__(logger_name, lock) self.path = None # create a different file for each topic self.files_read = {} @@ -52,22 +51,48 @@ class MsgLocal(MsgBase): self.path += "/" if not os.path.exists(self.path): os.mkdir(self.path) + except MsgException: raise except Exception as e: # TODO refine raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) def disconnect(self): - for f in self.files_read.values(): + for topic, f in self.files_read.items(): try: f.close() - except Exception: # TODO refine - pass - for f in self.files_write.values(): + self.files_read[topic] = None + except Exception as read_topic_error: + if isinstance(read_topic_error, (IOError, FileNotFoundError)): + self.logger.exception( + f"{read_topic_error} occured while closing read topic files." + ) + elif isinstance(read_topic_error, KeyError): + self.logger.exception( + f"{read_topic_error} occured while reading from files_read dictionary." + ) + else: + self.logger.exception( + f"{read_topic_error} occured while closing read topics." + ) + + for topic, f in self.files_write.items(): try: f.close() - except Exception: # TODO refine - pass + self.files_write[topic] = None + except Exception as write_topic_error: + if isinstance(write_topic_error, (IOError, FileNotFoundError)): + self.logger.exception( + f"{write_topic_error} occured while closing write topic files." + ) + elif isinstance(write_topic_error, KeyError): + self.logger.exception( + f"{write_topic_error} occured while reading from files_write dictionary." + ) + else: + self.logger.exception( + f"{write_topic_error} occured while closing write topics." + ) def write(self, topic, key, msg): """ @@ -78,10 +103,16 @@ class MsgLocal(MsgBase): :return: None or raises and exception """ try: - if topic not in self.files_write: - self.files_write[topic] = open(self.path + topic, "a+") - yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000) - self.files_write[topic].flush() + with self.lock: + if topic not in self.files_write: + self.files_write[topic] = open(self.path + topic, "a+") + yaml.safe_dump( + {key: msg}, + self.files_write[topic], + default_flow_style=True, + width=20000, + ) + self.files_write[topic].flush() except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) @@ -96,32 +127,46 @@ class MsgLocal(MsgBase): if isinstance(topic, (list, tuple)): topic_list = topic else: - topic_list = (topic, ) + topic_list = (topic,) while True: for single_topic in topic_list: - if single_topic not in self.files_read: - self.files_read[single_topic] = open(self.path + single_topic, "a+") + with self.lock: + if single_topic not in self.files_read: + self.files_read[single_topic] = open( + self.path + single_topic, "a+" + ) + self.buffer[single_topic] = "" + self.buffer[single_topic] += self.files_read[ + single_topic + ].readline() + if not self.buffer[single_topic].endswith("\n"): + continue + msg_dict = yaml.safe_load(self.buffer[single_topic]) self.buffer[single_topic] = "" - self.buffer[single_topic] += self.files_read[single_topic].readline() - if not self.buffer[single_topic].endswith("\n"): - continue - msg_dict = yaml.load(self.buffer[single_topic]) - self.buffer[single_topic] = "" - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return single_topic, k, v + if len(msg_dict) != 1: + raise ValueError( + "Length of message dictionary is not equal to 1" + ) + for k, v in msg_dict.items(): + return single_topic, k, v if not blocks: return None sleep(2) except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): + async def aioread( + self, topic, callback=None, aiocallback=None, group_id=None, **kwargs + ): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop - :return: topic, key, message + :param callback: synchronous callback function that will handle the message + :param aiocallback: async callback function that will handle the message + :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general + group_id provided at connect inside config), or a group_id string + :param kwargs: optional keyword arguments for callback function + :return: If no callback defined, it returns (topic, key, message) """ try: while True: @@ -133,19 +178,18 @@ class MsgLocal(MsgBase): await aiocallback(*msg, **kwargs) else: return msg - await asyncio.sleep(2, loop=loop) + await asyncio.sleep(2) except MsgException: raise except Exception as e: # TODO refine raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aiowrite(self, topic, key, msg, loop=None): + async def aiowrite(self, topic, key, msg): """ Asyncio write. It blocks :param topic: str :param key: str :param msg: message, can be str or yaml - :param loop: asyncio loop :return: nothing if ok or raises an exception """ return self.write(topic, key, msg)