blob: f731e741fdd801264f86c3c4b97fd630f48c4d88 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Telefonica S.A.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
tierno5c012612018-04-19 16:01:59 +020018import logging
19import os
20import yaml
21import asyncio
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023from time import sleep
24
25__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
26
27"""
This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
access the same file, e.g. the same volume if running with docker.
One text line per message is used, in yaml format.
32"""
33
tierno3054f782018-04-25 16:59:53 +020034
class MsgLocal(MsgBase):
    """
    Message bus emulation backed by plain files in a shared directory.

    Each topic maps to one file under ``self.path``. Every message is stored as a single
    text line holding a one-entry yaml mapping ``{key: msg}``.
    """

    def __init__(self, logger_name='msg'):
        """
        :param logger_name: name of the logger used until connect() optionally overrides it
        """
        self.logger = logging.getLogger(logger_name)
        self.path = None
        # one open file per topic, kept separately for reading and for writing
        self.files_read = {}
        self.files_write = {}
        # per topic: text read so far that does not yet form a complete line
        self.buffer = {}

    def connect(self, config):
        """
        Configure the directory where topic files are stored, creating it if needed
        :param config: dict with the mandatory key "path" and the optional key "logger_name"
        :return: None or raises MsgException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            # exist_ok avoids the check-then-create race when producer and consumer start together,
            # and makedirs also creates missing parent folders
            os.makedirs(self.path, exist_ok=True)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Close every topic file opened by read() or write() and forget the handles
        :return: None. Errors while closing are logged, never raised
        """
        for handles in (self.files_read, self.files_write):
            for topic, f in handles.items():
                try:
                    f.close()
                except Exception as e:  # TODO refine
                    # best effort: keep closing the remaining files
                    self.logger.warning("Error closing file of topic '%s': %s", topic, e)
            # drop the closed handles so that a later read/write opens the file again
            handles.clear()
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException
        """
        try:
            if topic not in self.files_write:
                self.files_write[topic] = open(self.path + topic, "a+")
            yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
            # flush so that the content is handed to the operating system right away
            self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it polls every 2 seconds until a message is present; if False it returns None
            when no complete message is available
        :return: topic, key, message; or None if blocks==False and nothing is available
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    if single_topic not in self.files_read:
                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
                        self.buffer[single_topic] = ""
                    self.buffer[single_topic] += self.files_read[single_topic].readline()
                    if not self.buffer[single_topic].endswith("\n"):
                        # incomplete line: keep what was read and try the next topic
                        continue
                    line = self.buffer[single_topic]
                    # reset before parsing, so that a malformed line does not get prepended to the next one
                    self.buffer[single_topic] = ""
                    # safe_load is enough because content is generated with safe_dump
                    msg_dict = yaml.safe_load(line)
                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                        raise MsgException("Malformed message at topic '{}': expected one 'key: value' entry "
                                           "per line".format(single_topic))
                    k, v = next(iter(msg_dict.items()))
                    return single_topic, k, v
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aioread(self, topic, loop=None):
        """
        Asyncio read from one or several topics. It waits until a message is available
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Kept for interface compatibility; it is not used, because asyncio.sleep
            does not accept a 'loop' argument in recent python versions
        :return: topic, key, message
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    return msg
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It calls the synchronous write(), so it does not yield to the event loop
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop. Not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)