blob: 247de7b8cf4e5d34ae3d0147d19ee99b0f7ec274 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Telefonica S.A.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
tierno5c012612018-04-19 16:01:59 +020018import logging
19import os
20import yaml
21import asyncio
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023from time import sleep
tierno136f2952018-10-19 13:01:03 +020024from http import HTTPStatus
tierno5c012612018-04-19 16:01:59 +020025
26__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28"""
This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
access the same file, e.g. the same volume if running with docker.
One text line per message is used in yaml format.
33"""
34
tierno3054f782018-04-25 16:59:53 +020035
class MsgLocal(MsgBase):
    """
    Message bus that emulates kafka by using plain files in a shared directory.

    One file is used per topic, located under the directory given at connect().
    Every message is stored as one text line: a flow-style yaml mapping with a
    single entry ``{key: msg}``.
    """

    def __init__(self, logger_name='msg'):
        """
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        # directory that holds the topic files; set at connect()
        self.path = None
        # one open file per topic, kept apart for reading and for writing
        self.files_read = {}
        self.files_write = {}
        # per topic, text already read that is not yet terminated by a newline
        self.buffer = {}

    def connect(self, config):
        """
        Prepare the directory where topic files are stored.
        :param config: dictionary with "path" (mandatory, directory for the topic files) and
            "logger_name" (optional, replaces the logger set at construction)
        :return: None or raises MsgException on failure
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            # exist_ok avoids the race of checking for the folder and then creating it,
            # and makedirs also creates missing parent folders
            os.makedirs(self.path, exist_ok=True)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def disconnect(self):
        """
        Close every topic file opened for reading or writing and forget them, so that a
        later read or write opens the topic file again instead of using a closed one.
        :return: None
        """
        for opened_files in (self.files_read, self.files_write):
            for topic, f in opened_files.items():
                try:
                    f.close()
                except Exception as e:  # closing is best effort; report it and go on
                    self.logger.debug("Error closing file of topic '%s': %s", topic, e)
            opened_files.clear()
        # partial lines belong to the handles just closed
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises an exception
        """
        try:
            if topic not in self.files_write:
                # "a+" creates the file if it does not exist and appends to it otherwise
                self.files_write[topic] = open(self.path + topic, "a+")
            topic_file = self.files_write[topic]
            # flow style plus a very large width keeps the whole message in one text line
            yaml.safe_dump({key: msg}, topic_file, default_flow_style=True, width=20000)
            topic_file.flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits, polling every 2 seconds, until a message is present;
            if False it returns None when nothing is available
        :return: topic, key, message; or None if blocks==False and there is no message
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    if single_topic not in self.files_read:
                        # "a+" creates the file if it does not exist
                        # NOTE(review): presumably reading starts at the end of the file, so only
                        # messages appended after this point are delivered - TODO confirm
                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
                        self.buffer[single_topic] = ""
                    self.buffer[single_topic] += self.files_read[single_topic].readline()
                    if not self.buffer[single_topic].endswith("\n"):
                        # nothing new, or the producer has not finished writing the line yet
                        continue
                    line = self.buffer[single_topic]
                    # clear the buffer before parsing, so that a malformed line is reported once
                    # and does not corrupt every following read of this topic
                    self.buffer[single_topic] = ""
                    # safe_load is the counterpart of the safe_dump used at write(); plain
                    # yaml.load can build arbitrary objects and needs an explicit Loader in
                    # recent PyYAML versions
                    msg_dict = yaml.safe_load(line)
                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                        raise MsgException(
                            "Malformed message at topic '{}': expected a mapping with one entry".format(
                                single_topic),
                            HTTPStatus.INTERNAL_SERVER_ERROR)
                    key, value = next(iter(msg_dict.items()))
                    return single_topic, key, value
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aioread(self, topic, loop=None):
        """
        Asyncio read from one or several topics. It waits until a message is present,
        polling every 2 seconds without blocking the event loop
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Kept for interface compatibility; it is not used
        :return: topic, key, message
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    return msg
                # the "loop" argument of asyncio.sleep was removed in python 3.10;
                # the running loop is used
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks, as it just calls write()
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop. Not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)