# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import logging
+from os import makedirs, path
from keystoneauth1 import session
from keystoneauth1.identity import v3
+from osm_mon.core.exceptions import CertificateNotCreated
+
+log = logging.getLogger(__name__)
-class OpenstackUtils:
+class OpenstackUtils:
@staticmethod
def get_session(creds: dict):
- verify_ssl = False if 'insecure' in creds['config'] and creds['config']['insecure'] else True
- vim_config = creds['config']
- project_domain_name = 'Default'
- user_domain_name = 'Default'
- if 'project_domain_name' in vim_config:
- project_domain_name = vim_config['project_domain_name']
- if 'user_domain_name' in vim_config:
- user_domain_name = vim_config['user_domain_name']
- auth = v3.Password(auth_url=creds['vim_url'],
- username=creds['vim_user'],
- password=creds['vim_password'],
- project_name=creds['vim_tenant_name'],
- project_domain_name=project_domain_name,
- user_domain_name=user_domain_name)
- return session.Session(auth=auth, verify=verify_ssl)
+ verify_ssl = True
+ project_domain_name = "Default"
+ user_domain_name = "Default"
+ try:
+ if "config" in creds:
+ vim_config = creds["config"]
+ if "insecure" in vim_config and vim_config["insecure"]:
+ verify_ssl = False
+ if "ca_cert" in vim_config:
+ verify_ssl = vim_config["ca_cert"]
+ elif "ca_cert_content" in vim_config:
+ vim_config = OpenstackUtils._create_file_cert(
+ vim_config, creds["_id"]
+ )
+ verify_ssl = vim_config["ca_cert"]
+ if "project_domain_name" in vim_config:
+ project_domain_name = vim_config["project_domain_name"]
+ if "user_domain_name" in vim_config:
+ user_domain_name = vim_config["user_domain_name"]
+ auth = v3.Password(
+ auth_url=creds["vim_url"],
+ username=creds["vim_user"],
+ password=creds["vim_password"],
+ project_name=creds["vim_tenant_name"],
+ project_domain_name=project_domain_name,
+ user_domain_name=user_domain_name,
+ )
+ return session.Session(auth=auth, verify=verify_ssl, timeout=10)
+ except CertificateNotCreated as e:
+ log.error(e)
+
+ @staticmethod
+ def _create_file_cert(vim_config: dict, target_id: str) -> dict:
+ """
+ Process vim config, creating vim configuration files as ca_cert
+ Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside
+ :param target_id: vim-id
+ :param db_vim: Vim dictionary obtained from database
+ :return: Modified vim configuration dictionary.
+ """
+
+ work_dir = f"/app/osm_mon/certs/{target_id}"
+ file_name = ""
+
+ try:
+ if vim_config.get("ca_cert_content"):
+ if not path.isdir(work_dir):
+ makedirs(work_dir)
+
+ file_name = f"{work_dir}/ca_cert"
+ with open(file_name, "w") as f:
+ f.write(vim_config["ca_cert_content"])
+ del vim_config["ca_cert_content"]
+ vim_config["ca_cert"] = file_name
+ return vim_config
+ except Exception as e:
+ if file_name:
+ raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}")
+ else:
+ raise CertificateNotCreated(
+ f"Error creating the directory '{work_dir}': {e}"
+ )