fixing imports to get ready for module distribution.
[osm/common.git] / osm_common / msglocal.py
1 import logging
2 import os
3 import yaml
4 import asyncio
5 from osm_common.msgbase import MsgBase, MsgException
6 from time import sleep
7
8 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
9
10 """
11 This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
12 One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
13 access the same file, e.g. the same volume if running with docker.
14 One text line per message is used in yaml format.
15 """
16
17
class MsgLocal(MsgBase):
    """
    Message bus that emulates kafka by using plain files in a shared directory.

    Every topic is stored in one file under the configured path and every message is one text line holding a
    single-entry yaml mapping ``{key: msg}``. Producer and consumer must see the same directory.
    """

    def __init__(self, logger_name='msg'):
        """
        :param logger_name: name of the logger to use; can be overridden later by connect()
        """
        self.logger = logging.getLogger(logger_name)
        self.path = None
        # one append handle per topic, used by write()
        self.files = {}
        # one handle per topic, used by read(). Kept apart from the write handles: sharing a single handle
        # would leave the position at end of file after every write, so the reader would miss those messages
        self._read_files = {}
        # per topic text read so far that does not form a complete line yet
        self.buffer = {}

    def connect(self, config):
        """
        Configure the bus and make sure the directory that holds the topic files exists
        :param config: dictionary with the mandatory key "path" (directory for the topic files) and the optional
            key "logger_name"
        :return: None or raises MsgException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                # exist_ok avoids failing when another process creates the folder between the check and this call
                os.makedirs(self.path, exist_ok=True)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Close every opened topic file. It is best effort: a failure closing one file is logged and ignored
        :return: None
        """
        for handles in (self.files, self._read_files):
            for topic, f in handles.items():
                try:
                    f.close()
                except Exception as e:  # TODO refine
                    self.logger.warning("Error closing file of topic '{}': {}".format(topic, e))
            # forget closed handles, so that a later write/read opens the file again instead of failing
            handles.clear()
        # a partial line is meaningless once its file handle is gone
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException on error
        """
        try:
            if topic not in self.files:
                self.files[topic] = open(self.path + topic, "a+")
            # flow style and a very large width are used to keep the whole message in a single line
            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
            self.files[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits, polling every 2 seconds, until a message is present; if False it returns
            None when no complete message is available
        :return: topic, key, message; or None if blocks==False and nothing is available.
            Raises MsgException on error
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    if single_topic not in self._read_files:
                        # "a+" creates the file if missing; the handle starts at the end of the file, so only
                        # messages appended from now on are delivered
                        self._read_files[single_topic] = open(self.path + single_topic, "a+")
                        self.buffer[single_topic] = ""
                    # the producer may be in the middle of a write: accumulate until the line is complete
                    self.buffer[single_topic] += self._read_files[single_topic].readline()
                    if not self.buffer[single_topic].endswith("\n"):
                        continue
                    # write() uses safe_dump, so safe_load parses anything it produces without allowing
                    # arbitrary object construction
                    msg_dict = yaml.safe_load(self.buffer[single_topic])
                    self.buffer[single_topic] = ""
                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                        raise MsgException("Invalid message at topic '{}': expected a single 'key: value' "
                                           "mapping per line".format(single_topic))
                    k, v = next(iter(msg_dict.items()))
                    return single_topic, k, v
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aioread(self, topic, loop=None):
        """
        Asyncio read from one or several topics. It waits, without blocking the event loop, until a message arrives
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Not used; kept so that existing callers passing it keep working
        :return: topic, key, message
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    return msg
                # the 'loop' argument of asyncio.sleep no longer exists in python 3.10; the running loop is used
                await asyncio.sleep(2)
        except MsgException:
            raise
        except asyncio.CancelledError:
            # before python 3.8 this derives from Exception; let the task cancellation go through untouched
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e