Collect consumption metrics from Azure in DAG
[osm/NG-SA.git] / src / osm_ngsa / osm_mon / core / message_bus_client.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from typing import Callable, List
18
19 from osm_common import msgkafka, msglocal
20 from osm_mon.core.config import Config
21
22
23 class MessageBusClient:
24 def __init__(self, config: Config):
25 if config.get("message", "driver") == "local":
26 self.msg_bus = msglocal.MsgLocal()
27 elif config.get("message", "driver") == "kafka":
28 self.msg_bus = msgkafka.MsgKafka()
29 else:
30 raise Exception(
31 "Unknown message bug driver {}".format(config.get("section", "driver"))
32 )
33 self.msg_bus.connect(config.get("message"))
34
35 async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
36 """
37 Retrieves messages continuously from bus and executes callback for each message consumed.
38 :param topics: List of message bus topics to consume from.
39 :param callback: Async callback function to be called for each message received.
40 :param kwargs: Keyword arguments to be passed to callback function.
41 :return: None
42 """
43 await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs)
44
45 async def aiowrite(self, topic: str, key: str, msg: dict):
46 """
47 Writes message to bus.
48 :param topic: Topic to write to.
49 :param key: Key to write to.
50 :param msg: Dictionary containing message to be written.
51 :return: None
52 """
53 await self.msg_bus.aiowrite(topic, key, msg)
54
55 async def aioread_once(self, topic: str):
56 """
57 Retrieves last message from bus.
58 :param topic: topic to retrieve message from.
59 :return: tuple(topic, key, message)
60 """
61 result = await self.msg_bus.aioread(topic)
62 return result