# -*- coding: utf-8 -*-

# Copyright 2018 Whitestack, LLC
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
import asyncio
from unittest import TestCase, mock

from osm_common.msgkafka import MsgKafka

from osm_policy_module.common.message_bus_client import MessageBusClient
from osm_policy_module.core.config import Config
class TestMessageBusClient(TestCase):
    """Check that MessageBusClient forwards its calls to the MsgKafka driver.

    Every test patches one MsgKafka coroutine method with a mock, drives the
    corresponding MessageBusClient method to completion on a private event
    loop, and asserts on the arguments the mock received.
    """

    def setUp(self):
        """Build a Kafka-configured Config and a fresh event loop per test."""
        self.config = Config()
        self.config.set('message', 'driver', 'kafka')
        self.loop = asyncio.new_event_loop()
        # Close the loop even when the test fails; an unclosed loop leaks its
        # selector and triggers a ResourceWarning when garbage collected.
        self.addCleanup(self.loop.close)

    def _completed_future(self, result='mock'):
        """Return a future bound to ``self.loop`` that already holds *result*.

        Used as the patched method's return value; presumably the client
        awaits the driver call, so the mock must hand back an awaitable.
        """
        future = self.loop.create_future()
        future.set_result(result)
        return future

    @mock.patch.object(MsgKafka, 'aioread')
    def test_aioread(self, aioread):
        """aioread passes the topic list, the loop and the callback through."""
        async def mock_callback():
            pass

        topic = 'test_topic'
        aioread.return_value = self._completed_future()
        msg_bus = MessageBusClient(self.config, loop=self.loop)

        self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))

        aioread.assert_called_with([topic], self.loop,
                                   aiocallback=mock_callback)

    @mock.patch.object(MsgKafka, 'aiowrite')
    def test_aiowrite(self, aiowrite):
        """aiowrite passes topic, key and message through, plus the loop."""
        topic = 'test_topic'
        key = 'test_key'
        msg = {'test': 'test_msg'}
        aiowrite.return_value = self._completed_future()
        msg_bus = MessageBusClient(self.config, loop=self.loop)

        self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))

        aiowrite.assert_called_with(topic, key, msg, self.loop)

    @mock.patch.object(MsgKafka, 'aioread')
    def test_aioread_once(self, aioread):
        """aioread_once calls the driver's aioread with the topic and loop."""
        topic = 'test_topic'
        aioread.return_value = self._completed_future()
        msg_bus = MessageBusClient(self.config, loop=self.loop)

        self.loop.run_until_complete(msg_bus.aioread_once(topic))

        aioread.assert_called_with(topic, self.loop)