blob: bfa30b77ca5d77bb248b76f42d3eeb0a4a9e0709 [file] [log] [blame]
tierno5c012612018-04-19 16:01:59 +02001import logging
2import os
3import yaml
4import asyncio
tierno3054f782018-04-25 16:59:53 +02005from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +02006from time import sleep
7
8__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
9
10"""
11This emulated kafka bus by just using a shared file system. Useful for testing or devops.
tierno3054f782018-04-25 16:59:53 +020012One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
tierno5c012612018-04-19 16:01:59 +020013access to the same file. e.g. same volume if running with docker.
14One text line per message is used in yaml format.
15"""
16
tierno3054f782018-04-25 16:59:53 +020017
tierno5c012612018-04-19 16:01:59 +020018class MsgLocal(MsgBase):
19
20 def __init__(self, logger_name='msg'):
21 self.logger = logging.getLogger(logger_name)
22 self.path = None
23 # create a different file for each topic
24 self.files = {}
25 self.buffer = {}
26
27 def connect(self, config):
28 try:
29 if "logger_name" in config:
30 self.logger = logging.getLogger(config["logger_name"])
31 self.path = config["path"]
32 if not self.path.endswith("/"):
33 self.path += "/"
34 if not os.path.exists(self.path):
35 os.mkdir(self.path)
36 except MsgException:
37 raise
38 except Exception as e: # TODO refine
39 raise MsgException(str(e))
40
41 def disconnect(self):
42 for f in self.files.values():
43 try:
44 f.close()
tierno3054f782018-04-25 16:59:53 +020045 except Exception: # TODO refine
tierno5c012612018-04-19 16:01:59 +020046 pass
47
48 def write(self, topic, key, msg):
49 """
50 Insert a message into topic
51 :param topic: topic
52 :param key: key text to be inserted
53 :param msg: value object to be inserted, can be str, object ...
54 :return: None or raises and exception
55 """
56 try:
57 if topic not in self.files:
58 self.files[topic] = open(self.path + topic, "a+")
59 yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
60 self.files[topic].flush()
61 except Exception as e: # TODO refine
62 raise MsgException(str(e))
63
64 def read(self, topic, blocks=True):
65 """
66 Read from one or several topics. it is non blocking returning None if nothing is available
67 :param topic: can be str: single topic; or str list: several topics
68 :param blocks: indicates if it should wait and block until a message is present or returns None
69 :return: topic, key, message; or None if blocks==True
70 """
71 try:
72 if isinstance(topic, (list, tuple)):
73 topic_list = topic
74 else:
75 topic_list = (topic, )
76 while True:
77 for single_topic in topic_list:
78 if single_topic not in self.files:
79 self.files[single_topic] = open(self.path + single_topic, "a+")
80 self.buffer[single_topic] = ""
81 self.buffer[single_topic] += self.files[single_topic].readline()
82 if not self.buffer[single_topic].endswith("\n"):
83 continue
84 msg_dict = yaml.load(self.buffer[single_topic])
85 self.buffer[single_topic] = ""
86 assert len(msg_dict) == 1
87 for k, v in msg_dict.items():
88 return single_topic, k, v
89 if not blocks:
90 return None
91 sleep(2)
92 except Exception as e: # TODO refine
93 raise MsgException(str(e))
94
95 async def aioread(self, topic, loop):
96 """
97 Asyncio read from one or several topics. It blocks
98 :param topic: can be str: single topic; or str list: several topics
99 :param loop: asyncio loop
100 :return: topic, key, message
101 """
102 try:
103 while True:
104 msg = self.read(topic, blocks=False)
105 if msg:
106 return msg
107 await asyncio.sleep(2, loop=loop)
108 except MsgException:
109 raise
110 except Exception as e: # TODO refine
111 raise MsgException(str(e))