Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / tests / unit / test_libjuju.py
index 7bea6b3..36110f6 100644 (file)
@@ -24,14 +24,15 @@ from .utils import FakeN2VC, FakeMachine, FakeApplication
 from n2vc.libjuju import Libjuju
 from n2vc.exceptions import (
     JujuControllerFailedConnecting,
-    JujuModelAlreadyExists,
     JujuMachineNotFound,
     JujuApplicationNotFound,
     JujuActionNotFound,
     JujuApplicationExists,
     JujuInvalidK8sConfiguration,
     JujuLeaderUnitNotFound,
+    JujuError,
 )
+from n2vc.k8s_juju_conn import generate_rbac_id
 
 
 class LibjujuTestCase(asynctest.TestCase):
@@ -66,8 +67,6 @@ class LibjujuTestCase(asynctest.TestCase):
             log=None,
             db={"get_one": []},
             n2vc=n2vc,
-            apt_mirror="192.168.0.100",
-            enable_os_upgrade=True,
         )
         logging.disable(logging.CRITICAL)
         loop.run_until_complete(self.libjuju.disconnect())
@@ -105,8 +104,6 @@ class LibjujuInitTestCase(asynctest.TestCase):
             log=None,
             db={"get_one": []},
             n2vc=self.n2vc,
-            apt_mirror="192.168.0.100",
-            enable_os_upgrade=True,
         )
         mock_update_endpoints.assert_called_once_with([self.endpoint])
         mock__get_api_endpoints_db.assert_called_once()
@@ -128,8 +125,6 @@ class LibjujuInitTestCase(asynctest.TestCase):
             log=None,
             db={"get_one": []},
             n2vc=self.n2vc,
-            apt_mirror="192.168.0.100",
-            enable_os_upgrade=True,
         )
         mock_update_endpoints.assert_not_called()
         mock__get_api_endpoints_db.assert_called_once()
@@ -151,8 +146,6 @@ class LibjujuInitTestCase(asynctest.TestCase):
             log=None,
             db={"get_one": []},
             n2vc=self.n2vc,
-            apt_mirror="192.168.0.100",
-            enable_os_upgrade=True,
         )
         mock_update_endpoints.assert_called_once_with([self.endpoint])
         mock__get_api_endpoints_db.assert_called_once()
@@ -237,10 +230,10 @@ class AddModelTest(LibjujuTestCase):
     ):
         mock_model_exists.return_value = True
 
-        with self.assertRaises(JujuModelAlreadyExists):
-            self.loop.run_until_complete(
-                self.libjuju.add_model("existing_model", "cloud")
-            )
+        # This should not raise an exception
+        self.loop.run_until_complete(
+            self.libjuju.add_model("existing_model", "cloud")
+        )
 
         mock_disconnect_controller.assert_called()
 
@@ -266,6 +259,118 @@ class AddModelTest(LibjujuTestCase):
         mock_disconnect_model.assert_called()
 
 
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch(
+    "juju.model.Model.applications", new_callable=asynctest.PropertyMock
+)
+@asynctest.mock.patch("juju.model.Model.get_action_status")
+@asynctest.mock.patch("juju.model.Model.get_action_output")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_actions")
+class GetExecutedActionsTest(LibjujuTestCase):
+    def setUp(self):
+        super(GetExecutedActionsTest, self).setUp()
+
+    def test_exception(
+        self,
+        mock_get_actions,
+        mock_get_action_output,
+        mock_get_action_status,
+        mock_applications,
+        mock_disconnect_controller,
+        mock_disconnect_model,
+        mock_get_model,
+        mock_get_controller,
+    ):
+        mock_get_model.return_value = None
+        with self.assertRaises(JujuError):
+            self.loop.run_until_complete(self.libjuju.get_executed_actions("model"))
+
+        mock_get_controller.assert_called_once()
+        mock_disconnect_controller.assert_called_once()
+        mock_get_model.assert_called_once()
+        mock_disconnect_model.assert_not_called()
+
+    def test_success(
+        self,
+        mock_get_actions,
+        mock_get_action_output,
+        mock_get_action_status,
+        mock_applications,
+        mock_disconnect_controller,
+        mock_disconnect_model,
+        mock_get_model,
+        mock_get_controller,
+    ):
+        mock_get_model.return_value = juju.model.Model()
+        mock_applications.return_value = {"existing_app"}
+        mock_get_actions.return_value = {"action_name": "description"}
+        mock_get_action_status.return_value = {"id": "status"}
+        mock_get_action_output.return_value = {"output": "completed"}
+
+        executed_actions = self.loop.run_until_complete(
+            self.libjuju.get_executed_actions("model")
+        )
+        expected_result = [{'id': 'id', 'action': 'action_name',
+                           'status': 'status', 'output': 'completed'}]
+        self.assertListEqual(expected_result, executed_actions)
+        self.assertIsInstance(executed_actions, list)
+
+        mock_get_controller.assert_called_once()
+        mock_get_model.assert_called_once()
+        mock_disconnect_controller.assert_called_once()
+        mock_disconnect_model.assert_called_once()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_application")
+class GetApplicationConfigsTest(LibjujuTestCase):
+    def setUp(self):
+        super(GetApplicationConfigsTest, self).setUp()
+
+    def test_exception(
+        self,
+        mock_get_application,
+        mock_disconnect_controller,
+        mock_disconnect_model,
+        mock_get_model,
+        mock_get_controller,
+    ):
+        mock_get_model.return_value = None
+        with self.assertRaises(JujuError):
+            self.loop.run_until_complete(
+                self.libjuju.get_application_configs("model", "app"))
+
+        mock_get_controller.assert_called_once()
+        mock_disconnect_controller.assert_called_once()
+        mock_get_model.assert_called_once()
+        mock_disconnect_model.assert_not_called()
+
+    def test_success(
+        self,
+        mock_get_application,
+        mock_disconnect_controller,
+        mock_disconnect_model,
+        mock_get_model,
+        mock_get_controller,
+    ):
+        mock_get_application.return_value = FakeApplication()
+        application_configs = self.loop.run_until_complete(self.libjuju
+                                                           .get_application_configs("model", "app"))
+
+        self.assertEqual(application_configs, ["app_config"])
+
+        mock_get_controller.assert_called_once()
+        mock_get_model.assert_called_once()
+        mock_disconnect_controller.assert_called_once()
+        mock_disconnect_model.assert_called_once()
+
+
 @asynctest.mock.patch("juju.controller.Controller.get_model")
 class GetModelTest(LibjujuTestCase):
     def setUp(self):
@@ -1431,45 +1536,56 @@ class ConsumeTest(LibjujuTestCase):
 class AddK8sTest(LibjujuTestCase):
     def setUp(self):
         super(AddK8sTest, self).setUp()
-        self.configuration = kubernetes.client.configuration.Configuration()
+        name = "cloud"
+        rbac_id = generate_rbac_id()
+        token = "token"
+        client_cert_data = "cert"
+        configuration = kubernetes.client.configuration.Configuration()
+        storage_class = "storage_class"
+        credential_name = name
+
+        self._add_k8s_args = {
+            "name": name,
+            "rbac_id": rbac_id,
+            "token": token,
+            "client_cert_data": client_cert_data,
+            "configuration": configuration,
+            "storage_class": storage_class,
+            "credential_name": credential_name,
+        }
 
     def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
-        self.loop.run_until_complete(
-            self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-        )
+        self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.side_effect = Exception()
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+        self._add_k8s_args["name"] = ""
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_storage_name(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["storage_class"] = ""
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_configuration_keys(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["configuration"] = None
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
 
@@ -1596,6 +1712,8 @@ class RemoveCloudTest(LibjujuTestCase):
 class GetK8sCloudCredentials(LibjujuTestCase):
     def setUp(self):
         super(GetK8sCloudCredentials, self).setUp()
+        self.cert_data = "cert"
+        self.token = "token"
 
     @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
     def test_not_supported(self, mock_exception, mock_configuration):
@@ -1605,8 +1723,14 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         exception_raised = False
+        self.token = None
+        self.cert_data = None
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(
@@ -1621,7 +1745,13 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.token = None
+        self.cert_data = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
@@ -1629,14 +1759,44 @@ class GetK8sCloudCredentials(LibjujuTestCase):
             ),
         )
 
+    def test_user_pass_with_cert(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "admin"
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        self.token = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={
+                    "ClientCertificateData": self.cert_data,
+                    "username": "admin",
+                    "password": "admin",
+                },
+                auth_type="userpasswithcert",
+            ),
+        )
+
     def test_user_no_pass(self, mock_configuration):
         mock_configuration.username = "admin"
         mock_configuration.password = ""
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
+        self.token = None
+        self.cert_data = None
         with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
-            credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            credential = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
             self.assertEqual(
                 credential,
                 juju.client._definitions.CloudCredential(
@@ -1647,28 +1807,6 @@ class GetK8sCloudCredentials(LibjujuTestCase):
                 "credential for user admin has empty password"
             )
 
-    def test_user_pass_with_cert(self, mock_configuration):
-        mock_configuration.username = "admin"
-        mock_configuration.password = "admin"
-        ssl_ca_cert = tempfile.NamedTemporaryFile()
-        with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
-            ssl_ca_cert_file.write("cacert")
-        mock_configuration.ssl_ca_cert = ssl_ca_cert.name
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={
-                    "username": "admin",
-                    "password": "admin",
-                    "ClientCertificateData": "cacert",
-                },
-                auth_type="userpasswithcert",
-            ),
-        )
-
     def test_cert(self, mock_configuration):
         mock_configuration.username = ""
         mock_configuration.password = ""
@@ -1679,72 +1817,67 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = ssl_ca_cert.name
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
-                attrs={"ClientCertificateData": "cacert", "Token": "Token"},
+                attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
                 auth_type="certificate",
             ),
         )
 
-    def test_oauth2(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token"}
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={"ClientKeyData": "key", "Token": "Token"},
-                auth_type="oauth2",
-            ),
-        )
-
-    @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
-    def test_oauth2_missing_token(self, mock_exception, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "missing token for auth type oauth2",
-            )
-        self.assertTrue(exception_raised)
-
-    def test_unknown_api_key(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "unknown format of api_key",
-            )
-        self.assertTrue(exception_raised)
+    # TODO: Fix this test when oauth authentication is supported
+    # def test_oauth2(self, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     mock_configuration.api_key = {"authorization": "Bearer Token"}
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     credential = self.libjuju.get_k8s_cloud_credential(
+    #         mock_configuration,
+    #         self.cert_data,
+    #         self.token,
+    #     )
+    #     self.assertEqual(
+    #         credential,
+    #         juju.client._definitions.CloudCredential(
+    #             attrs={"ClientKeyData": "key", "Token": "Token"},
+    #             auth_type="oauth2",
+    #         ),
+    #     )
+
+    # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+    # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     exception_raised = False
+    #     try:
+    #         _ = self.libjuju.get_k8s_cloud_credential(
+    #             mock_configuration,
+    #             self.cert_data,
+    #             self.token,
+    #         )
+    #     except JujuInvalidK8sConfiguration as e:
+    #         exception_raised = True
+    #         self.assertEqual(
+    #             e.message,
+    #             "missing token for auth type oauth2",
+    #         )
+    #     self.assertTrue(exception_raised)
 
     def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
         mock_configuration.username = "admin"
@@ -1755,7 +1888,11 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.key_file = None
         exception_raised = False
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(