1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
22 from osm_common
.msgbase
import MsgBase
, MsgException
23 from time
import sleep
24 from http
import HTTPStatus
# Author and contact address for this module.
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# This module emulates a Kafka bus using only a shared file system. It is useful for testing or devops.
# One file is used per topic. Only one producer and one consumer are allowed per topic. Both the consumer and
# the producer access the same file, e.g. the same volume when running with docker.
# Each message is stored as one text line in YAML format.
class MsgLocal(MsgBase):
    """
    Message bus emulation backed by plain files in a shared directory.

    Every topic maps to one file under the configured path. Each message is appended as one text
    line holding a single-entry YAML mapping ``{key: msg}``.
    """

    # Seconds to wait between two polls of the topic files when no complete message is available.
    _POLL_INTERVAL = 2

    def __init__(self, logger_name='msg', lock=False):
        """
        :param logger_name: forwarded unchanged to MsgBase
        :param lock: forwarded unchanged to MsgBase
        """
        super().__init__(logger_name, lock)
        # Directory (with trailing "/") holding the topic files; set by connect()
        self.path = None
        # create a different file for each topic
        self.files_read = {}    # topic -> file object used by read()
        self.files_write = {}   # topic -> file object used by write()
        self.buffer = {}        # topic -> text of a line read so far that is not yet newline-terminated
        self.loop = None        # value of config["loop"], stored by connect()

    def connect(self, config):
        """
        Configure the bus and make sure the storage directory exists.
        :param config: dictionary with:
            "path": mandatory, directory where the topic files are stored
            "logger_name": optional, name of the logger to use
            "loop": optional, stored in self.loop
        :return: None or raises MsgException on any failure
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            # Topic file names are appended directly to the path, so it must end with a separator
            if not self.path.endswith("/"):
                self.path += "/"
            # exist_ok avoids the check-then-create race and also creates missing parent directories
            os.makedirs(self.path, exist_ok=True)
            self.loop = config.get("loop")
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    # NOTE(review): the header of this method was not present in the reviewed text; the name
    # "disconnect" is assumed from the MsgBase interface -- confirm.
    def disconnect(self):
        """
        Close every topic file opened for reading or writing.
        This is best effort: an error while closing one file is ignored so the rest are still closed.
        :return: None
        """
        for files in (self.files_read, self.files_write):
            for topic, f in files.items():
                try:
                    if f is not None:
                        f.close()
                    # Only existing keys are reassigned, which is safe while iterating the dict
                    files[topic] = None
                except Exception:  # TODO refine
                    pass

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic name; it is also the name of the file inside the configured path
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises MsgException
        """
        try:
            # NOTE(review): assumes MsgBase.__init__ stores a context-manager lock in self.lock -- confirm
            with self.lock:
                # A None entry is left behind by disconnect(); reopen in that case as well
                if self.files_write.get(topic) is None:
                    self.files_write[topic] = open(self.path + topic, "a+")
                topic_file = self.files_write[topic]
                # Flow style plus a very large width keep the whole message on a single text line,
                # which is what read() relies on
                yaml.safe_dump({key: msg}, topic_file, default_flow_style=True, width=20000)
                topic_file.flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def read(self, topic, blocks=True):
        """
        Read the next message from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True, wait (polling the files) until a message is present; if False, return None
            when no complete message is available
        :return: topic, key, message; or None if blocks is False and nothing is available
        :raises MsgException: on any I/O or parsing failure
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    # NOTE(review): assumes MsgBase.__init__ stores a context-manager lock in self.lock -- confirm
                    with self.lock:
                        # A None entry is left behind by disconnect(); reopen in that case as well
                        if self.files_read.get(single_topic) is None:
                            self.files_read[single_topic] = open(self.path + single_topic, "a+")
                            self.buffer[single_topic] = ""
                        self.buffer[single_topic] += self.files_read[single_topic].readline()
                        # A line without the trailing newline is still being written; keep it buffered
                        if not self.buffer[single_topic].endswith("\n"):
                            continue
                        # Clear the buffer before parsing so one malformed line cannot corrupt later reads
                        line = self.buffer[single_topic]
                        self.buffer[single_topic] = ""
                        # safe_load is enough because write() produces the content with safe_dump
                        msg_dict = yaml.safe_load(line)
                        if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                            raise ValueError("malformed message at topic '{}': expected a single 'key: value' "
                                             "entry".format(single_topic))
                        key, value = next(iter(msg_dict.items()))
                        return single_topic, key, value
                if not blocks:
                    return None
                sleep(self._POLL_INTERVAL)
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
        """
        Asyncio read from one or several topics. It waits until a message is available
        :param topic: can be str: single topic; or str list: several topics
        :param loop: kept for backward compatibility and ignored; the running event loop is used
        :param callback: optional function called as callback(topic, key, message, **kwargs) for every message
        :param aiocallback: optional coroutine function awaited as aiocallback(topic, key, message, **kwargs) for
            every message; only used when callback is not provided
        :param kwargs: extra keyword arguments forwarded to callback or aiocallback
        :return: topic, key, message when neither callback nor aiocallback is provided; otherwise it keeps
            reading and does not return
        :raises MsgException: on any failure, including errors raised by the callbacks
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    if callback:
                        callback(*msg, **kwargs)
                    elif aiocallback:
                        await aiocallback(*msg, **kwargs)
                    else:
                        return msg
                # The "loop" argument of asyncio.sleep was removed in Python 3.10, so it is not passed
                await asyncio.sleep(self._POLL_INTERVAL)
        except MsgException:
            # Already in the expected form (raised by read); do not wrap it again
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks: it simply calls the synchronous write
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop; not used
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)