Feature 9751: Centralized VCA for KNFs
- Use libjuju.py for the communication with VCA
- Add the k8s_cloud as an external cloud to the VCA
- Add unit tests
Change-Id: Id55bfada3957f35e13cef7b4bfcc7acb72452df0
Signed-off-by: David Garcia <david.garcia@canonical.com>
diff --git a/n2vc/tests/unit/test_juju_watcher.py b/n2vc/tests/unit/test_juju_watcher.py
index 593ff0d..41065bf 100644
--- a/n2vc/tests/unit/test_juju_watcher.py
+++ b/n2vc/tests/unit/test_juju_watcher.py
@@ -140,3 +140,29 @@
value = status(application)
mock_derive_status.assert_called_once()
self.assertTrue(isinstance(value, str))
+
+
+class WaitForModelTest(asynctest.TestCase):
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ def setUp(self, mock_connect=None):
+ self.loop = asyncio.new_event_loop()
+ self.model = Model()
+
+ @asynctest.mock.patch("juju.model.Model.block_until")
+ def test_wait_for_model(self, mock_block_until):
+ self.loop.run_until_complete(
+ JujuModelWatcher.wait_for_model(self.model, timeout=None)
+ )
+ mock_block_until.assert_called()
+
+ @asynctest.mock.patch("asyncio.ensure_future")
+ @asynctest.mock.patch("asyncio.wait")
+ def test_wait_for_model_exception(self, mock_wait, mock_ensure_future):
+ task = Mock()
+ mock_ensure_future.return_value = task
+ mock_wait.side_effect = Exception
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ JujuModelWatcher.wait_for_model(self.model, timeout=None)
+ )
+ task.cancel.assert_called()
diff --git a/n2vc/tests/unit/test_k8s_juju_conn.py b/n2vc/tests/unit/test_k8s_juju_conn.py
new file mode 100644
index 0000000..50e827e
--- /dev/null
+++ b/n2vc/tests/unit/test_k8s_juju_conn.py
@@ -0,0 +1,778 @@
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import asynctest
+from n2vc.k8s_juju_conn import K8sJujuConnector
+from osm_common import fslocal
+from .utils import kubeconfig, FakeModel, FakeFileWrapper
+from n2vc.exceptions import (
+ MethodNotImplemented,
+ K8sException,
+ N2VCBadArgumentsException,
+)
+from unittest.mock import Mock
+from .utils import AsyncMock
+
+
+class K8sJujuConnTestCase(asynctest.TestCase):
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.Libjuju")
+ def setUp(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ self.loop = asyncio.get_event_loop()
+ mock_libjuju.return_value = AsyncMock()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ logging.disable(logging.CRITICAL)
+
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+
+class K8sJujuConnInitSuccessTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ logging.disable(logging.CRITICAL)
+
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.libjuju.Libjuju.__init__")
+ def test_success(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ mock_libjuju.return_value = None
+ loop = asyncio.get_event_loop()
+ log = logging.getLogger()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+ K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=log,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+ mock_libjuju.assert_called_once_with(
+ endpoint="1.1.1.1:17070",
+ api_proxy=None, # Not needed for k8s charms
+ enable_os_upgrade=True,
+ apt_mirror=None,
+ username="user",
+ password="secret",
+ cacert=mock_base64_to_cacert.return_value,
+ loop=loop,
+ log=log,
+ db=db,
+ )
+
+
+class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ self.loop = asyncio.get_event_loop()
+ logging.disable(logging.CRITICAL)
+ self.vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ def test_missing_vca_config_host(self):
+ db = Mock()
+ self.vca_config.pop("host")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_user(self):
+ db = Mock()
+ self.vca_config.pop("user")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_secret(self):
+ db = Mock()
+ self.vca_config.pop("secret")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_ca_cert(self):
+ db = Mock()
+ self.vca_config.pop("ca_cert")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+
+class InitEnvTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InitEnvTest, self).setUp()
+ self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_cluster_uuid(
+ self,
+ mock_get_default_storage_class,
+ ):
+ reuse_cluster_uuid = "uuid"
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(
+ k8s_creds=kubeconfig, reuse_cluster_uuid=reuse_cluster_uuid
+ )
+ )
+
+ self.assertTrue(created)
+ self.assertEqual(uuid, reuse_cluster_uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_no_cluster_uuid(self, mock_get_default_storage_class):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertTrue(created)
+ self.assertTrue(isinstance(uuid, str))
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_init_env_exception(self, mock_get_default_storage_class):
+ self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
+ created = None
+ uuid = None
+ with self.assertRaises(Exception):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertIsNone(created)
+ self.assertIsNone(uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+
+class NotImplementedTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(NotImplementedTest, self).setUp()
+
+ def test_repo_add(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_add("", ""))
+
+ def test_repo_list(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_list())
+
+ def test_repo_remove(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_remove(""))
+
+ def test_synchronize_repos(self):
+ self.assertIsNone(
+ self.loop.run_until_complete(self.k8s_juju_conn.synchronize_repos("", ""))
+ )
+
+ def test_upgrade(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.upgrade("", ""))
+
+ def test_rollback(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.rollback("", ""))
+
+ def test_get_namespace(self):
+ self.assertIsNone(self.k8s_juju_conn.get_namespace(""))
+
+ def test_instances_list(self):
+ res = self.loop.run_until_complete(self.k8s_juju_conn.instances_list(""))
+ self.assertEqual(res, [])
+
+
+class ResetTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ResetTest, self).setUp()
+ self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
+
+ def test_success(self):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertTrue(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+ def test_exception(self):
+ removed = None
+ self.k8s_juju_conn.libjuju.remove_cloud.side_effect = Exception()
+ with self.assertRaises(Exception):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertIsNone(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+
+@asynctest.mock.patch("os.chdir")
+class InstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InstallTest, self).setUp()
+ self.db_dict = {"filter": {"_id": "id"}}
+ self.local_bundle = "bundle"
+ self.cs_bundle = "cs:bundle"
+ self.http_bundle = "https://example.com/bundle.yaml"
+ self.kdu_name = "kdu_name"
+ self.cluster_uuid = "cluster"
+ self.k8s_juju_conn.libjuju.add_model = AsyncMock()
+ self.k8s_juju_conn.libjuju.deploy = AsyncMock()
+
+ def test_success_local(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.assertEqual(mock_chdir.call_count, 2)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_cs(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_http(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.http_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.http_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_not_kdu_name(self, mock_chdir):
+ expected_kdu_instance = "id"
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_db_dict(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ @asynctest.mock.patch("os.getcwd")
+ def test_getcwd_exception(self, mock_getcwd, mock_chdir):
+ mock_getcwd.side_effect = FileNotFoundError()
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_bundle(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ "",
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ db_dict=self.db_dict,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ def test_missing_exception(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = None
+ self.k8s_juju_conn.libjuju.deploy.side_effect = Exception()
+ with self.assertRaises(Exception):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+
+class UninstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(UninstallTest, self).setUp()
+ self.k8s_juju_conn.libjuju.destroy_model = AsyncMock()
+
+ def test_success(self):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertTrue(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+ def test_exception(self):
+ destroyed = None
+ self.k8s_juju_conn.libjuju.destroy_model.side_effect = Exception()
+ with self.assertRaises(Exception):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertIsNone(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+
+class ExecPrimitivesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ExecPrimitivesTest, self).setUp()
+ self.action_name = "touch"
+ self.application_name = "myapp"
+ self.model_name = "model"
+ self.k8s_juju_conn.libjuju.get_actions = AsyncMock()
+ self.k8s_juju_conn.libjuju.execute_action = AsyncMock()
+
+ def test_success(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertEqual(output, "success")
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_exception(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.side_effect = Exception()
+ output = None
+
+ with self.assertRaises(Exception):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_missing_application_name_in_params(self):
+ params = {}
+ output = None
+
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_params(self):
+ output = None
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_action(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, "non-existing-action", params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_not_completed(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (None, "failed")
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+
+class InspectKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InspectKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_existing_file(self, mock_exists, mock_open):
+ mock_exists.return_value = True
+ content = """{
+ 'description': 'test bundle',
+ 'bundle': 'kubernetes',
+ 'applications': {'app':{ }, 'app2': { }}
+ }"""
+ mock_open.return_value = FakeFileWrapper(content=content)
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, {"app": {}, "app2": {}})
+ mock_exists.assert_called_once()
+ mock_open.assert_called_once()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_not_existing_file(self, mock_exists, mock_open):
+ kdu = None
+ mock_exists.return_value = False
+ with self.assertRaises(K8sException):
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, None)
+ mock_exists.assert_called_once_with("model")
+ mock_open.assert_not_called()
+
+
+class HelpKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(HelpKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_existing_file(self, mock_listdir, mock_open):
+ content = "Readme file content"
+ mock_open.return_value = FakeFileWrapper(content=content)
+ for file in ["README.md", "README.txt", "README"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, content)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 3)
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_not_existing_file(self, mock_listdir, mock_open):
+ for file in ["src/charm.py", "tox.ini", "requirements.txt"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, None)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 0)
+
+
+class StatusKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(StatusKduTest, self).setUp()
+ self.k8s_juju_conn.libjuju.get_model_status = AsyncMock()
+
+ def test_success(self):
+ applications = {"app": {"status": {"status": "active"}}}
+ model = FakeModel(applications=applications)
+ self.k8s_juju_conn.libjuju.get_model_status.return_value = model
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertEqual(status, {"app": {"status": "active"}})
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.get_model_status.side_effect = Exception()
+ status = None
+ with self.assertRaises(Exception):
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertIsNone(status)
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+
+class GetServicesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServicesTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_services("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetServiceTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServiceTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_service("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetCredentialsTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetCredentialsTest, self).setUp()
+
+ @asynctest.mock.patch("yaml.safe_dump")
+ def test_success(self, mock_safe_dump):
+ self.k8s_juju_conn.db.get_one.return_value = {
+ "_id": "id",
+ "credentials": "credentials",
+ "schema_version": "2",
+ }
+ self.k8s_juju_conn.get_credentials("cluster_uuid")
+ self.k8s_juju_conn.db.get_one.assert_called_once()
+ self.k8s_juju_conn.db.encrypt_decrypt_fields.assert_called_once()
+ mock_safe_dump.assert_called_once()
diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py
index 76bbebe..454b87f 100644
--- a/n2vc/tests/unit/test_libjuju.py
+++ b/n2vc/tests/unit/test_libjuju.py
@@ -457,6 +457,71 @@
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("n2vc.juju_watcher.JujuModelWatcher.wait_for_model")
+@asynctest.mock.patch("juju.model.Model.deploy")
+class DeployTest(LibjujuTestCase):
+ def setUp(self):
+ super(DeployTest, self).setUp()
+
+ def test_deploy(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = juju.model.Model()
+ self.loop.run_until_complete(
+ self.libjuju.deploy("cs:osm", "model", wait=True, timeout=0)
+ )
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_called_once()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_no_wait(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = juju.model.Model()
+ self.loop.run_until_complete(
+ self.libjuju.deploy("cs:osm", "model", wait=False, timeout=0)
+ )
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_exception(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_deploy.side_effect = Exception()
+ mock_get_model.return_value = juju.model.Model()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.libjuju.deploy("cs:osm", "model"))
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
@asynctest.mock.patch(
"juju.model.Model.applications", new_callable=asynctest.PropertyMock
)
diff --git a/n2vc/tests/unit/utils.py b/n2vc/tests/unit/utils.py
index d960c70..ac86cdd 100644
--- a/n2vc/tests/unit/utils.py
+++ b/n2vc/tests/unit/utils.py
@@ -19,6 +19,50 @@
from unittest.mock import MagicMock
+kubeconfig = """apiVersion: v1
+clusters:
+- cluster:
+ certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1\
+ JSURBVENDQWVtZ0F3SUJBZ0lKQUxjMk9xVUpwcnVCTUEwR0NTcUdTSWIzRFFFQk\
+ N3VUFNQmN4RlRBVEJnTlYKQkFNTURERXdMakUxTWk0eE9ETXVNVEFlRncweU1EQ\
+ TVNVEV4TkRJeU16VmFGdzB6TURBNU1Ea3hOREl5TXpWYQpNQmN4RlRBVEJnTlZC\
+ QU1NRERFd0xqRTFNaTR4T0RNdU1UQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQUR\
+ nZ0VQCkFEQ0NBUW9DZ2dFQkFNV0tyQkdxWlJRT0VONDExR2RESmY2ckZWRDcvMU\
+ xHNlZMWjNhd1BRdHBhRTRxdVdyNisKWjExTWwra2kwVEU1cGZFV3dKenVUZXlCU\
+ XVkUEpnYm1QTjF1VWROdGRiNlpocHEzeC9oT0hCMVJLNC9iSlNFUgpiZ0dITmN6\
+ MzR6SHRaZ1dwb2NPTXpPOW9oRUdhMTZUaDhmQWVxYU1CQTJRaklmeUFlaVp3VHJ\
+ nZ3BrY2dBMUlOCjBvQkdqSURnSGVoSU5tbGZOOURkQ3hNN1FNTmtSbzRXdE13bF\
+ JSRWZ4QnFiVkNpZGFjbVhhb1VPUjJPeFVmQWEKN1orSUU1TmN5ZFQ1TGovazdwd\
+ XZCVkdIa0JQWnE0TmlBa3R4aXd5NVB5R29GTk9mT0NrV2I2VnBzVzNhTlNJeAo4\
+ aXBITkc3enV3elc1TGQ5TkhQYWpRckZwdFZBSHpJNWNhRUNBd0VBQWFOUU1FNHd\
+ IUVlEVlIwT0JCWUVGQ1dVCkFaTXNaeE13L1k1OGlXMGZJWVAzcDdTYk1COEdBMV\
+ VkSXdRWU1CYUFGQ1dVQVpNc1p4TXcvWTU4aVcwZklZUDMKcDdTYk1Bd0dBMVVkR\
+ XdRRk1BTUJBZjh3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUJaMlYxMWowRzhh\
+ Z1Z6Twp2YWtKTGt4UGZ0UE1NMFFOaVRzZmV6RzlicnBkdEVLSjFyalFCblNXYTN\
+ WbThWRGZTYkhLQUNXaGh0OEhzcXhtCmNzdVQyOWUyaGZBNHVIOUxMdy9MVG5EdE\
+ tJSjZ6aWFzaTM5RGh3UGwwaExuamJRMjk4VVo5TGovVlpnZGlqemIKWnVPdHlpT\
+ nVOS0E2Nmd0dGxXcWZRQ2hkbnJ5MlZUbjBjblR5dU9UalByYWdOdXJMdlVwL3Nl\
+ eURhZmsxNXJ4egozcmlYZldiQnRhUUk1dnM0ekFKU2xneUg2RnpiZStoTUhlUzF\
+ mM2ppb3dJV0lRR2NNbHpGT1RpMm1xWFRybEJYCnh1WmpLZlpOcndjQVNGbk9qYV\
+ BWeFQ1ODJ4WWhtTm8wR3J2MlZEck51bDlSYkgvK3lNS2J5NEhkOFRvVThMU2kKY\
+ 3Uxajh3cz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
+ server: https://192.168.0.22:16443
+ name: microk8s-cluster
+contexts:
+- context:
+ cluster: microk8s-cluster
+ user: admin
+ name: microk8s
+current-context: microk8s
+kind: Config
+preferences: {}
+users:
+- name: admin
+ user:
+ token: clhkRExRem5Xd1dCdnFEVXdvRGtDRGE5b1F3WnNrZk5qeHFCOU10bHBZRT0K
+"""
+
+
async def AsyncMockFunc():
await asyncio.sleep(1)
@@ -91,6 +135,15 @@
status = "ready"
+class FakeModel:
+ def __init__(self, applications: dict = {}):
+ self._applications = applications
+
+ @property
+ def applications(self):
+ return self._applications
+
+
class FakeUnit(MagicMock):
async def is_leader_from_status(self):
return True
@@ -100,7 +153,6 @@
class FakeApplication(AsyncMock):
-
async def set_config(self, config):
pass
@@ -113,6 +165,25 @@
units = [FakeUnit(), FakeUnit()]
+class FakeFile:
+ def __init__(self, content: str = ""):
+ self.content = content
+
+ def read(self, size: int = -1):
+ return self.content
+
+
+class FakeFileWrapper:
+ def __init__(self, content: str = ""):
+ self.file = FakeFile(content=content)
+
+ def __enter__(self):
+ return self.file
+
+ def __exit__(self, type, value, traceback):
+ pass
+
+
FAKE_DELTA_MACHINE_PENDING = Dict(
{
"deltas": ["machine", "change", {}],