blob: 8fae7a2ec3de7f497d1b4fd7b86524a817dcea76 [file] [log] [blame]
tierno5c012612018-04-19 16:01:59 +02001import logging
2import os
3import yaml
4import asyncio
tierno3054f782018-04-25 16:59:53 +02005from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +02006from time import sleep
7
8__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
9
10"""
11This emulated kafka bus by just using a shared file system. Useful for testing or devops.
tierno3054f782018-04-25 16:59:53 +020012One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
tierno5c012612018-04-19 16:01:59 +020013access to the same file. e.g. same volume if running with docker.
14One text line per message is used in yaml format.
15"""
16
tierno3054f782018-04-25 16:59:53 +020017
tierno5c012612018-04-19 16:01:59 +020018class MsgLocal(MsgBase):
19
20 def __init__(self, logger_name='msg'):
21 self.logger = logging.getLogger(logger_name)
22 self.path = None
23 # create a different file for each topic
tiernoe74238f2018-04-26 17:22:09 +020024 self.files_read = {}
25 self.files_write = {}
tierno5c012612018-04-19 16:01:59 +020026 self.buffer = {}
27
28 def connect(self, config):
29 try:
30 if "logger_name" in config:
31 self.logger = logging.getLogger(config["logger_name"])
32 self.path = config["path"]
33 if not self.path.endswith("/"):
34 self.path += "/"
35 if not os.path.exists(self.path):
36 os.mkdir(self.path)
37 except MsgException:
38 raise
39 except Exception as e: # TODO refine
40 raise MsgException(str(e))
41
42 def disconnect(self):
tiernoe74238f2018-04-26 17:22:09 +020043 for f in self.files_read.values():
44 try:
45 f.close()
46 except Exception: # TODO refine
47 pass
48 for f in self.files_write.values():
tierno5c012612018-04-19 16:01:59 +020049 try:
50 f.close()
tierno3054f782018-04-25 16:59:53 +020051 except Exception: # TODO refine
tierno5c012612018-04-19 16:01:59 +020052 pass
53
54 def write(self, topic, key, msg):
55 """
56 Insert a message into topic
57 :param topic: topic
58 :param key: key text to be inserted
59 :param msg: value object to be inserted, can be str, object ...
60 :return: None or raises and exception
61 """
62 try:
tiernoe74238f2018-04-26 17:22:09 +020063 if topic not in self.files_write:
64 self.files_write[topic] = open(self.path + topic, "a+")
65 yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
66 self.files_write[topic].flush()
tierno5c012612018-04-19 16:01:59 +020067 except Exception as e: # TODO refine
68 raise MsgException(str(e))
69
70 def read(self, topic, blocks=True):
71 """
72 Read from one or several topics. it is non blocking returning None if nothing is available
73 :param topic: can be str: single topic; or str list: several topics
74 :param blocks: indicates if it should wait and block until a message is present or returns None
75 :return: topic, key, message; or None if blocks==True
76 """
77 try:
78 if isinstance(topic, (list, tuple)):
79 topic_list = topic
80 else:
81 topic_list = (topic, )
82 while True:
83 for single_topic in topic_list:
tiernoe74238f2018-04-26 17:22:09 +020084 if single_topic not in self.files_read:
85 self.files_read[single_topic] = open(self.path + single_topic, "a+")
tierno5c012612018-04-19 16:01:59 +020086 self.buffer[single_topic] = ""
tiernoe74238f2018-04-26 17:22:09 +020087 self.buffer[single_topic] += self.files_read[single_topic].readline()
tierno5c012612018-04-19 16:01:59 +020088 if not self.buffer[single_topic].endswith("\n"):
89 continue
90 msg_dict = yaml.load(self.buffer[single_topic])
91 self.buffer[single_topic] = ""
92 assert len(msg_dict) == 1
93 for k, v in msg_dict.items():
94 return single_topic, k, v
95 if not blocks:
96 return None
97 sleep(2)
98 except Exception as e: # TODO refine
99 raise MsgException(str(e))
100
101 async def aioread(self, topic, loop):
102 """
103 Asyncio read from one or several topics. It blocks
104 :param topic: can be str: single topic; or str list: several topics
105 :param loop: asyncio loop
106 :return: topic, key, message
107 """
108 try:
109 while True:
110 msg = self.read(topic, blocks=False)
111 if msg:
112 return msg
113 await asyncio.sleep(2, loop=loop)
114 except MsgException:
115 raise
116 except Exception as e: # TODO refine
117 raise MsgException(str(e))