X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fmsglocal.py;h=337321ff9685e5dba019889494211ae6351e994f;hp=a19c5c3964f809daa5fafefc80f3c7d4bcfd52ba;hb=0f98af53b320c8244b58d0d8751e28e157949e8e;hpb=c94c3df90aa64298a7935a80b221f80f3c043260 diff --git a/osm_nbi/msglocal.py b/osm_nbi/msglocal.py index a19c5c3..337321f 100644 --- a/osm_nbi/msglocal.py +++ b/osm_nbi/msglocal.py @@ -3,9 +3,16 @@ import os import yaml import asyncio from msgbase import MsgBase, MsgException +from time import sleep __author__ = "Alfonso Tierno " +""" +This emulated kafka bus by just using a shared file system. Usefull for testing or devops. +One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer +access to the same file. e.g. same volume if running with docker. +One text line per message is used in yaml format +""" class MsgLocal(MsgBase): @@ -14,6 +21,7 @@ class MsgLocal(MsgBase): self.path = None # create a different file for each topic self.files = {} + self.buffer = {} def connect(self, config): try: @@ -41,55 +49,63 @@ class MsgLocal(MsgBase): Insert a message into topic :param topic: topic :param key: key text to be inserted - :param msg: value object to be inserted + :param msg: value object to be inserted, can be str, object ... :return: None or raises and exception """ try: if topic not in self.files: self.files[topic] = open(self.path + topic, "a+") - yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True) + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000) self.files[topic].flush() except Exception as e: # TODO refine raise MsgException(str(e)) - def read(self, topic): + def read(self, topic, blocks=True): + """ + Read from one or several topics. it is non blocking returning None if nothing is available + :param topic: can be str: single topic; or str list: several topics + :param blocks: indicates if it should wait and block until a message is present or returns None + :return: topic, key, message; or None if blocks==True + """ try: - msg = "" - if topic not in self.files: - self.files[topic] = open(self.path + topic, "a+") - # ignore previous content - for line in self.files[topic]: - if not line.endswith("\n"): - msg = line - msg += self.files[topic].readline() - if not msg.endswith("\n"): - return None - msg_dict = yaml.load(msg) - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return k, v + if isinstance(topic, (list, tuple)): + topic_list = topic + else: + topic_list = (topic, ) + while True: + for single_topic in topic_list: + if single_topic not in self.files: + self.files[single_topic] = open(self.path + single_topic, "a+") + self.buffer[single_topic] = "" + self.buffer[single_topic] += self.files[single_topic].readline() + if not self.buffer[single_topic].endswith("\n"): + continue + msg_dict = yaml.load(self.buffer[single_topic]) + self.buffer[single_topic] = "" + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return single_topic, k, v + if not blocks: + return None + sleep(2) except Exception as e: # TODO refine raise MsgException(str(e)) - async def aioread(self, topic, loop=None): + async def aioread(self, topic, loop): + """ + Asyncio read from one or several topics. It blocks + :param topic: can be str: single topic; or str list: several topics + :param loop: asyncio loop + :return: topic, key, message + """ try: - msg = "" - if not loop: - loop = asyncio.get_event_loop() - if topic not in self.files: - self.files[topic] = open(self.path + topic, "a+") - # ignore previous content - for line in self.files[topic]: - if not line.endswith("\n"): - msg = line while True: - msg += self.files[topic].readline() - if msg.endswith("\n"): - break + msg = self.read(topic, blocks=False) + if msg: + return msg await asyncio.sleep(2, loop=loop) - msg_dict = yaml.load(msg) - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return k, v + except MsgException: + raise except Exception as e: # TODO refine raise MsgException(str(e)) +