blob: 6d4cb58fed1200c4ea78fdf640358be181d62a55 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Telefonica S.A.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
aticig3dd0db62022-03-04 19:35:45 +030018import asyncio
19from http import HTTPStatus
tierno5c012612018-04-19 16:01:59 +020020import logging
21import os
tierno5c012612018-04-19 16:01:59 +020022from time import sleep
aticig3dd0db62022-03-04 19:35:45 +030023
24from osm_common.msgbase import MsgBase, MsgException
25import yaml
tierno5c012612018-04-19 16:01:59 +020026
27__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
tierno5c012612018-04-19 16:01:59 +020028"""
29This emulated kafka bus by just using a shared file system. Useful for testing or devops.
tierno3054f782018-04-25 16:59:53 +020030One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
tierno5c012612018-04-19 16:01:59 +020031access to the same file. e.g. same volume if running with docker.
32One text line per message is used in yaml format.
33"""
34
tierno3054f782018-04-25 16:59:53 +020035
tierno5c012612018-04-19 16:01:59 +020036class MsgLocal(MsgBase):
garciadeblas2644b762021-03-24 09:21:01 +010037 def __init__(self, logger_name="msg", lock=False):
tierno1e9a3292018-11-05 18:18:45 +010038 super().__init__(logger_name, lock)
tierno5c012612018-04-19 16:01:59 +020039 self.path = None
40 # create a different file for each topic
tiernoe74238f2018-04-26 17:22:09 +020041 self.files_read = {}
42 self.files_write = {}
tierno5c012612018-04-19 16:01:59 +020043 self.buffer = {}
tierno05ede8f2019-01-28 16:20:18 +000044 self.loop = None
tierno5c012612018-04-19 16:01:59 +020045
46 def connect(self, config):
47 try:
48 if "logger_name" in config:
49 self.logger = logging.getLogger(config["logger_name"])
50 self.path = config["path"]
51 if not self.path.endswith("/"):
52 self.path += "/"
53 if not os.path.exists(self.path):
54 os.mkdir(self.path)
tierno05ede8f2019-01-28 16:20:18 +000055 self.loop = config.get("loop")
56
tierno5c012612018-04-19 16:01:59 +020057 except MsgException:
58 raise
59 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +020060 raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +020061
62 def disconnect(self):
tierno1e9a3292018-11-05 18:18:45 +010063 for topic, f in self.files_read.items():
tiernoe74238f2018-04-26 17:22:09 +020064 try:
65 f.close()
tierno1e9a3292018-11-05 18:18:45 +010066 self.files_read[topic] = None
aticigd3b582a2022-08-24 22:41:56 +030067 except Exception as read_topic_error:
68 if isinstance(read_topic_error, (IOError, FileNotFoundError)):
69 self.logger.exception(
70 f"{read_topic_error} occured while closing read topic files."
71 )
72 elif isinstance(read_topic_error, KeyError):
73 self.logger.exception(
74 f"{read_topic_error} occured while reading from files_read dictionary."
75 )
76 else:
77 self.logger.exception(
78 f"{read_topic_error} occured while closing read topics."
79 )
80
tierno1e9a3292018-11-05 18:18:45 +010081 for topic, f in self.files_write.items():
tierno5c012612018-04-19 16:01:59 +020082 try:
83 f.close()
tierno1e9a3292018-11-05 18:18:45 +010084 self.files_write[topic] = None
aticigd3b582a2022-08-24 22:41:56 +030085 except Exception as write_topic_error:
86 if isinstance(write_topic_error, (IOError, FileNotFoundError)):
87 self.logger.exception(
88 f"{write_topic_error} occured while closing write topic files."
89 )
90 elif isinstance(write_topic_error, KeyError):
91 self.logger.exception(
92 f"{write_topic_error} occured while reading from files_write dictionary."
93 )
94 else:
95 self.logger.exception(
96 f"{write_topic_error} occured while closing write topics."
97 )
tierno5c012612018-04-19 16:01:59 +020098
99 def write(self, topic, key, msg):
100 """
101 Insert a message into topic
102 :param topic: topic
103 :param key: key text to be inserted
104 :param msg: value object to be inserted, can be str, object ...
105 :return: None or raises and exception
106 """
107 try:
tierno1e9a3292018-11-05 18:18:45 +0100108 with self.lock:
109 if topic not in self.files_write:
110 self.files_write[topic] = open(self.path + topic, "a+")
garciadeblas2644b762021-03-24 09:21:01 +0100111 yaml.safe_dump(
112 {key: msg},
113 self.files_write[topic],
114 default_flow_style=True,
115 width=20000,
116 )
tierno1e9a3292018-11-05 18:18:45 +0100117 self.files_write[topic].flush()
tierno5c012612018-04-19 16:01:59 +0200118 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +0200119 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +0200120
121 def read(self, topic, blocks=True):
122 """
123 Read from one or several topics. it is non blocking returning None if nothing is available
124 :param topic: can be str: single topic; or str list: several topics
125 :param blocks: indicates if it should wait and block until a message is present or returns None
126 :return: topic, key, message; or None if blocks==True
127 """
128 try:
129 if isinstance(topic, (list, tuple)):
130 topic_list = topic
131 else:
garciadeblas2644b762021-03-24 09:21:01 +0100132 topic_list = (topic,)
tierno5c012612018-04-19 16:01:59 +0200133 while True:
134 for single_topic in topic_list:
tierno1e9a3292018-11-05 18:18:45 +0100135 with self.lock:
136 if single_topic not in self.files_read:
garciadeblas2644b762021-03-24 09:21:01 +0100137 self.files_read[single_topic] = open(
138 self.path + single_topic, "a+"
139 )
tierno1e9a3292018-11-05 18:18:45 +0100140 self.buffer[single_topic] = ""
garciadeblas2644b762021-03-24 09:21:01 +0100141 self.buffer[single_topic] += self.files_read[
142 single_topic
143 ].readline()
tierno1e9a3292018-11-05 18:18:45 +0100144 if not self.buffer[single_topic].endswith("\n"):
145 continue
tierno6472e2b2019-09-02 16:04:16 +0000146 msg_dict = yaml.safe_load(self.buffer[single_topic])
tierno5c012612018-04-19 16:01:59 +0200147 self.buffer[single_topic] = ""
aticigd3b582a2022-08-24 22:41:56 +0300148 if len(msg_dict) != 1:
149 raise ValueError(
150 "Length of message dictionary is not equal to 1"
151 )
tierno1e9a3292018-11-05 18:18:45 +0100152 for k, v in msg_dict.items():
153 return single_topic, k, v
tierno5c012612018-04-19 16:01:59 +0200154 if not blocks:
155 return None
156 sleep(2)
157 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +0200158 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +0200159
garciadeblas2644b762021-03-24 09:21:01 +0100160 async def aioread(
161 self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
162 ):
tierno5c012612018-04-19 16:01:59 +0200163 """
164 Asyncio read from one or several topics. It blocks
165 :param topic: can be str: single topic; or str list: several topics
tierno10602af2019-02-18 14:53:54 +0000166 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
167 :param callback: synchronous callback function that will handle the message
168 :param aiocallback: async callback function that will handle the message
169 :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
170 group_id provided at connect inside config), or a group_id string
171 :param kwargs: optional keyword arguments for callback function
172 :return: If no callback defined, it returns (topic, key, message)
tierno5c012612018-04-19 16:01:59 +0200173 """
tierno05ede8f2019-01-28 16:20:18 +0000174 _loop = loop or self.loop
tierno5c012612018-04-19 16:01:59 +0200175 try:
176 while True:
177 msg = self.read(topic, blocks=False)
178 if msg:
tierno14521832018-10-24 10:53:37 +0200179 if callback:
180 callback(*msg, **kwargs)
181 elif aiocallback:
182 await aiocallback(*msg, **kwargs)
183 else:
184 return msg
tierno05ede8f2019-01-28 16:20:18 +0000185 await asyncio.sleep(2, loop=_loop)
tierno5c012612018-04-19 16:01:59 +0200186 except MsgException:
187 raise
188 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +0200189 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tiernoebbf3532018-05-03 17:49:37 +0200190
191 async def aiowrite(self, topic, key, msg, loop=None):
192 """
193 Asyncio write. It blocks
194 :param topic: str
195 :param key: str
196 :param msg: message, can be str or yaml
197 :param loop: asyncio loop
198 :return: nothing if ok or raises an exception
199 """
200 return self.write(topic, key, msg)