Bug 559 adding encrypt/decrypt methods
[osm/common.git] / osm_common / msglocal.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 import os
20 import yaml
21 import asyncio
22 from osm_common.msgbase import MsgBase, MsgException
23 from time import sleep
24 from http import HTTPStatus
25
26 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
27
28 """
29 This emulates a Kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
31 access the same file, e.g. the same volume if running with docker.
32 One text line per message is used, in YAML format.
33 """
34
35
class MsgLocal(MsgBase):
    """
    Message bus emulation backed by plain files on a shared file system.

    Each topic is stored in one file under the configured path. Every message is written as a single
    YAML flow-style line containing a one-entry mapping ``{key: msg}``.
    """

    def __init__(self, logger_name='msg'):
        """
        Create a bus that is not yet bound to any directory; connect() must be called before read/write
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        self.path = None
        # create a different file for each topic
        self.files_read = {}
        self.files_write = {}
        # per-topic partial line read so far (a line is only parsed once its trailing "\n" has arrived)
        self.buffer = {}

    def connect(self, config):
        """
        Set the directory where the topic files live, creating it if it does not exist
        :param config: dictionary with a mandatory "path" entry and an optional "logger_name" entry
        :return: None or raises MsgException on any failure
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                os.mkdir(self.path)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def disconnect(self):
        """
        Close every topic file opened so far. Closing is best effort: errors are logged and ignored.
        The cached handles and read buffers are dropped afterwards, so that a later read/write reopens
        the topic file instead of failing on a closed handle.
        :return: None
        """
        for opened_files in (self.files_read, self.files_write):
            for topic, f in opened_files.items():
                try:
                    f.close()
                except Exception as e:  # TODO refine
                    self.logger.debug("Ignoring error closing file of topic '%s': %s", topic, e)
            opened_files.clear()
        self.buffer.clear()

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException on any failure
        """
        try:
            if topic not in self.files_write:
                self.files_write[topic] = open(self.path + topic, "a+")
            # flow style and a very large width are requested to get the message dumped as one text line
            yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
            self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits (polling every 2 seconds) until a message is present;
            if False it returns None when nothing is available
        :return: topic, key, message; or None if blocks==False and there is no complete message
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    if single_topic not in self.files_read:
                        self.files_read[single_topic] = open(self.path + single_topic, "a+")
                        self.buffer[single_topic] = ""
                    self.buffer[single_topic] += self.files_read[single_topic].readline()
                    if not self.buffer[single_topic].endswith("\n"):
                        # nothing new, or the producer has not finished writing the line yet
                        continue
                    # messages are written with yaml.safe_dump, so safe_load is enough; it also avoids
                    # building arbitrary python objects from the file content
                    msg_dict = yaml.safe_load(self.buffer[single_topic])
                    self.buffer[single_topic] = ""
                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                        raise MsgException("Invalid message at topic '{}': expected a single 'key: value' entry"
                                           .format(single_topic), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    k, v = next(iter(msg_dict.items()))
                    return single_topic, k, v
                if not blocks:
                    return None
                sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aioread(self, topic, loop):
        """
        Asyncio read from one or several topics. It waits, polling every 2 seconds, until a message is available
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Kept for interface compatibility; it is not used, because the 'loop'
            argument of asyncio.sleep was removed in python 3.10 and the running loop is used instead
        :return: topic, key, message
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    return msg
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks, as it just calls the synchronous write()
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop (not used)
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)