blob: 337321ff9685e5dba019889494211ae6351e994f [file] [log] [blame]
tiernoae501922018-02-06 23:17:16 +01001import logging
tierno0aef0db2018-02-01 19:13:07 +01002import os
3import yaml
4import asyncio
5from msgbase import MsgBase, MsgException
tiernof3a54432018-03-21 11:34:00 +01006from time import sleep
tierno0aef0db2018-02-01 19:13:07 +01007
tiernoae501922018-02-06 23:17:16 +01008__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
tierno0aef0db2018-02-01 19:13:07 +01009
tiernof3a54432018-03-21 11:34:00 +010010"""
11This emulated kafka bus by just using a shared file system. Usefull for testing or devops.
12One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
13access to the same file. e.g. same volume if running with docker.
14One text line per message is used in yaml format
15"""
tierno0aef0db2018-02-01 19:13:07 +010016
tiernoae501922018-02-06 23:17:16 +010017class MsgLocal(MsgBase):
18
19 def __init__(self, logger_name='msg'):
20 self.logger = logging.getLogger(logger_name)
tierno0aef0db2018-02-01 19:13:07 +010021 self.path = None
22 # create a different file for each topic
23 self.files = {}
tiernof3a54432018-03-21 11:34:00 +010024 self.buffer = {}
tierno0aef0db2018-02-01 19:13:07 +010025
26 def connect(self, config):
27 try:
tiernoae501922018-02-06 23:17:16 +010028 if "logger_name" in config:
29 self.logger = logging.getLogger(config["logger_name"])
tierno0aef0db2018-02-01 19:13:07 +010030 self.path = config["path"]
31 if not self.path.endswith("/"):
32 self.path += "/"
33 if not os.path.exists(self.path):
34 os.mkdir(self.path)
35 except MsgException:
36 raise
37 except Exception as e: # TODO refine
38 raise MsgException(str(e))
39
40 def disconnect(self):
41 for f in self.files.values():
42 try:
43 f.close()
44 except Exception as e: # TODO refine
45 pass
46
47 def write(self, topic, key, msg):
48 """
49 Insert a message into topic
50 :param topic: topic
51 :param key: key text to be inserted
tiernof3a54432018-03-21 11:34:00 +010052 :param msg: value object to be inserted, can be str, object ...
tierno0aef0db2018-02-01 19:13:07 +010053 :return: None or raises and exception
54 """
55 try:
56 if topic not in self.files:
tiernof3a54432018-03-21 11:34:00 +010057 self.files[topic] = open(self.path + topic, "a+")
58 yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
tierno0aef0db2018-02-01 19:13:07 +010059 self.files[topic].flush()
60 except Exception as e: # TODO refine
61 raise MsgException(str(e))
62
tiernof3a54432018-03-21 11:34:00 +010063 def read(self, topic, blocks=True):
64 """
65 Read from one or several topics. it is non blocking returning None if nothing is available
66 :param topic: can be str: single topic; or str list: several topics
67 :param blocks: indicates if it should wait and block until a message is present or returns None
68 :return: topic, key, message; or None if blocks==True
69 """
tierno0aef0db2018-02-01 19:13:07 +010070 try:
tiernof3a54432018-03-21 11:34:00 +010071 if isinstance(topic, (list, tuple)):
72 topic_list = topic
73 else:
74 topic_list = (topic, )
75 while True:
76 for single_topic in topic_list:
77 if single_topic not in self.files:
78 self.files[single_topic] = open(self.path + single_topic, "a+")
79 self.buffer[single_topic] = ""
80 self.buffer[single_topic] += self.files[single_topic].readline()
81 if not self.buffer[single_topic].endswith("\n"):
82 continue
83 msg_dict = yaml.load(self.buffer[single_topic])
84 self.buffer[single_topic] = ""
85 assert len(msg_dict) == 1
86 for k, v in msg_dict.items():
87 return single_topic, k, v
88 if not blocks:
89 return None
90 sleep(2)
tierno0aef0db2018-02-01 19:13:07 +010091 except Exception as e: # TODO refine
92 raise MsgException(str(e))
93
tiernof3a54432018-03-21 11:34:00 +010094 async def aioread(self, topic, loop):
95 """
96 Asyncio read from one or several topics. It blocks
97 :param topic: can be str: single topic; or str list: several topics
98 :param loop: asyncio loop
99 :return: topic, key, message
100 """
tierno0aef0db2018-02-01 19:13:07 +0100101 try:
tierno0aef0db2018-02-01 19:13:07 +0100102 while True:
tiernof3a54432018-03-21 11:34:00 +0100103 msg = self.read(topic, blocks=False)
104 if msg:
105 return msg
tierno0aef0db2018-02-01 19:13:07 +0100106 await asyncio.sleep(2, loop=loop)
tiernof3a54432018-03-21 11:34:00 +0100107 except MsgException:
108 raise
tierno0aef0db2018-02-01 19:13:07 +0100109 except Exception as e: # TODO refine
110 raise MsgException(str(e))
tiernof3a54432018-03-21 11:34:00 +0100111