Feature 10981: added autohealing DAG and updated requirements
[osm/NG-SA.git] / src / osm_ngsa / osm_mon / core / message_bus_client.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 import asyncio
18 from typing import Callable, List
19
20 from osm_common import msgkafka, msglocal
21 from osm_mon.core.config import Config
22
23
24 class MessageBusClient:
25 def __init__(self, config: Config, loop=None):
26 if config.get("message", "driver") == "local":
27 self.msg_bus = msglocal.MsgLocal()
28 elif config.get("message", "driver") == "kafka":
29 self.msg_bus = msgkafka.MsgKafka()
30 else:
31 raise Exception(
32 "Unknown message bug driver {}".format(config.get("section", "driver"))
33 )
34 self.msg_bus.connect(config.get("message"))
35 if not loop:
36 loop = asyncio.get_event_loop()
37 self.loop = loop
38
39 async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
40 """
41 Retrieves messages continuously from bus and executes callback for each message consumed.
42 :param topics: List of message bus topics to consume from.
43 :param callback: Async callback function to be called for each message received.
44 :param kwargs: Keyword arguments to be passed to callback function.
45 :return: None
46 """
47 await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
48
49 async def aiowrite(self, topic: str, key: str, msg: dict):
50 """
51 Writes message to bus.
52 :param topic: Topic to write to.
53 :param key: Key to write to.
54 :param msg: Dictionary containing message to be written.
55 :return: None
56 """
57 await self.msg_bus.aiowrite(topic, key, msg, self.loop)
58
59 async def aioread_once(self, topic: str):
60 """
61 Retrieves last message from bus.
62 :param topic: topic to retrieve message from.
63 :return: tuple(topic, key, message)
64 """
65 result = await self.msg_bus.aioread(topic, self.loop)
66 return result