Fixes Bug 1273 - aiokafka>0.7.0 needs extra dependency for python < 3.7.0
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 import os
20 import yaml
21 import asyncio
22 from osm_common.msgbase import MsgBase, MsgException
23 from time import sleep
24 from http import HTTPStatus
25
26 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28 """
29 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
31 access to the same file. e.g. same volume if running with docker.
32 One text line per message is used in yaml format.
33 """
34
35
36 class MsgLocal(MsgBase):
37
38 def __init__(self, logger_name='msg', lock=False):
39 super().__init__(logger_name, lock)
40 self.path = None
41 # create a different file for each topic
42 self.files_read = {}
43 self.files_write = {}
44 self.buffer = {}
45 self.loop = None
46
47 def connect(self, config):
48 try:
49 if "logger_name" in config:
50 self.logger = logging.getLogger(config["logger_name"])
51 self.path = config["path"]
52 if not self.path.endswith("/"):
53 self.path += "/"
54 if not os.path.exists(self.path):
55 os.mkdir(self.path)
56 self.loop = config.get("loop")
57
58 except MsgException:
59 raise
60 except Exception as e: # TODO refine
61 raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
62
63 def disconnect(self):
64 for topic, f in self.files_read.items():
65 try:
66 f.close()
67 self.files_read[topic] = None
68 except Exception: # TODO refine
69 pass
70 for topic, f in self.files_write.items():
71 try:
72 f.close()
73 self.files_write[topic] = None
74 except Exception: # TODO refine
75 pass
76
77 def write(self, topic, key, msg):
78 """
79 Insert a message into topic
80 :param topic: topic
81 :param key: key text to be inserted
82 :param msg: value object to be inserted, can be str, object ...
83 :return: None or raises and exception
84 """
85 try:
86 with self.lock:
87 if topic not in self.files_write:
88 self.files_write[topic] = open(self.path + topic, "a+")
89 yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
90 self.files_write[topic].flush()
91 except Exception as e: # TODO refine
92 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
93
94 def read(self, topic, blocks=True):
95 """
96 Read from one or several topics. it is non blocking returning None if nothing is available
97 :param topic: can be str: single topic; or str list: several topics
98 :param blocks: indicates if it should wait and block until a message is present or returns None
99 :return: topic, key, message; or None if blocks==True
100 """
101 try:
102 if isinstance(topic, (list, tuple)):
103 topic_list = topic
104 else:
105 topic_list = (topic, )
106 while True:
107 for single_topic in topic_list:
108 with self.lock:
109 if single_topic not in self.files_read:
110 self.files_read[single_topic] = open(self.path + single_topic, "a+")
111 self.buffer[single_topic] = ""
112 self.buffer[single_topic] += self.files_read[single_topic].readline()
113 if not self.buffer[single_topic].endswith("\n"):
114 continue
115 msg_dict = yaml.safe_load(self.buffer[single_topic])
116 self.buffer[single_topic] = ""
117 assert len(msg_dict) == 1
118 for k, v in msg_dict.items():
119 return single_topic, k, v
120 if not blocks:
121 return None
122 sleep(2)
123 except Exception as e: # TODO refine
124 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
125
126 async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
127 """
128 Asyncio read from one or several topics. It blocks
129 :param topic: can be str: single topic; or str list: several topics
130 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
131 :param callback: synchronous callback function that will handle the message
132 :param aiocallback: async callback function that will handle the message
133 :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
134 group_id provided at connect inside config), or a group_id string
135 :param kwargs: optional keyword arguments for callback function
136 :return: If no callback defined, it returns (topic, key, message)
137 """
138 _loop = loop or self.loop
139 try:
140 while True:
141 msg = self.read(topic, blocks=False)
142 if msg:
143 if callback:
144 callback(*msg, **kwargs)
145 elif aiocallback:
146 await aiocallback(*msg, **kwargs)
147 else:
148 return msg
149 await asyncio.sleep(2, loop=_loop)
150 except MsgException:
151 raise
152 except Exception as e: # TODO refine
153 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
154
155 async def aiowrite(self, topic, key, msg, loop=None):
156 """
157 Asyncio write. It blocks
158 :param topic: str
159 :param key: str
160 :param msg: message, can be str or yaml
161 :param loop: asyncio loop
162 :return: nothing if ok or raises an exception
163 """
164 return self.write(topic, key, msg)