1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
22 from osm_common
.msgbase
import MsgBase
, MsgException
23 from time
import sleep
24 from http
import HTTPStatus
26 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
30 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
31 access to the same file. e.g. same volume if running with docker.
32 One text line per message is used in yaml format.
36 class MsgLocal(MsgBase
):
38 def __init__(self
, logger_name
='msg'):
39 self
.logger
= logging
.getLogger(logger_name
)
41 # create a different file for each topic
46 def connect(self
, config
):
48 if "logger_name" in config
:
49 self
.logger
= logging
.getLogger(config
["logger_name"])
50 self
.path
= config
["path"]
51 if not self
.path
.endswith("/"):
53 if not os
.path
.exists(self
.path
):
57 except Exception as e
: # TODO refine
58 raise MsgException(str(e
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
61 for f
in self
.files_read
.values():
64 except Exception: # TODO refine
66 for f
in self
.files_write
.values():
69 except Exception: # TODO refine
72 def write(self
, topic
, key
, msg
):
74 Insert a message into topic
76 :param key: key text to be inserted
77 :param msg: value object to be inserted, can be str, object ...
78 :return: None or raises and exception
81 if topic
not in self
.files_write
:
82 self
.files_write
[topic
] = open(self
.path
+ topic
, "a+")
83 yaml
.safe_dump({key
: msg
}, self
.files_write
[topic
], default_flow_style
=True, width
=20000)
84 self
.files_write
[topic
].flush()
85 except Exception as e
: # TODO refine
86 raise MsgException(str(e
), HTTPStatus
.INTERNAL_SERVER_ERROR
)
88 def read(self
, topic
, blocks
=True):
90 Read from one or several topics. it is non blocking returning None if nothing is available
91 :param topic: can be str: single topic; or str list: several topics
92 :param blocks: indicates if it should wait and block until a message is present or returns None
93 :return: topic, key, message; or None if blocks==True
96 if isinstance(topic
, (list, tuple)):
99 topic_list
= (topic
, )
101 for single_topic
in topic_list
:
102 if single_topic
not in self
.files_read
:
103 self
.files_read
[single_topic
] = open(self
.path
+ single_topic
, "a+")
104 self
.buffer[single_topic
] = ""
105 self
.buffer[single_topic
] += self
.files_read
[single_topic
].readline()
106 if not self
.buffer[single_topic
].endswith("\n"):
108 msg_dict
= yaml
.load(self
.buffer[single_topic
])
109 self
.buffer[single_topic
] = ""
110 assert len(msg_dict
) == 1
111 for k
, v
in msg_dict
.items():
112 return single_topic
, k
, v
116 except Exception as e
: # TODO refine
117 raise MsgException(str(e
), HTTPStatus
.INTERNAL_SERVER_ERROR
)
119 async def aioread(self
, topic
, loop
):
121 Asyncio read from one or several topics. It blocks
122 :param topic: can be str: single topic; or str list: several topics
123 :param loop: asyncio loop
124 :return: topic, key, message
128 msg
= self
.read(topic
, blocks
=False)
131 await asyncio
.sleep(2, loop
=loop
)
134 except Exception as e
: # TODO refine
135 raise MsgException(str(e
), HTTPStatus
.INTERNAL_SERVER_ERROR
)
137 async def aiowrite(self
, topic
, key
, msg
, loop
=None):
139 Asyncio write. It blocks
142 :param msg: message, can be str or yaml
143 :param loop: asyncio loop
144 :return: nothing if ok or raises an exception
146 return self
.write(topic
, key
, msg
)