1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
22 from osm_common
.msgbase
import MsgBase
, MsgException
23 from time
import sleep
24 from http
import HTTPStatus
26 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
This module emulates a Kafka bus using only a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer are allowed per topic. Both the consumer and the
producer access the same file, e.g. the same volume when running with Docker.
Each message is stored as one text line in YAML format.
class MsgLocal(MsgBase):
    """
    Message bus emulation backed by plain files on a shared file system.

    One file per topic is kept under the configured path. Every message is stored as one YAML flow-style text line
    holding a single ``{key: msg}`` mapping. Topic files are opened lazily on the first write/read of each topic.
    """

    def __init__(self, logger_name='msg', lock=False):
        """
        :param logger_name: logger name, forwarded to MsgBase
        :param lock: locking option, forwarded to MsgBase
        """
        super().__init__(logger_name, lock)
        self.path = None
        # create a different file for each topic
        self.files_read = {}
        self.files_write = {}
        # per-topic text read so far that does not yet form a complete line
        self.buffer = {}

    def connect(self, config):
        """
        Configure the bus.
        :param config: dictionary with a mandatory "path" (folder holding the topic files, created if missing)
            and an optional "logger_name"
        :return: None or raises MsgException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            # topic file names are appended directly to the path, so it must end with a separator
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                os.mkdir(self.path)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def disconnect(self):
        """
        Close every topic file opened for reading or writing.
        Best effort: an error while closing one file is ignored so that the remaining files are still closed.
        Closed topics stay in the dictionaries with value None.
        :return: None
        """
        for files in (self.files_read, self.files_write):
            for topic, f in files.items():
                try:
                    f.close()
                    files[topic] = None
                except Exception:  # TODO refine
                    # deliberate best effort, see docstring
                    pass

    def write(self, topic, key, msg):
        """
        Insert a message into topic
        :param topic: topic name; also the name of the file under the configured path
        :param key: key text to be inserted
        :param msg: value object to be inserted, can be str, object ...
        :return: None or raises an exception (MsgException)
        """
        try:
            # NOTE(review): relies on MsgBase.__init__ exposing self.lock as a context manager - confirm
            with self.lock:
                if topic not in self.files_write:
                    self.files_write[topic] = open(self.path + topic, "a+")
                # flow style plus a very large width keep the whole message on a single text line
                yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
                self.files_write[topic].flush()
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)

    def read(self, topic, blocks=True):
        """
        Read one message from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param blocks: if True it waits (polling every 2 seconds) until a message is present; if False it returns
            None when nothing is available
        :return: topic, key, message; or None if blocks==False and there is no complete message
        """
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic, )
            while True:
                for single_topic in topic_list:
                    # NOTE(review): relies on MsgBase.__init__ exposing self.lock as a context manager - confirm
                    with self.lock:
                        if single_topic not in self.files_read:
                            self.files_read[single_topic] = open(self.path + single_topic, "a+")
                            self.buffer[single_topic] = ""
                        self.buffer[single_topic] += self.files_read[single_topic].readline()
                        # a line without the trailing newline is still being written: keep it buffered
                        if not self.buffer[single_topic].endswith("\n"):
                            continue
                        # safe_load mirrors the safe_dump used by write(); yaml.load() without an explicit Loader
                        # is unsafe and is rejected by PyYAML >= 6
                        msg_dict = yaml.safe_load(self.buffer[single_topic])
                        self.buffer[single_topic] = ""
                        # explicit check instead of assert, which is stripped when running with -O
                        if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
                            raise ValueError("malformed message at topic '{}': expected a single 'key: value' "
                                             "mapping".format(single_topic))
                        key, value = next(iter(msg_dict.items()))
                        return single_topic, key, value
                if not blocks:
                    return None
                sleep(2)
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
        """
        Asyncio read from one or several topics. It polls every 2 seconds until a message is available.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. Kept for backward compatibility, not used: the running loop is always used
        :param callback: function called with (topic, key, message, **kwargs) for each message. If provided this
            method keeps reading and does not return
        :param aiocallback: coroutine function awaited with (topic, key, message, **kwargs) for each message; only
            used when callback is not provided. If provided this method keeps reading and does not return
        :param kwargs: extra keyword arguments passed to callback/aiocallback
        :return: topic, key, message (only when neither callback nor aiocallback are provided)
        """
        try:
            while True:
                msg = self.read(topic, blocks=False)
                if msg:
                    if callback:
                        callback(*msg, **kwargs)
                    elif aiocallback:
                        await aiocallback(*msg, **kwargs)
                    else:
                        return msg
                # the 'loop' argument of asyncio.sleep was removed in python 3.10, so it is not forwarded
                await asyncio.sleep(2)
        except MsgException:
            raise
        except Exception as e:  # TODO refine
            raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. It blocks, as it just calls the synchronous write
        :param topic: str
        :param key: str
        :param msg: message, can be str or yaml
        :param loop: asyncio loop (not used)
        :return: nothing if ok or raises an exception
        """
        return self.write(topic, key, msg)