1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Copyright 2018 Whitestack, LLC |
4 |
|
# ************************************************************* |
5 |
|
|
6 |
|
# This file is part of OSM Monitoring module |
7 |
|
# All Rights Reserved to Whitestack, LLC |
8 |
|
|
9 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
10 |
|
# not use this file except in compliance with the License. You may obtain |
11 |
|
# a copy of the License at |
12 |
|
|
13 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
14 |
|
|
15 |
|
# Unless required by applicable law or agreed to in writing, software |
16 |
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
17 |
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
18 |
|
# License for the specific language governing permissions and limitations |
19 |
|
# under the License. |
20 |
|
|
21 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
22 |
|
# contact: bdiaz@whitestack.com or glavado@whitestack.com |
23 |
|
## |
24 |
1 |
import asyncio |
25 |
1 |
from typing import List, Callable |
26 |
|
|
27 |
1 |
from osm_common import msgkafka, msglocal |
28 |
|
|
29 |
1 |
from osm_policy_module.core.config import Config |
30 |
|
|
31 |
|
|
32 |
1 |
class MessageBusClient: |
33 |
1 |
def __init__(self, config: Config, loop=None): |
34 |
1 |
if config.get('message', 'driver') == "local": |
35 |
0 |
self.msg_bus = msglocal.MsgLocal() |
36 |
1 |
elif config.get('message', 'driver') == "kafka": |
37 |
1 |
self.msg_bus = msgkafka.MsgKafka() |
38 |
|
else: |
39 |
0 |
raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver'))) |
40 |
1 |
self.msg_bus.connect(config.get('message')) |
41 |
1 |
if not loop: |
42 |
1 |
loop = asyncio.get_event_loop() |
43 |
1 |
self.loop = loop |
44 |
|
|
45 |
1 |
async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): |
46 |
|
""" |
47 |
|
Retrieves messages continuously from bus and executes callback for each message consumed. |
48 |
|
:param topics: List of message bus topics to consume from. |
49 |
|
:param callback: Async callback function to be called for each message received. |
50 |
|
:param kwargs: Keyword arguments to be passed to callback function. |
51 |
|
:return: None |
52 |
|
""" |
53 |
1 |
await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) |
54 |
|
|
55 |
1 |
async def aiowrite(self, topic: str, key: str, msg: dict): |
56 |
|
""" |
57 |
|
Writes message to bus. |
58 |
|
:param topic: Topic to write to. |
59 |
|
:param key: Key to write to. |
60 |
|
:param msg: Dictionary containing message to be written. |
61 |
|
:return: None |
62 |
|
""" |
63 |
1 |
await self.msg_bus.aiowrite(topic, key, msg, self.loop) |
64 |
|
|
65 |
1 |
async def aioread_once(self, topic: str): |
66 |
|
""" |
67 |
|
Retrieves last message from bus. |
68 |
|
:param topic: topic to retrieve message from. |
69 |
|
:return: tuple(topic, key, message) |
70 |
|
""" |
71 |
1 |
result = await self.msg_bus.aioread(topic, self.loop) |
72 |
1 |
return result |