X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsglocal.py;h=6d4cb58fed1200c4ea78fdf640358be181d62a55;hb=c04a88c04823353dedf4145c6ae359ad041d111c;hp=c10ff17b0808c6e649755f3e7886b33db8d4ecf3;hpb=2644b76248a1b96f7a47013b414e31b4e3feecf8;p=osm%2Fcommon.git diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index c10ff17..6d4cb58 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -15,16 +15,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +from http import HTTPStatus import logging import os -import yaml -import asyncio -from osm_common.msgbase import MsgBase, MsgException from time import sleep -from http import HTTPStatus -__author__ = "Alfonso Tierno " +from osm_common.msgbase import MsgBase, MsgException +import yaml +__author__ = "Alfonso Tierno " """ This emulated kafka bus by just using a shared file system. Useful for testing or devops. One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer @@ -64,14 +64,37 @@ class MsgLocal(MsgBase): try: f.close() self.files_read[topic] = None - except Exception: # TODO refine - pass + except Exception as read_topic_error: + if isinstance(read_topic_error, (IOError, FileNotFoundError)): + self.logger.exception( + f"{read_topic_error} occured while closing read topic files." + ) + elif isinstance(read_topic_error, KeyError): + self.logger.exception( + f"{read_topic_error} occured while reading from files_read dictionary." + ) + else: + self.logger.exception( + f"{read_topic_error} occured while closing read topics." + ) + for topic, f in self.files_write.items(): try: f.close() self.files_write[topic] = None - except Exception: # TODO refine - pass + except Exception as write_topic_error: + if isinstance(write_topic_error, (IOError, FileNotFoundError)): + self.logger.exception( + f"{write_topic_error} occured while closing write topic files." + ) + elif isinstance(write_topic_error, KeyError): + self.logger.exception( + f"{write_topic_error} occured while reading from files_write dictionary." + ) + else: + self.logger.exception( + f"{write_topic_error} occured while closing write topics." + ) def write(self, topic, key, msg): """ @@ -122,7 +145,10 @@ class MsgLocal(MsgBase): continue msg_dict = yaml.safe_load(self.buffer[single_topic]) self.buffer[single_topic] = "" - assert len(msg_dict) == 1 + if len(msg_dict) != 1: + raise ValueError( + "Length of message dictionary is not equal to 1" + ) for k, v in msg_dict.items(): return single_topic, k, v if not blocks: