c10ff17b0808c6e649755f3e7886b33db8d4ecf3
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 import os
20 import yaml
21 import asyncio
22 from osm_common.msgbase import MsgBase, MsgException
23 from time import sleep
24 from http import HTTPStatus
25
26 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28 """
29 This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
31 access the same file, e.g. the same volume if running with docker.
32 One text line per message is used in yaml format.
33 """
34
35
class MsgLocal(MsgBase):
    """
    Message bus backed by plain files, one file per topic.

    Every message is stored as a single text line holding a one-entry YAML
    mapping ``{key: msg}``. Writers append lines to ``<path>/<topic>`` and
    readers consume complete lines from the same file.
    """

    def __init__(self, logger_name="msg", lock=False):
        """
        :param logger_name: forwarded to MsgBase
        :param lock: forwarded to MsgBase
        """
        super().__init__(logger_name, lock)
        # Directory that holds the topic files; set by connect(). Always ends with "/".
        self.path = None
        # One open file handle per topic, kept separately for reading and for writing
        self.files_read = {}
        self.files_write = {}
        # Per topic: text already read that does not yet form a complete line
        self.buffer = {}
        self.loop = None

    def connect(self, config):
        """
        Prepare the directory where the topic files are stored.
        :param config: dictionary with the mandatory key "path" (directory of the topic files) and the optional
            keys "logger_name" (replaces the logger) and "loop" (stored in self.loop)
        :return: None or raises MsgException on any failure
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            # makedirs(exist_ok=True) avoids the race between an "exists" check and the creation, and it also
            # creates missing parent directories
            os.makedirs(self.path, exist_ok=True)
            self.loop = config.get("loop")

        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def disconnect(self):
        """
        Close every topic file opened by this instance and forget the handles.
        Closing is best effort: a failure on one file does not prevent closing the others.
        :return: None
        """
        for files in (self.files_read, self.files_write):
            for f in files.values():
                try:
                    f.close()
                except Exception:  # TODO refine
                    # best effort, nothing useful can be done with a file that fails to close
                    pass
            # Remove the entries instead of leaving None values behind, otherwise a later write/read would
            # find the topic "already opened" and try to use a None handle instead of reopening the file
            files.clear()
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException
        """
        try:
            # self.lock is not defined in this class; presumably MsgBase provides it
            with self.lock:
                if topic not in self.files_write:
                    self.files_write[topic] = open(self.path + topic, "a+")
                # flow style plus a very large width keep the whole message on a single text line
                yaml.safe_dump(
                    {key: msg},
                    self.files_write[topic],
                    default_flow_style=True,
                    width=20000,
                )
                # flush so that a consumer reading the same file sees the message immediately
                self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits, polling every 2 seconds, until a message is present; if False it returns
            None when no complete message is available
        :return: topic, key, message; or None if blocks==False and nothing is available
        Raises MsgException if a line cannot be parsed or does not contain exactly one key.
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)
            while True:
                for single_topic in topic_list:
                    with self.lock:
                        if single_topic not in self.files_read:
                            # NOTE(review): "a+" creates the file when missing; reading presumably starts at
                            # the current end of file, so only messages appended afterwards are seen - confirm
                            self.files_read[single_topic] = open(
                                self.path + single_topic, "a+"
                            )
                            self.buffer[single_topic] = ""
                        self.buffer[single_topic] += self.files_read[
                            single_topic
                        ].readline()
                        if not self.buffer[single_topic].endswith("\n"):
                            # incomplete line (producer still writing), keep it buffered for the next poll
                            continue
                        # Take the line out of the buffer BEFORE parsing it, so that a malformed line is
                        # reported once instead of poisoning every later read of this topic
                        line = self.buffer[single_topic]
                        self.buffer[single_topic] = ""
                        msg_dict = yaml.safe_load(line)
                        # explicit check instead of "assert", which is stripped when running with -O
                        if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                            raise MsgException(
                                "Invalid message at topic '{}': expected exactly one key".format(
                                    single_topic
                                ),
                                HTTPStatus.INTERNAL_SERVER_ERROR,
                            )
                        k, v = next(iter(msg_dict.items()))
                        return single_topic, k, v
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aioread(
        self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
    ):
        """
        Asyncio read from one or several topics. It polls every 2 seconds until a message is available
        :param topic: can be str: single topic; or str list: several topics
        :param loop: DEPRECATED and ignored, kept only for backward compatibility. The running loop is used
        :param callback: synchronous callback function that will handle the message
        :param aiocallback: async callback function that will handle the message
        :param group_id: not used by this implementation
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message). With a callback it loops forever
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    if callback:
                        callback(*msg, **kwargs)
                    elif aiocallback:
                        await aiocallback(*msg, **kwargs)
                    else:
                        return msg
                # The "loop" argument of asyncio.sleep was removed in python 3.10; passing it raises TypeError.
                # Without it the sleep runs on the loop executing this coroutine.
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks, because it delegates to the synchronous write()
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop, not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)