X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_common%2Fmsglocal.py;h=337321ff9685e5dba019889494211ae6351e994f;hb=54467bb7f4edec1fa4fe195fe4ccf34fea5d7f9e;hp=a380e618da6565f5844a5020acc2080120cb025a;hpb=f3c4dbc42e206bcc0d4d3369f6d0d156d7ffe669;p=osm%2FRO.git diff --git a/lcm/osm_common/msglocal.py b/lcm/osm_common/msglocal.py index a380e618..337321ff 100644 --- a/lcm/osm_common/msglocal.py +++ b/lcm/osm_common/msglocal.py @@ -1,18 +1,32 @@ +import logging import os import yaml import asyncio from msgbase import MsgBase, MsgException +from time import sleep +__author__ = "Alfonso Tierno " -class msgLocal(MsgBase): +""" +This emulated kafka bus by just using a shared file system. Usefull for testing or devops. +One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer +access to the same file. e.g. same volume if running with docker. +One text line per message is used in yaml format +""" - def __init__(self): +class MsgLocal(MsgBase): + + def __init__(self, logger_name='msg'): + self.logger = logging.getLogger(logger_name) self.path = None # create a different file for each topic self.files = {} + self.buffer = {} def connect(self, config): try: + if "logger_name" in config: + self.logger = logging.getLogger(config["logger_name"]) self.path = config["path"] if not self.path.endswith("/"): self.path += "/" @@ -35,46 +49,63 @@ class msgLocal(MsgBase): Insert a message into topic :param topic: topic :param key: key text to be inserted - :param msg: value object to be inserted + :param msg: value object to be inserted, can be str, object ... :return: None or raises and exception """ try: if topic not in self.files: - self.files[topic] = open(self.path + topic, "w+") - yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True) + self.files[topic] = open(self.path + topic, "a+") + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000) self.files[topic].flush() except Exception as e: # TODO refine raise MsgException(str(e)) - def read(self, topic): + def read(self, topic, blocks=True): + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :param blocks: indicates if it should wait and block until a message is present or returns None + :return: topic, key, message; or None if blocks==True + """ try: - if topic not in self.files: - self.files[topic] = open(self.path + topic, "r+") - msg = self.files[topic].read() - msg_dict = yaml.load(msg) - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return k, v + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic, ) + while True: + for single_topic in topic_list: + if single_topic not in self.files: + self.files[single_topic] = open(self.path + single_topic, "a+") + self.buffer[single_topic] = "" + self.buffer[single_topic] += self.files[single_topic].readline() + if not self.buffer[single_topic].endswith("\n"): + continue + msg_dict = yaml.load(self.buffer[single_topic]) + self.buffer[single_topic] = "" + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return single_topic, k, v + if not blocks: + return None + sleep(2) except Exception as e: # TODO refine raise MsgException(str(e)) - async def aioread(self, topic, loop=None): + async def aioread(self, topic, loop): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :return: topic, key, message + """ try: - if not loop: - loop = asyncio.get_event_loop() - if topic not in self.files: - self.files[topic] = open(self.path + topic, "r+") - # ignore previous content - while self.files[topic].read(): - pass while True: - msg = self.files[topic].read() + msg = self.read(topic, blocks=False) if msg: - break + return msg await asyncio.sleep(2, loop=loop) - msg_dict = yaml.load(msg) - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return k, v + except MsgException: + raise except Exception as e: # TODO refine raise MsgException(str(e)) +