mock_libjuju.assert_called_once_with(
endpoint="1.1.1.1:17070",
api_proxy=None, # Not needed for k8s charms
- enable_os_upgrade=True,
- apt_mirror=None,
+ model_config={},
username="user",
password="secret",
cacert=mock_base64_to_cacert.return_value,
self.k8s_juju_conn._create_cluster_role = Mock()
self.k8s_juju_conn._create_service_account = Mock()
self.k8s_juju_conn._create_cluster_role_binding = Mock()
+ self.k8s_juju_conn._delete_cluster_role = Mock()
+ self.k8s_juju_conn._delete_service_account = Mock()
+ self.k8s_juju_conn._delete_cluster_role_binding = Mock()
self.k8s_juju_conn._get_secret_data = AsyncMock()
self.k8s_juju_conn._get_secret_data.return_value = ("token", "cacert")
self.k8s_juju_conn.db.get_one.assert_called_once()
self.k8s_juju_conn.db.encrypt_decrypt_fields.assert_called_once()
mock_safe_dump.assert_called_once()
+
+
+class UpdateVcaStatusTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(UpdateVcaStatusTest, self).setUp()
+ self.vcaStatus = {"model": {"applications": {"app": {"actions": {}}}}}
+ self.kdu_name = "kdu_name"
+ self.kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ self.k8s_juju_conn.libjuju.get_executed_actions = AsyncMock()
+ self.k8s_juju_conn.libjuju.get_actions = AsyncMock()
+ self.k8s_juju_conn.libjuju.get_application_configs = AsyncMock()
+
+ def test_success(self):
+ self.loop.run_until_complete(self.k8s_juju_conn.update_vca_status(
+ self.vcaStatus, self.kdu_instance))
+ self.k8s_juju_conn.libjuju.get_executed_actions.assert_called_once()
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once()
+ self.k8s_juju_conn.libjuju.get_application_configs.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.get_model.return_value = None
+ self.k8s_juju_conn.libjuju.get_executed_actions.side_effect = Exception()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.k8s_juju_conn.update_vca_status(
+ self.vcaStatus, self.kdu_instance))
+ self.k8s_juju_conn.libjuju.get_executed_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called_once()
+ self.k8s_juju_conn.libjuju.get_application_configs.assert_not_called_once()