X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_nbi%2Fmsglocal.py;fp=osm_nbi%2Fmsglocal.py;h=a19c5c3964f809daa5fafefc80f3c7d4bcfd52ba;hb=c94c3df90aa64298a7935a80b221f80f3c043260;hp=0000000000000000000000000000000000000000;hpb=22ed16460edb54806e9b957be18cbafb2f63b54d;p=osm%2FNBI.git diff --git a/osm_nbi/msglocal.py b/osm_nbi/msglocal.py new file mode 100644 index 0000000..a19c5c3 --- /dev/null +++ b/osm_nbi/msglocal.py @@ -0,0 +1,95 @@ +import logging +import os +import yaml +import asyncio +from msgbase import MsgBase, MsgException + +__author__ = "Alfonso Tierno " + + +class MsgLocal(MsgBase): + + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) + self.path = None + # create a different file for each topic + self.files = {} + + def connect(self, config): + try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + os.mkdir(self.path) + except MsgException: + raise + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def disconnect(self): + for f in self.files.values(): + try: + f.close() + except Exception as e: # TODO refine + pass + + def write(self, topic, key, msg): + """ + Insert a message into topic + :param topic: topic + :param key: key text to be inserted + :param msg: value object to be inserted + :return: None or raises and exception + """ + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "a+") + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True) + self.files[topic].flush() + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def read(self, topic): + try: + msg = "" + if topic not in self.files: + self.files[topic] = open(self.path + topic, "a+") + # ignore previous content + for line in self.files[topic]: + if not line.endswith("\n"): + msg = line + msg += self.files[topic].readline() + if not msg.endswith("\n"): + return None + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e)) + + async def aioread(self, topic, loop=None): + try: + msg = "" + if not loop: + loop = asyncio.get_event_loop() + if topic not in self.files: + self.files[topic] = open(self.path + topic, "a+") + # ignore previous content + for line in self.files[topic]: + if not line.endswith("\n"): + msg = line + while True: + msg += self.files[topic].readline() + if msg.endswith("\n"): + break + await asyncio.sleep(2, loop=loop) + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e))