Make common methods threading safe. pytest enhancements
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 import os
20 import yaml
21 import asyncio
22 from osm_common.msgbase import MsgBase, MsgException
23 from time import sleep
24 from http import HTTPStatus
25
26 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28 """
29 This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
31 access the same file, e.g. the same volume if running with docker.
32 One text line per message is used in yaml format.
33 """
34
35
class MsgLocal(MsgBase):
    """
    Message bus emulation backed by plain files in a shared directory.

    Each topic maps to one file under the configured path; every message is stored as a single text line
    holding a one-entry YAML mapping {key: msg}.
    """

    def __init__(self, logger_name='msg', lock=False):
        """
        :param logger_name: forwarded to the MsgBase constructor
        :param lock: forwarded to the MsgBase constructor (presumably selects what self.lock is; see MsgBase)
        """
        super().__init__(logger_name, lock)
        # directory (with trailing "/") holding the topic files; set by connect()
        self.path = None
        # one open file object per topic, created lazily on first read/write
        self.files_read = {}
        self.files_write = {}
        # per topic: text read so far that does not yet end with a newline (incomplete message)
        self.buffer = {}

    def connect(self, config):
        """
        Configure the directory used to store the topic files, creating it if needed
        :param config: dict with mandatory "path" (directory) and optional "logger_name"
        :return: None or raises MsgException on any failure (e.g. missing "path", directory cannot be created)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            # exist_ok avoids the check-then-create race when producer and consumer start together;
            # makedirs also creates missing parent directories
            os.makedirs(self.path, exist_ok=True)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def disconnect(self):
        """
        Close every topic file opened for reading or writing and forget them, so that a later read/write
        reopens the file instead of finding a stale entry
        :return: None. Errors while closing are ignored (best effort)
        """
        with self.lock:
            for files in (self.files_read, self.files_write):
                for f in files.values():
                    try:
                        f.close()
                    except Exception:  # TODO refine
                        # deliberate best effort: a failing close must not prevent closing the rest
                        pass
                files.clear()
            # read() re-creates the buffer of a topic whenever it (re)opens its file
            self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises an exception
        """
        try:
            with self.lock:
                if topic not in self.files_write:
                    self.files_write[topic] = open(self.path + topic, "a+")
                # flow style plus a very large width keeps the whole message on one text line
                yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
                self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits (polling every 2 seconds) until a message is present; if False it returns
            None when no complete message is available
        :return: topic, key, message; or None if blocks==False and nothing is available
        Raises MsgException on I/O errors or when a line is not a single-entry yaml mapping
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    with self.lock:
                        if single_topic not in self.files_read:
                            self.files_read[single_topic] = open(self.path + single_topic, "a+")
                            self.buffer[single_topic] = ""
                        self.buffer[single_topic] += self.files_read[single_topic].readline()
                        if not self.buffer[single_topic].endswith("\n"):
                            # nothing new, or the producer has not finished writing the line yet
                            continue
                        line = self.buffer[single_topic]
                        # reset before parsing, so a malformed line is dropped instead of poisoning later reads
                        self.buffer[single_topic] = ""
                        # safe_load is the counterpart of the safe_dump used by write(); plain yaml.load on
                        # file content can build arbitrary python objects and needs an explicit Loader
                        msg_dict = yaml.safe_load(line)
                        if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                            raise MsgException("malformed message at topic '{}': expected a single 'key: value' entry"
                                               .format(single_topic), HTTPStatus.INTERNAL_SERVER_ERROR)
                        for k, v in msg_dict.items():
                            return single_topic, k, v
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
        """
        Asyncio read from one or several topics. It blocks
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Kept for backward compatibility and not used: the running loop is always used
        :param callback: optional function called as callback(topic, key, message, **kwargs) for every message;
            when provided this method keeps reading and does not return
        :param aiocallback: same as callback but it is a coroutine function that is awaited. Only used if
            callback is not provided
        :param kwargs: extra named arguments forwarded to callback/aiocallback
        :return: topic, key, message (only when neither callback nor aiocallback is provided)
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    if callback:
                        callback(*msg, **kwargs)
                    elif aiocallback:
                        await aiocallback(*msg, **kwargs)
                    else:
                        return msg
                # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop. Not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)