blob: 843b3766a9dacf475294f11757b094282a681ea9 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Telefonica S.A.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
tierno5c012612018-04-19 16:01:59 +020018import logging
19import os
20import yaml
21import asyncio
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023from time import sleep
tierno136f2952018-10-19 13:01:03 +020024from http import HTTPStatus
tierno5c012612018-04-19 16:01:59 +020025
26__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28"""
29This emulated kafka bus by just using a shared file system. Useful for testing or devops.
tierno3054f782018-04-25 16:59:53 +020030One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
tierno5c012612018-04-19 16:01:59 +020031access to the same file. e.g. same volume if running with docker.
32One text line per message is used in yaml format.
33"""
34
tierno3054f782018-04-25 16:59:53 +020035
tierno5c012612018-04-19 16:01:59 +020036class MsgLocal(MsgBase):
37
tierno1e9a3292018-11-05 18:18:45 +010038 def __init__(self, logger_name='msg', lock=False):
39 super().__init__(logger_name, lock)
tierno5c012612018-04-19 16:01:59 +020040 self.path = None
41 # create a different file for each topic
tiernoe74238f2018-04-26 17:22:09 +020042 self.files_read = {}
43 self.files_write = {}
tierno5c012612018-04-19 16:01:59 +020044 self.buffer = {}
tierno05ede8f2019-01-28 16:20:18 +000045 self.loop = None
tierno5c012612018-04-19 16:01:59 +020046
47 def connect(self, config):
48 try:
49 if "logger_name" in config:
50 self.logger = logging.getLogger(config["logger_name"])
51 self.path = config["path"]
52 if not self.path.endswith("/"):
53 self.path += "/"
54 if not os.path.exists(self.path):
55 os.mkdir(self.path)
tierno05ede8f2019-01-28 16:20:18 +000056 self.loop = config.get("loop")
57
tierno5c012612018-04-19 16:01:59 +020058 except MsgException:
59 raise
60 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +020061 raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +020062
63 def disconnect(self):
tierno1e9a3292018-11-05 18:18:45 +010064 for topic, f in self.files_read.items():
tiernoe74238f2018-04-26 17:22:09 +020065 try:
66 f.close()
tierno1e9a3292018-11-05 18:18:45 +010067 self.files_read[topic] = None
tiernoe74238f2018-04-26 17:22:09 +020068 except Exception: # TODO refine
69 pass
tierno1e9a3292018-11-05 18:18:45 +010070 for topic, f in self.files_write.items():
tierno5c012612018-04-19 16:01:59 +020071 try:
72 f.close()
tierno1e9a3292018-11-05 18:18:45 +010073 self.files_write[topic] = None
tierno3054f782018-04-25 16:59:53 +020074 except Exception: # TODO refine
tierno5c012612018-04-19 16:01:59 +020075 pass
76
77 def write(self, topic, key, msg):
78 """
79 Insert a message into topic
80 :param topic: topic
81 :param key: key text to be inserted
82 :param msg: value object to be inserted, can be str, object ...
83 :return: None or raises and exception
84 """
85 try:
tierno1e9a3292018-11-05 18:18:45 +010086 with self.lock:
87 if topic not in self.files_write:
88 self.files_write[topic] = open(self.path + topic, "a+")
89 yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
90 self.files_write[topic].flush()
tierno5c012612018-04-19 16:01:59 +020091 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +020092 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +020093
94 def read(self, topic, blocks=True):
95 """
96 Read from one or several topics. it is non blocking returning None if nothing is available
97 :param topic: can be str: single topic; or str list: several topics
98 :param blocks: indicates if it should wait and block until a message is present or returns None
99 :return: topic, key, message; or None if blocks==True
100 """
101 try:
102 if isinstance(topic, (list, tuple)):
103 topic_list = topic
104 else:
105 topic_list = (topic, )
106 while True:
107 for single_topic in topic_list:
tierno1e9a3292018-11-05 18:18:45 +0100108 with self.lock:
109 if single_topic not in self.files_read:
110 self.files_read[single_topic] = open(self.path + single_topic, "a+")
111 self.buffer[single_topic] = ""
112 self.buffer[single_topic] += self.files_read[single_topic].readline()
113 if not self.buffer[single_topic].endswith("\n"):
114 continue
tierno6472e2b2019-09-02 16:04:16 +0000115 msg_dict = yaml.safe_load(self.buffer[single_topic])
tierno5c012612018-04-19 16:01:59 +0200116 self.buffer[single_topic] = ""
tierno1e9a3292018-11-05 18:18:45 +0100117 assert len(msg_dict) == 1
118 for k, v in msg_dict.items():
119 return single_topic, k, v
tierno5c012612018-04-19 16:01:59 +0200120 if not blocks:
121 return None
122 sleep(2)
123 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +0200124 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tierno5c012612018-04-19 16:01:59 +0200125
tierno10602af2019-02-18 14:53:54 +0000126 async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
tierno5c012612018-04-19 16:01:59 +0200127 """
128 Asyncio read from one or several topics. It blocks
129 :param topic: can be str: single topic; or str list: several topics
tierno10602af2019-02-18 14:53:54 +0000130 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
131 :param callback: synchronous callback function that will handle the message
132 :param aiocallback: async callback function that will handle the message
133 :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
134 group_id provided at connect inside config), or a group_id string
135 :param kwargs: optional keyword arguments for callback function
136 :return: If no callback defined, it returns (topic, key, message)
tierno5c012612018-04-19 16:01:59 +0200137 """
tierno05ede8f2019-01-28 16:20:18 +0000138 _loop = loop or self.loop
tierno5c012612018-04-19 16:01:59 +0200139 try:
140 while True:
141 msg = self.read(topic, blocks=False)
142 if msg:
tierno14521832018-10-24 10:53:37 +0200143 if callback:
144 callback(*msg, **kwargs)
145 elif aiocallback:
146 await aiocallback(*msg, **kwargs)
147 else:
148 return msg
tierno05ede8f2019-01-28 16:20:18 +0000149 await asyncio.sleep(2, loop=_loop)
tierno5c012612018-04-19 16:01:59 +0200150 except MsgException:
151 raise
152 except Exception as e: # TODO refine
tierno136f2952018-10-19 13:01:03 +0200153 raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
tiernoebbf3532018-05-03 17:49:37 +0200154
155 async def aiowrite(self, topic, key, msg, loop=None):
156 """
157 Asyncio write. It blocks
158 :param topic: str
159 :param key: str
160 :param msg: message, can be str or yaml
161 :param loop: asyncio loop
162 :return: nothing if ok or raises an exception
163 """
164 return self.write(topic, key, msg)