def test_model_watcher(self, allwatcher):
tests = Deltas
allwatcher.return_value = FakeWatcher()
+ n2vc = AsyncMock()
for test in tests:
with self.assertRaises(asyncio.TimeoutError):
allwatcher.return_value.delta_to_return = [test.delta]
test.filter.entity_type,
timeout=0,
db_dict={"something"},
- n2vc=self.n2vc,
+ n2vc=n2vc,
+ vca_id=None,
)
)
- self.assertEqual(self.n2vc.last_written_values, test.db.data)
- self.n2vc.last_written_values = None
+ n2vc.write_app_status_to_db.assert_called()
@mock.patch("n2vc.juju_watcher.asyncio.wait")
def test_wait_for(self, wait):
self.assertTrue(isinstance(value, str))
+@asynctest.mock.patch("asyncio.sleep")
class WaitForModelTest(asynctest.TestCase):
@asynctest.mock.patch("juju.client.connector.Connector.connect")
def setUp(self, mock_connect=None):
self.model = Model()
@asynctest.mock.patch("juju.model.Model.block_until")
- def test_wait_for_model(self, mock_block_until):
+ def test_wait_for_model(self, mock_block_until, mock_sleep):
self.loop.run_until_complete(
JujuModelWatcher.wait_for_model(self.model, timeout=None)
)
@asynctest.mock.patch("asyncio.ensure_future")
@asynctest.mock.patch("asyncio.wait")
- def test_wait_for_model_exception(self, mock_wait, mock_ensure_future):
+ def test_wait_for_model_exception(self, mock_wait, mock_ensure_future, mock_sleep):
task = Mock()
mock_ensure_future.return_value = task
mock_wait.side_effect = Exception