f731e741fdd801264f86c3c4b97fd630f48c4d88
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 import os
20 import yaml
21 import asyncio
22 from osm_common.msgbase import MsgBase, MsgException
23 from time import sleep
24
25 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
26
27 """
28 This emulates a Kafka bus by just using a shared file system. Useful for testing or devops.
29 One file is used per topic. Only one producer and one consumer are allowed per topic. Both the consumer and the
30 producer access the same file, e.g. the same volume if running with docker.
31 One text line per message is used in yaml format.
32 """
33
34
class MsgLocal(MsgBase):
    """
    Message bus backed by plain files in a shared directory.

    Each topic is stored in its own file under the configured path. Every message is appended as a single
    text line holding a one-entry YAML mapping ``{key: msg}``; readers consume the file line by line.
    """

    def __init__(self, logger_name='msg'):
        """
        Create a disconnected bus; call connect() before reading or writing.
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        self.path = None
        # One open file handle per topic, kept separately for reading and for writing
        self.files_read = {}
        self.files_write = {}
        # Per-topic text read so far that does not yet form a complete (newline terminated) line
        self.buffer = {}

    def connect(self, config):
        """
        Configure the directory where the topic files live, creating it if it does not exist.
        :param config: dictionary with the mandatory key "path" and the optional key "logger_name"
        :return: None or raises MsgException on failure
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            # Topic file names are appended directly to the path, so it must end with a separator
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                os.mkdir(self.path)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Close every topic file opened by this instance. Errors while closing are logged and ignored.
        :return: None
        """
        for handles in (self.files_read, self.files_write):
            for topic, f in handles.items():
                try:
                    f.close()
                except Exception as e:  # TODO refine
                    # Best effort: a failing close must not prevent closing the remaining files
                    self.logger.warning("Error closing file of topic '%s': %s", topic, e)
            # Forget the closed handles so that a later read/write reopens the file instead of
            # failing on a closed one
            handles.clear()
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException on failure
        """
        try:
            if topic not in self.files_write:
                self.files_write[topic] = open(self.path + topic, "a+")
            # Flow style plus a very large width keeps the whole message on one single text line
            yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
            # Flush so that a reader using another handle on the same file can see the message
            self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True, wait (polling every 2 seconds) until a message is present;
            if False, return None when no complete message is available
        :return: topic, key, message; or None if blocks==False and nothing is available
        :raises MsgException: on I/O errors or when a line is not a single-entry mapping
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    if single_topic not in self.files_read:
                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
                        self.buffer[single_topic] = ""
                    self.buffer[single_topic] += self.files_read[single_topic].readline()
                    # A line without its trailing newline is still being written; keep it buffered
                    if not self.buffer[single_topic].endswith("\n"):
                        continue
                    # Empty the buffer before parsing so that a malformed line is reported once
                    # instead of corrupting every following read of this topic
                    line = self.buffer[single_topic]
                    self.buffer[single_topic] = ""
                    # safe_load matches the safe_dump used by write() and never builds arbitrary objects
                    msg_dict = yaml.safe_load(line)
                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                        raise MsgException("Invalid message at topic '{}': expected a single 'key: value' entry"
                                           .format(single_topic))
                    key, value = next(iter(msg_dict.items()))
                    return single_topic, key, value
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aioread(self, topic, loop=None):
        """
        Asyncio read from one or several topics. It waits, polling every 2 seconds, until a message is present
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Accepted for interface compatibility; not used
        :return: topic, key, message
        :raises MsgException: on any failure while reading
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    return msg
                # The 'loop' argument of asyncio.sleep was removed in Python 3.10; the sleep always
                # runs on the loop that is executing this coroutine
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It calls the synchronous write(), so it blocks while writing
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop. Not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)