Update to Python 3.10 and Ubuntu 22.04
[osm/MON.git] / osm_mon / collector / utils / openstack.py
index 3a2b1b9..89b13d1 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import logging
+from os import makedirs, path
 
 from keystoneauth1 import session
 from keystoneauth1.identity import v3
 
+from osm_mon.core.exceptions import CertificateNotCreated
+
+log = logging.getLogger(__name__)
 
-class OpenstackUtils:
 
+class OpenstackUtils:
     @staticmethod
     def get_session(creds: dict):
         verify_ssl = True
-        project_domain_name = 'Default'
-        user_domain_name = 'Default'
-        if 'config' in creds:
-            vim_config = creds['config']
-            verify_ssl = False if 'insecure' in vim_config and vim_config['insecure'] else True
-            if 'project_domain_name' in vim_config:
-                project_domain_name = vim_config['project_domain_name']
-            if 'user_domain_name' in vim_config:
-                user_domain_name = vim_config['user_domain_name']
-        auth = v3.Password(auth_url=creds['vim_url'],
-                           username=creds['vim_user'],
-                           password=creds['vim_password'],
-                           project_name=creds['vim_tenant_name'],
-                           project_domain_name=project_domain_name,
-                           user_domain_name=user_domain_name)
-        return session.Session(auth=auth, verify=verify_ssl)
+        project_domain_name = "Default"
+        user_domain_name = "Default"
+        try:
+            if "config" in creds:
+                vim_config = creds["config"]
+                if "insecure" in vim_config and vim_config["insecure"]:
+                    verify_ssl = False
+                if "ca_cert" in vim_config:
+                    verify_ssl = vim_config["ca_cert"]
+                elif "ca_cert_content" in vim_config:
+                    vim_config = OpenstackUtils._create_file_cert(
+                        vim_config, creds["_id"]
+                    )
+                    verify_ssl = vim_config["ca_cert"]
+                if "project_domain_name" in vim_config:
+                    project_domain_name = vim_config["project_domain_name"]
+                if "user_domain_name" in vim_config:
+                    user_domain_name = vim_config["user_domain_name"]
+            auth = v3.Password(
+                auth_url=creds["vim_url"],
+                username=creds["vim_user"],
+                password=creds["vim_password"],
+                project_name=creds["vim_tenant_name"],
+                project_domain_name=project_domain_name,
+                user_domain_name=user_domain_name,
+            )
+            return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+        except CertificateNotCreated as e:
+            log.error(e)
+
+    @staticmethod
+    def _create_file_cert(vim_config: dict, target_id: str) -> dict:
+        """
+        Process vim config, creating vim configuration files as ca_cert
+        Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside
+        :param target_id: vim-id
+        :param db_vim: Vim dictionary obtained from database
+        :return: Modified vim configuration dictionary.
+        """
+
+        work_dir = f"/app/osm_mon/certs/{target_id}"
+        file_name = ""
+
+        try:
+            if vim_config.get("ca_cert_content"):
+                if not path.isdir(work_dir):
+                    makedirs(work_dir)
+
+                file_name = f"{work_dir}/ca_cert"
+                with open(file_name, "w") as f:
+                    f.write(vim_config["ca_cert_content"])
+                    del vim_config["ca_cert_content"]
+                    vim_config["ca_cert"] = file_name
+                return vim_config
+        except Exception as e:
+            if file_name:
+                raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}")
+            else:
+                raise CertificateNotCreated(
+                    f"Error creating the directory '{work_dir}': {e}"
+                )