Get VIM certificates from DB 67/12467/9
authorcalvinosanc1 <guillermo.calvino@canonical.com>
Fri, 19 Aug 2022 10:31:19 +0000 (10:31 +0000)
committercalvinosanc1 <guillermo.calvino@canonical.com>
Mon, 3 Oct 2022 16:30:55 +0000 (16:30 +0000)
Change-Id: I02a71ee4b588274524e139195c3897573a6792cc
Signed-off-by: calvinosanc1 <guillermo.calvino@canonical.com>
osm_mon/collector/utils/openstack.py
osm_mon/core/exceptions.py
osm_mon/tests/unit/collector/utils/test_openstack.py

index 9162f98..89b13d1 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import logging
+from os import makedirs, path
 
 from keystoneauth1 import session
 from keystoneauth1.identity import v3
 
+from osm_mon.core.exceptions import CertificateNotCreated
+
+log = logging.getLogger(__name__)
+
 
 class OpenstackUtils:
     @staticmethod
@@ -32,22 +38,62 @@ class OpenstackUtils:
         verify_ssl = True
         project_domain_name = "Default"
         user_domain_name = "Default"
-        if "config" in creds:
-            vim_config = creds["config"]
-            if "insecure" in vim_config and vim_config["insecure"]:
-                verify_ssl = False
-            if "ca_cert" in vim_config:
-                verify_ssl = vim_config["ca_cert"]
-            if "project_domain_name" in vim_config:
-                project_domain_name = vim_config["project_domain_name"]
-            if "user_domain_name" in vim_config:
-                user_domain_name = vim_config["user_domain_name"]
-        auth = v3.Password(
-            auth_url=creds["vim_url"],
-            username=creds["vim_user"],
-            password=creds["vim_password"],
-            project_name=creds["vim_tenant_name"],
-            project_domain_name=project_domain_name,
-            user_domain_name=user_domain_name,
-        )
-        return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+        try:
+            if "config" in creds:
+                vim_config = creds["config"]
+                if "insecure" in vim_config and vim_config["insecure"]:
+                    verify_ssl = False
+                if "ca_cert" in vim_config:
+                    verify_ssl = vim_config["ca_cert"]
+                elif "ca_cert_content" in vim_config:
+                    vim_config = OpenstackUtils._create_file_cert(
+                        vim_config, creds["_id"]
+                    )
+                    verify_ssl = vim_config["ca_cert"]
+                if "project_domain_name" in vim_config:
+                    project_domain_name = vim_config["project_domain_name"]
+                if "user_domain_name" in vim_config:
+                    user_domain_name = vim_config["user_domain_name"]
+            auth = v3.Password(
+                auth_url=creds["vim_url"],
+                username=creds["vim_user"],
+                password=creds["vim_password"],
+                project_name=creds["vim_tenant_name"],
+                project_domain_name=project_domain_name,
+                user_domain_name=user_domain_name,
+            )
+            return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+        except CertificateNotCreated as e:
+            log.error(e)
+
+    @staticmethod
+    def _create_file_cert(vim_config: dict, target_id: str) -> dict:
+        """
+        Process vim config, creating vim configuration files as ca_cert
+        Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside
+        :param target_id: vim-id
+        :param db_vim: Vim dictionary obtained from database
+        :return: Modified vim configuration dictionary.
+        """
+
+        work_dir = f"/app/osm_mon/certs/{target_id}"
+        file_name = ""
+
+        try:
+            if vim_config.get("ca_cert_content"):
+                if not path.isdir(work_dir):
+                    makedirs(work_dir)
+
+                file_name = f"{work_dir}/ca_cert"
+                with open(file_name, "w") as f:
+                    f.write(vim_config["ca_cert_content"])
+                    del vim_config["ca_cert_content"]
+                    vim_config["ca_cert"] = file_name
+                return vim_config
+        except Exception as e:
+            if file_name:
+                raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}")
+            else:
+                raise CertificateNotCreated(
+                    f"Error creating the directory '{work_dir}': {e}"
+                )
index 0aab715..be0f6ed 100644 (file)
@@ -28,3 +28,7 @@ class MetricNotFound(Exception):
 
 class VcaDeploymentInfoNotFound(Exception):
     pass
+
+
+class CertificateNotCreated(Exception):
+    pass
index bf37076..989c486 100644 (file)
@@ -23,6 +23,7 @@
 from unittest import TestCase, mock
 
 from osm_mon.collector.utils.openstack import OpenstackUtils
+from osm_mon.core.exceptions import CertificateNotCreated
 
 
 @mock.patch("osm_mon.collector.utils.openstack.session")
@@ -70,3 +71,84 @@ class OpenstackUtilsTest(TestCase):
         mock_session.Session.assert_called_once_with(
             auth=mock.ANY, verify=True, timeout=10
         )
+
+    @mock.patch("osm_mon.collector.utils.openstack.OpenstackUtils._create_file_cert")
+    def test_session_with_ca_cert_content(self, mock_create_file_cert, mock_session):
+        creds = {
+            "_id": "1234",
+            "config": {"ca_cert_content": "test"},
+            "vim_url": "url",
+            "vim_user": "user",
+            "vim_password": "password",
+            "vim_tenant_name": "tenant_name",
+        }
+        mock_create_file_cert.return_value = {"ca_cert": "testfile"}
+        OpenstackUtils.get_session(creds)
+        mock_session.Session.assert_called_once_with(
+            auth=mock.ANY, verify="testfile", timeout=10
+        )
+
+    @mock.patch("osm_mon.collector.utils.openstack.makedirs", return_value="")
+    @mock.patch("osm_mon.collector.utils.openstack.path")
+    def test_create_file_cert(self, mock_path, mock_makedirs, mock_session):
+        vim_config = {"ca_cert_content": "test"}
+        target_id = "1234"
+        mock_path.isdir.return_value = False
+
+        with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+            OpenstackUtils._create_file_cert(vim_config, target_id)
+            mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+            mocked_file.assert_called_once_with(
+                f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+            )
+            assert vim_config["ca_cert"] == f"/app/osm_mon/certs/{target_id}/ca_cert"
+
+    @mock.patch("osm_mon.collector.utils.openstack.makedirs")
+    @mock.patch("osm_mon.collector.utils.openstack.path")
+    def test_create_file_cert_exists(self, mock_path, mock_makedirs, mock_session):
+        vim_config = {"ca_cert_content": "test"}
+        target_id = "1234"
+        mock_path.isdir.return_value = True
+
+        with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+            OpenstackUtils._create_file_cert(vim_config, target_id)
+            mock_makedirs.assert_not_called()
+            mocked_file.assert_called_once_with(
+                f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+            )
+            assert vim_config["ca_cert"] == f"/app/osm_mon/certs/{target_id}/ca_cert"
+
+    @mock.patch("osm_mon.collector.utils.openstack.makedirs", side_effect=Exception)
+    @mock.patch("osm_mon.collector.utils.openstack.path")
+    def test_create_file_cert_makedirs_except(
+        self, mock_path, mock_makedirs, mock_session
+    ):
+        vim_config = {"ca_cert_content": "test"}
+        target_id = "1234"
+        mock_path.isdir.return_value = False
+
+        with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+            with self.assertRaises(CertificateNotCreated):
+                OpenstackUtils._create_file_cert(vim_config, target_id)
+            mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+            mocked_file.assert_not_called()
+            assert vim_config["ca_cert_content"] == "test"
+
+    @mock.patch("osm_mon.collector.utils.openstack.makedirs", return_value="")
+    @mock.patch("osm_mon.collector.utils.openstack.path")
+    def test_create_file_cert_open_excepts(
+        self, mock_path, mock_makedirs, mock_session
+    ):
+        vim_config = {"ca_cert_content": "test"}
+        target_id = "1234"
+        mock_path.isdir.return_value = False
+
+        with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+            mocked_file.side_effect = Exception
+            with self.assertRaises(CertificateNotCreated):
+                OpenstackUtils._create_file_cert(vim_config, target_id)
+            mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+            mocked_file.assert_called_once_with(
+                f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+            )
+            assert vim_config["ca_cert_content"] == "test"