From 0597c071b77a846624fbf4dc03c197029c1c1e59 Mon Sep 17 00:00:00 2001
From: calvinosanc1
Date: Fri, 19 Aug 2022 10:31:19 +0000
Subject: [PATCH] Get VIM certificates from DB
Change-Id: I02a71ee4b588274524e139195c3897573a6792cc
Signed-off-by: calvinosanc1
---
 osm_mon/collector/utils/openstack.py | 84 ++++++++++++++-----
 osm_mon/core/exceptions.py | 4 +
 .../unit/collector/utils/test_openstack.py | 82 ++++++++++++++++++
 3 files changed, 151 insertions(+), 19 deletions(-)
diff --git a/osm_mon/collector/utils/openstack.py b/osm_mon/collector/utils/openstack.py
index 9162f98..89b13d1 100644
--- a/osm_mon/collector/utils/openstack.py
+++ b/osm_mon/collector/utils/openstack.py
@@ -21,10 +21,16 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import logging
+from os import makedirs, path
 from keystoneauth1 import session
 from keystoneauth1.identity import v3
+from osm_mon.core.exceptions import CertificateNotCreated
+
+log = logging.getLogger(__name__)
+
 class OpenstackUtils:
 @staticmethod
@@ -32,22 +38,62 @@ class OpenstackUtils:
 verify_ssl = True
 project_domain_name = "Default"
 user_domain_name = "Default"
- if "config" in creds:
- vim_config = creds["config"]
- if "insecure" in vim_config and vim_config["insecure"]:
- verify_ssl = False
- if "ca_cert" in vim_config:
- verify_ssl = vim_config["ca_cert"]
- if "project_domain_name" in vim_config:
- project_domain_name = vim_config["project_domain_name"]
- if "user_domain_name" in vim_config:
- user_domain_name = vim_config["user_domain_name"]
- auth = v3.Password(
- auth_url=creds["vim_url"],
- username=creds["vim_user"],
- password=creds["vim_password"],
- project_name=creds["vim_tenant_name"],
- project_domain_name=project_domain_name,
- user_domain_name=user_domain_name,
- )
- return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+ try:
+ if "config" in creds:
+ vim_config = creds["config"]
+ if "insecure" in vim_config and vim_config["insecure"]:
+ verify_ssl = False
+ if "ca_cert" in vim_config:
+ verify_ssl = vim_config["ca_cert"]
+ elif "ca_cert_content" in vim_config:
+ vim_config = OpenstackUtils._create_file_cert(
+ vim_config, creds["_id"]
+ )
+ verify_ssl = vim_config["ca_cert"]
+ if "project_domain_name" in vim_config:
+ project_domain_name = vim_config["project_domain_name"]
+ if "user_domain_name" in vim_config:
+ user_domain_name = vim_config["user_domain_name"]
+ auth = v3.Password(
+ auth_url=creds["vim_url"],
+ username=creds["vim_user"],
+ password=creds["vim_password"],
+ project_name=creds["vim_tenant_name"],
+ project_domain_name=project_domain_name,
+ user_domain_name=user_domain_name,
+ )
+ return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+ except CertificateNotCreated as e:
+ log.error(e)
+
+ @staticmethod
+ def _create_file_cert(vim_config: dict, target_id: str) -> dict:
+ """
+ Process vim config, creating vim configuration files as ca_cert
+ Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside
+ :param target_id: vim-id
+ :param db_vim: Vim dictionary obtained from database
+ :return: Modified vim configuration dictionary.
+ """
+
+ work_dir = f"/app/osm_mon/certs/{target_id}"
+ file_name = ""
+
+ try:
+ if vim_config.get("ca_cert_content"):
+ if not path.isdir(work_dir):
+ makedirs(work_dir)
+
+ file_name = f"{work_dir}/ca_cert"
+ with open(file_name, "w") as f:
+ f.write(vim_config["ca_cert_content"])
+ del vim_config["ca_cert_content"]
+ vim_config["ca_cert"] = file_name
+ return vim_config
+ except Exception as e:
+ if file_name:
+ raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}")
+ else:
+ raise CertificateNotCreated(
+ f"Error creating the directory '{work_dir}': {e}"
+ )
diff --git a/osm_mon/core/exceptions.py b/osm_mon/core/exceptions.py
index 0aab715..be0f6ed 100644
--- a/osm_mon/core/exceptions.py
+++ b/osm_mon/core/exceptions.py
@@ -28,3 +28,7 @@ class MetricNotFound(Exception):
 class VcaDeploymentInfoNotFound(Exception):
 pass
+
+
+class CertificateNotCreated(Exception):
+ pass
diff --git a/osm_mon/tests/unit/collector/utils/test_openstack.py b/osm_mon/tests/unit/collector/utils/test_openstack.py
index bf37076..989c486 100644
--- a/osm_mon/tests/unit/collector/utils/test_openstack.py
+++ b/osm_mon/tests/unit/collector/utils/test_openstack.py
@@ -23,6 +23,7 @@ from unittest import TestCase, mock
 from osm_mon.collector.utils.openstack import OpenstackUtils
+from osm_mon.core.exceptions import CertificateNotCreated
 @mock.patch("osm_mon.collector.utils.openstack.session")
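Review bot: In osm_mon/collector/utils/openstack.py, `_create_file_cert` interpolates `target_id` (passed in as `creds["_id"]`) directly into `work_dir = f"/app/osm_mon/certs/{target_id}"`, so an id containing a path separator or `..` would place the CA bundle outside the certificate directory. The id is presumably a database-generated identifier, but nothing visible in this hunk enforces that, and the written file becomes the trust anchor handed to `session.Session(verify=...)`. I would reject any id that is not a single path component before touching the filesystem, as in the excerpt below. Separately, `get_session` (its signature lies above the hunk) now logs `CertificateNotCreated` and falls off the end, so callers receive `None` rather than a session or an error; adding `raise` after `log.error(e)` would keep the failure visible to them.

```python
    @staticmethod
    def _create_file_cert(vim_config: dict, target_id: str) -> dict:
        # … unchanged: docstring …
        vim_id = str(target_id)
        # The id becomes a directory name, so accept a single path component only.
        if not vim_id or vim_id in (".", "..") or "/" in vim_id or "\\" in vim_id:
            raise CertificateNotCreated(
                f"Invalid VIM id for certificate directory: {vim_id!r}"
            )
        work_dir = f"/app/osm_mon/certs/{vim_id}"
        # … unchanged: file_name initialisation, try block writing ca_cert, except block …
```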
@@ -70,3 +71,84 @@ class OpenstackUtilsTest(TestCase):
 mock_session.Session.assert_called_once_with(
 auth=mock.ANY, verify=True, timeout=10
 )
+
+ @mock.patch("osm_mon.collector.utils.openstack.OpenstackUtils._create_file_cert")
+ def test_session_with_ca_cert_content(self, mock_create_file_cert, mock_session):
+ creds = {
+ "_id": "1234",
+ "config": {"ca_cert_content": "test"},
+ "vim_url": "url",
+ "vim_user": "user",
+ "vim_password": "password",
+ "vim_tenant_name": "tenant_name",
+ }
+ mock_create_file_cert.return_value = {"ca_cert": "testfile"}
+ OpenstackUtils.get_session(creds)
+ mock_session.Session.assert_called_once_with(
+ auth=mock.ANY, verify="testfile", timeout=10
+ )
+
+ @mock.patch("osm_mon.collector.utils.openstack.makedirs", return_value="")
+ @mock.patch("osm_mon.collector.utils.openstack.path")
+ def test_create_file_cert(self, mock_path, mock_makedirs, mock_session):
+ vim_config = {"ca_cert_content": "test"}
+ target_id = "1234"
+ mock_path.isdir.return_value = False
+
+ with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+ OpenstackUtils._create_file_cert(vim_config, target_id)
+ mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+ mocked_file.assert_called_once_with(
+ f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+ )
+ assert vim_config["ca_cert"] == f"/app/osm_mon/certs/{target_id}/ca_cert"
+
+ @mock.patch("osm_mon.collector.utils.openstack.makedirs")
+ @mock.patch("osm_mon.collector.utils.openstack.path")
+ def test_create_file_cert_exists(self, mock_path, mock_makedirs, mock_session):
+ vim_config = {"ca_cert_content": "test"}
+ target_id = "1234"
+ mock_path.isdir.return_value = True
+
+ with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+ OpenstackUtils._create_file_cert(vim_config, target_id)
+ mock_makedirs.assert_not_called()
+ mocked_file.assert_called_once_with(
+ f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+ )
+ assert vim_config["ca_cert"] == f"/app/osm_mon/certs/{target_id}/ca_cert"
+
+ @mock.patch("osm_mon.collector.utils.openstack.makedirs", side_effect=Exception)
+ @mock.patch("osm_mon.collector.utils.openstack.path")
+ def test_create_file_cert_makedirs_except(
+ self, mock_path, mock_makedirs, mock_session
+ ):
+ vim_config = {"ca_cert_content": "test"}
+ target_id = "1234"
+ mock_path.isdir.return_value = False
+
+ with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+ with self.assertRaises(CertificateNotCreated):
+ OpenstackUtils._create_file_cert(vim_config, target_id)
+ mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+ mocked_file.assert_not_called()
+ assert vim_config["ca_cert_content"] == "test"
+
+ @mock.patch("osm_mon.collector.utils.openstack.makedirs", return_value="")
+ @mock.patch("osm_mon.collector.utils.openstack.path")
+ def test_create_file_cert_open_excepts(
+ self, mock_path, mock_makedirs, mock_session
+ ):
+ vim_config = {"ca_cert_content": "test"}
+ target_id = "1234"
+ mock_path.isdir.return_value = False
+
+ with mock.patch("builtins.open", mock.mock_open()) as mocked_file:
+ mocked_file.side_effect = Exception
+ with self.assertRaises(CertificateNotCreated):
+ OpenstackUtils._create_file_cert(vim_config, target_id)
+ mock_makedirs.assert_called_once_with("/app/osm_mon/certs/1234")
+ mocked_file.assert_called_once_with(
+ f"/app/osm_mon/certs/{target_id}/ca_cert", "w"
+ )
+ assert vim_config["ca_cert_content"] == "test"
--
2.25.1
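Review bot: The tests added to test_openstack.py never exercise the `except CertificateNotCreated` branch introduced in `get_session`, so what callers get when certificate creation fails (the method logs and returns `None`) is not pinned by any test. A case that sets `mock_create_file_cert.side_effect = CertificateNotCreated("x")` and then checks the return value together with `mock_session.Session.assert_not_called()` would cover it. The `_create_file_cert` tests also assert only the path passed to `open`, not the data written; adding `mocked_file().write.assert_called_once_with("test")` to `test_create_file_cert` would confirm that the CA content from the config is what reaches the file.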