0c3b216e5349ac7bf4cb5a914adf749c8322e27a
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import asyncio
19 from http import HTTPStatus
20 import logging
21 import os
22 from time import sleep
23
24 from osm_common.msgbase import MsgBase, MsgException
25 import yaml
26
27 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28 """
29 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
31 access to the same file. e.g. same volume if running with docker.
32 One text line per message is used in yaml format.
33 """
34
35
36 class MsgLocal(MsgBase):
37 def __init__(self, logger_name="msg", lock=False):
38 super().__init__(logger_name, lock)
39 self.path = None
40 # create a different file for each topic
41 self.files_read = {}
42 self.files_write = {}
43 self.buffer = {}
44
45 def connect(self, config):
46 try:
47 if "logger_name" in config:
48 self.logger = logging.getLogger(config["logger_name"])
49 self.path = config["path"]
50 if not self.path.endswith("/"):
51 self.path += "/"
52 if not os.path.exists(self.path):
53 os.mkdir(self.path)
54
55 except MsgException:
56 raise
57 except Exception as e: # TODO refine
58 raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
59
60 def disconnect(self):
61 for topic, f in self.files_read.items():
62 try:
63 f.close()
64 self.files_read[topic] = None
65 except Exception as read_topic_error:
66 if isinstance(read_topic_error, (IOError, FileNotFoundError)):
67 self.logger.exception(
68 f"{read_topic_error} occured while closing read topic files."
69 )
70 elif isinstance(read_topic_error, KeyError):
71 self.logger.exception(
72 f"{read_topic_error} occured while reading from files_read dictionary."
73 )
74 else:
75 self.logger.exception(
76 f"{read_topic_error} occured while closing read topics."
77 )
78
79 for topic, f in self.files_write.items():
80 try:
81 f.close()
82 self.files_write[topic] = None
83 except Exception as write_topic_error:
84 if isinstance(write_topic_error, (IOError, FileNotFoundError)):
85 self.logger.exception(
86 f"{write_topic_error} occured while closing write topic files."
87 )
88 elif isinstance(write_topic_error, KeyError):
89 self.logger.exception(
90 f"{write_topic_error} occured while reading from files_write dictionary."
91 )
92 else:
93 self.logger.exception(
94 f"{write_topic_error} occured while closing write topics."
95 )
96
97 def write(self, topic, key, msg):
98 """
99 Insert a message into topic
100 :param topic: topic
101 :param key: key text to be inserted
102 :param msg: value object to be inserted, can be str, object ...
103 :return: None or raises and exception
104 """
105 try:
106 with self.lock:
107 if topic not in self.files_write:
108 self.files_write[topic] = open(self.path + topic, "a+")
109 yaml.safe_dump(
110 {key: msg},
111 self.files_write[topic],
112 default_flow_style=True,
113 width=20000,
114 )
115 self.files_write[topic].flush()
116 except Exception as e: # TODO refine
117 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
118
119 def read(self, topic, blocks=True):
120 """
121 Read from one or several topics. it is non blocking returning None if nothing is available
122 :param topic: can be str: single topic; or str list: several topics
123 :param blocks: indicates if it should wait and block until a message is present or returns None
124 :return: topic, key, message; or None if blocks==True
125 """
126 try:
127 if isinstance(topic, (list, tuple)):
128 topic_list = topic
129 else:
130 topic_list = (topic,)
131 while True:
132 for single_topic in topic_list:
133 with self.lock:
134 if single_topic not in self.files_read:
135 self.files_read[single_topic] = open(
136 self.path + single_topic, "a+"
137 )
138 self.buffer[single_topic] = ""
139 self.buffer[single_topic] += self.files_read[
140 single_topic
141 ].readline()
142 if not self.buffer[single_topic].endswith("\n"):
143 continue
144 msg_dict = yaml.safe_load(self.buffer[single_topic])
145 self.buffer[single_topic] = ""
146 if len(msg_dict) != 1:
147 raise ValueError(
148 "Length of message dictionary is not equal to 1"
149 )
150 for k, v in msg_dict.items():
151 return single_topic, k, v
152 if not blocks:
153 return None
154 sleep(2)
155 except Exception as e: # TODO refine
156 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
157
158 async def aioread(
159 self, topic, callback=None, aiocallback=None, group_id=None, **kwargs
160 ):
161 """
162 Asyncio read from one or several topics. It blocks
163 :param topic: can be str: single topic; or str list: several topics
164 :param callback: synchronous callback function that will handle the message
165 :param aiocallback: async callback function that will handle the message
166 :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
167 group_id provided at connect inside config), or a group_id string
168 :param kwargs: optional keyword arguments for callback function
169 :return: If no callback defined, it returns (topic, key, message)
170 """
171 try:
172 while True:
173 msg = self.read(topic, blocks=False)
174 if msg:
175 if callback:
176 callback(*msg, **kwargs)
177 elif aiocallback:
178 await aiocallback(*msg, **kwargs)
179 else:
180 return msg
181 await asyncio.sleep(2)
182 except MsgException:
183 raise
184 except Exception as e: # TODO refine
185 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
186
187 async def aiowrite(self, topic, key, msg):
188 """
189 Asyncio write. It blocks
190 :param topic: str
191 :param key: str
192 :param msg: message, can be str or yaml
193 :return: nothing if ok or raises an exception
194 """
195 return self.write(topic, key, msg)