1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from typing
import Callable
, List
19 from osm_common
import msgkafka
, msglocal
20 from osm_mon
.core
.config
import Config
23 class MessageBusClient
:
24 def __init__(self
, config
: Config
):
25 if config
.get("message", "driver") == "local":
26 self
.msg_bus
= msglocal
.MsgLocal()
27 elif config
.get("message", "driver") == "kafka":
28 self
.msg_bus
= msgkafka
.MsgKafka()
31 "Unknown message bug driver {}".format(config
.get("section", "driver"))
33 self
.msg_bus
.connect(config
.get("message"))
35 async def aioread(self
, topics
: List
[str], callback
: Callable
= None, **kwargs
):
37 Retrieves messages continuously from bus and executes callback for each message consumed.
38 :param topics: List of message bus topics to consume from.
39 :param callback: Async callback function to be called for each message received.
40 :param kwargs: Keyword arguments to be passed to callback function.
43 await self
.msg_bus
.aioread(topics
, aiocallback
=callback
, **kwargs
)
45 async def aiowrite(self
, topic
: str, key
: str, msg
: dict):
47 Writes message to bus.
48 :param topic: Topic to write to.
49 :param key: Key to write to.
50 :param msg: Dictionary containing message to be written.
53 await self
.msg_bus
.aiowrite(topic
, key
, msg
)
55 async def aioread_once(self
, topic
: str):
57 Retrieves last message from bus.
58 :param topic: topic to retrieve message from.
59 :return: tuple(topic, key, message)
61 result
= await self
.msg_bus
.aioread(topic
)