Replaces direct use of aiokafka with osm_common message bus in agent and
[osm/POL.git] / osm_policy_module / tests / unit / common / test_message_bus_client.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 from unittest import TestCase, mock
26
27 from osm_common.msgkafka import MsgKafka
28
29 from osm_policy_module.common.message_bus_client import MessageBusClient
30 from osm_policy_module.core.config import Config
31
32
33 class TestMessageBusClient(TestCase):
34
35 def setUp(self):
36 self.config = Config()
37 self.config.set('message', 'driver', 'kafka')
38 self.loop = asyncio.new_event_loop()
39
40 @mock.patch.object(MsgKafka, 'aioread')
41 def test_aioread(self, aioread):
42 async def mock_callback():
43 pass
44
45 future = asyncio.Future(loop=self.loop)
46 future.set_result('mock')
47 aioread.return_value = future
48 msg_bus = MessageBusClient(self.config, loop=self.loop)
49 topic = 'test_topic'
50 self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
51 aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback)
52
53 @mock.patch.object(MsgKafka, 'aiowrite')
54 def test_aiowrite(self, aiowrite):
55 future = asyncio.Future(loop=self.loop)
56 future.set_result('mock')
57 aiowrite.return_value = future
58 msg_bus = MessageBusClient(self.config, loop=self.loop)
59 topic = 'test_topic'
60 key = 'test_key'
61 msg = {'test': 'test_msg'}
62 self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
63 aiowrite.assert_called_with(topic, key, msg, self.loop)
64
65 @mock.patch.object(MsgKafka, 'aioread')
66 def test_aioread_once(self, aioread):
67 future = asyncio.Future(loop=self.loop)
68 future.set_result('mock')
69 aioread.return_value = future
70 msg_bus = MessageBusClient(self.config, loop=self.loop)
71 topic = 'test_topic'
72 self.loop.run_until_complete(msg_bus.aioread_once(topic))
73 aioread.assert_called_with('test_topic', self.loop)