75fc71718473b485b988ac099e0e58010b52de6d
[osm/common.git] / osm_common / msglocal.py
1 import logging
2 import os
3 import yaml
4 import asyncio
5 from osm_common.msgbase import MsgBase, MsgException
6 from time import sleep
7
8 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
9
10 """
11 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
12 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
13 access to the same file. e.g. same volume if running with docker.
14 One text line per message is used in yaml format.
15 """
16
17
18 class MsgLocal(MsgBase):
19
20 def __init__(self, logger_name='msg'):
21 self.logger = logging.getLogger(logger_name)
22 self.path = None
23 # create a different file for each topic
24 self.files_read = {}
25 self.files_write = {}
26 self.buffer = {}
27
28 def connect(self, config):
29 try:
30 if "logger_name" in config:
31 self.logger = logging.getLogger(config["logger_name"])
32 self.path = config["path"]
33 if not self.path.endswith("/"):
34 self.path += "/"
35 if not os.path.exists(self.path):
36 os.mkdir(self.path)
37 except MsgException:
38 raise
39 except Exception as e: # TODO refine
40 raise MsgException(str(e))
41
42 def disconnect(self):
43 for f in self.files_read.values():
44 try:
45 f.close()
46 except Exception: # TODO refine
47 pass
48 for f in self.files_write.values():
49 try:
50 f.close()
51 except Exception: # TODO refine
52 pass
53
54 def write(self, topic, key, msg):
55 """
56 Insert a message into topic
57 :param topic: topic
58 :param key: key text to be inserted
59 :param msg: value object to be inserted, can be str, object ...
60 :return: None or raises and exception
61 """
62 try:
63 if topic not in self.files_write:
64 self.files_write[topic] = open(self.path + topic, "a+")
65 yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
66 self.files_write[topic].flush()
67 except Exception as e: # TODO refine
68 raise MsgException(str(e))
69
70 def read(self, topic, blocks=True):
71 """
72 Read from one or several topics. it is non blocking returning None if nothing is available
73 :param topic: can be str: single topic; or str list: several topics
74 :param blocks: indicates if it should wait and block until a message is present or returns None
75 :return: topic, key, message; or None if blocks==True
76 """
77 try:
78 if isinstance(topic, (list, tuple)):
79 topic_list = topic
80 else:
81 topic_list = (topic, )
82 while True:
83 for single_topic in topic_list:
84 if single_topic not in self.files_read:
85 self.files_read[single_topic] = open(self.path + single_topic, "a+")
86 self.buffer[single_topic] = ""
87 self.buffer[single_topic] += self.files_read[single_topic].readline()
88 if not self.buffer[single_topic].endswith("\n"):
89 continue
90 msg_dict = yaml.load(self.buffer[single_topic])
91 self.buffer[single_topic] = ""
92 assert len(msg_dict) == 1
93 for k, v in msg_dict.items():
94 return single_topic, k, v
95 if not blocks:
96 return None
97 sleep(2)
98 except Exception as e: # TODO refine
99 raise MsgException(str(e))
100
101 async def aioread(self, topic, loop):
102 """
103 Asyncio read from one or several topics. It blocks
104 :param topic: can be str: single topic; or str list: several topics
105 :param loop: asyncio loop
106 :return: topic, key, message
107 """
108 try:
109 while True:
110 msg = self.read(topic, blocks=False)
111 if msg:
112 return msg
113 await asyncio.sleep(2, loop=loop)
114 except MsgException:
115 raise
116 except Exception as e: # TODO refine
117 raise MsgException(str(e))
118
119 async def aiowrite(self, topic, key, msg, loop=None):
120 """
121 Asyncio write. It blocks
122 :param topic: str
123 :param key: str
124 :param msg: message, can be str or yaml
125 :param loop: asyncio loop
126 :return: nothing if ok or raises an exception
127 """
128 return self.write(topic, key, msg)