6d4cb58fed1200c4ea78fdf640358be181d62a55
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import asyncio
19 from http import HTTPStatus
20 import logging
21 import os
22 from time import sleep
23
24 from osm_common.msgbase import MsgBase, MsgException
25 import yaml
26
27 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28 """
29 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
31 access to the same file. e.g. same volume if running with docker.
32 One text line per message is used in yaml format.
33 """
34
35
36 class MsgLocal(MsgBase):
37 def __init__(self, logger_name="msg", lock=False):
38 super().__init__(logger_name, lock)
39 self.path = None
40 # create a different file for each topic
41 self.files_read = {}
42 self.files_write = {}
43 self.buffer = {}
44 self.loop = None
45
46 def connect(self, config):
47 try:
48 if "logger_name" in config:
49 self.logger = logging.getLogger(config["logger_name"])
50 self.path = config["path"]
51 if not self.path.endswith("/"):
52 self.path += "/"
53 if not os.path.exists(self.path):
54 os.mkdir(self.path)
55 self.loop = config.get("loop")
56
57 except MsgException:
58 raise
59 except Exception as e: # TODO refine
60 raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
61
62 def disconnect(self):
63 for topic, f in self.files_read.items():
64 try:
65 f.close()
66 self.files_read[topic] = None
67 except Exception as read_topic_error:
68 if isinstance(read_topic_error, (IOError, FileNotFoundError)):
69 self.logger.exception(
70 f"{read_topic_error} occured while closing read topic files."
71 )
72 elif isinstance(read_topic_error, KeyError):
73 self.logger.exception(
74 f"{read_topic_error} occured while reading from files_read dictionary."
75 )
76 else:
77 self.logger.exception(
78 f"{read_topic_error} occured while closing read topics."
79 )
80
81 for topic, f in self.files_write.items():
82 try:
83 f.close()
84 self.files_write[topic] = None
85 except Exception as write_topic_error:
86 if isinstance(write_topic_error, (IOError, FileNotFoundError)):
87 self.logger.exception(
88 f"{write_topic_error} occured while closing write topic files."
89 )
90 elif isinstance(write_topic_error, KeyError):
91 self.logger.exception(
92 f"{write_topic_error} occured while reading from files_write dictionary."
93 )
94 else:
95 self.logger.exception(
96 f"{write_topic_error} occured while closing write topics."
97 )
98
99 def write(self, topic, key, msg):
100 """
101 Insert a message into topic
102 :param topic: topic
103 :param key: key text to be inserted
104 :param msg: value object to be inserted, can be str, object ...
105 :return: None or raises and exception
106 """
107 try:
108 with self.lock:
109 if topic not in self.files_write:
110 self.files_write[topic] = open(self.path + topic, "a+")
111 yaml.safe_dump(
112 {key: msg},
113 self.files_write[topic],
114 default_flow_style=True,
115 width=20000,
116 )
117 self.files_write[topic].flush()
118 except Exception as e: # TODO refine
119 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
120
121 def read(self, topic, blocks=True):
122 """
123 Read from one or several topics. it is non blocking returning None if nothing is available
124 :param topic: can be str: single topic; or str list: several topics
125 :param blocks: indicates if it should wait and block until a message is present or returns None
126 :return: topic, key, message; or None if blocks==True
127 """
128 try:
129 if isinstance(topic, (list, tuple)):
130 topic_list = topic
131 else:
132 topic_list = (topic,)
133 while True:
134 for single_topic in topic_list:
135 with self.lock:
136 if single_topic not in self.files_read:
137 self.files_read[single_topic] = open(
138 self.path + single_topic, "a+"
139 )
140 self.buffer[single_topic] = ""
141 self.buffer[single_topic] += self.files_read[
142 single_topic
143 ].readline()
144 if not self.buffer[single_topic].endswith("\n"):
145 continue
146 msg_dict = yaml.safe_load(self.buffer[single_topic])
147 self.buffer[single_topic] = ""
148 if len(msg_dict) != 1:
149 raise ValueError(
150 "Length of message dictionary is not equal to 1"
151 )
152 for k, v in msg_dict.items():
153 return single_topic, k, v
154 if not blocks:
155 return None
156 sleep(2)
157 except Exception as e: # TODO refine
158 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
159
160 async def aioread(
161 self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
162 ):
163 """
164 Asyncio read from one or several topics. It blocks
165 :param topic: can be str: single topic; or str list: several topics
166 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
167 :param callback: synchronous callback function that will handle the message
168 :param aiocallback: async callback function that will handle the message
169 :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
170 group_id provided at connect inside config), or a group_id string
171 :param kwargs: optional keyword arguments for callback function
172 :return: If no callback defined, it returns (topic, key, message)
173 """
174 _loop = loop or self.loop
175 try:
176 while True:
177 msg = self.read(topic, blocks=False)
178 if msg:
179 if callback:
180 callback(*msg, **kwargs)
181 elif aiocallback:
182 await aiocallback(*msg, **kwargs)
183 else:
184 return msg
185 await asyncio.sleep(2, loop=_loop)
186 except MsgException:
187 raise
188 except Exception as e: # TODO refine
189 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
190
191 async def aiowrite(self, topic, key, msg, loop=None):
192 """
193 Asyncio write. It blocks
194 :param topic: str
195 :param key: str
196 :param msg: message, can be str or yaml
197 :param loop: asyncio loop
198 :return: nothing if ok or raises an exception
199 """
200 return self.write(topic, key, msg)