def setUp(self):
self.config = Config()
self.config.set("message", "driver", "kafka")
- self.loop = asyncio.new_event_loop()
@mock.patch.object(MsgKafka, "aioread")
def test_aioread(self, aioread):
async def mock_callback():
pass
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aioread.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
- self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
- aioread.assert_called_with(["test_topic"], self.loop, aiocallback=mock_callback)
+ asyncio.run(msg_bus.aioread([topic], mock_callback))
+ aioread.assert_called_with(["test_topic"], aiocallback=mock_callback)
@mock.patch.object(MsgKafka, "aiowrite")
def test_aiowrite(self, aiowrite):
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aiowrite.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
key = "test_key"
msg = {"test": "test_msg"}
- self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
- aiowrite.assert_called_with(topic, key, msg, self.loop)
+ asyncio.run(msg_bus.aiowrite(topic, key, msg))
+ aiowrite.assert_called_with(topic, key, msg)
@mock.patch.object(MsgKafka, "aioread")
def test_aioread_once(self, aioread):
- future = asyncio.Future(loop=self.loop)
+ future = asyncio.Future(loop=asyncio.new_event_loop())
future.set_result("mock")
aioread.return_value = future
- msg_bus = MessageBusClient(self.config, loop=self.loop)
+ msg_bus = MessageBusClient(self.config)
topic = "test_topic"
- self.loop.run_until_complete(msg_bus.aioread_once(topic))
- aioread.assert_called_with("test_topic", self.loop)
+ asyncio.run(msg_bus.aioread_once(topic))
+ aioread.assert_called_with("test_topic")