1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
import asyncio
from typing import Callable, List

from osm_common import msgkafka, msglocal

from osm_mon.core.config import Config
class MessageBusClient:
    """
    Asynchronous client wrapping the OSM message bus.

    The underlying driver is chosen from the ``driver`` option of the
    ``message`` configuration section: ``"local"`` selects
    ``msglocal.MsgLocal`` and ``"kafka"`` selects ``msgkafka.MsgKafka``.
    """

    def __init__(self, config: Config, loop=None):
        """
        Creates the bus driver, connects it and stores the event loop.

        :param config: Configuration object; its ``message`` section is read.
        :param loop: Event loop handed to the bus on every call. When omitted,
            the loop returned by ``asyncio.get_event_loop()`` is used.
        :raises Exception: If the configured driver is neither ``local`` nor ``kafka``.
        """
        # Read the option once so the branch test and the error message
        # are guaranteed to talk about the same value.
        driver = config.get("message", "driver")
        if driver == "local":
            self.msg_bus = msglocal.MsgLocal()
        elif driver == "kafka":
            self.msg_bus = msgkafka.MsgKafka()
        else:
            raise Exception("Unknown message bus driver {}".format(driver))

        # The whole "message" section is passed to the driver as its settings.
        self.msg_bus.connect(config.get("message"))

        # Respect a caller-supplied loop; only fall back when none was given.
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop

    async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
        """
        Retrieves messages continuously from bus and executes callback for each message consumed.

        :param topics: List of message bus topics to consume from.
        :param callback: Async callback function to be called for each message received.
        :param kwargs: Keyword arguments to be passed to callback function.
        :return: None
        """
        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)

    async def aiowrite(self, topic: str, key: str, msg: dict):
        """
        Writes message to bus.

        :param topic: Topic to write to.
        :param key: Key to write to.
        :param msg: Dictionary containing message to be written.
        :return: None
        """
        await self.msg_bus.aiowrite(topic, key, msg, self.loop)

    async def aioread_once(self, topic: str):
        """
        Retrieves last message from bus.

        :param topic: topic to retrieve message from.
        :return: tuple(topic, key, message)
        """
        # Without a callback the driver call is awaited once and its result
        # is handed straight back to the caller.
        return await self.msg_bus.aioread(topic, self.loop)