| # -*- coding: utf-8 -*- |
| |
| # Copyright 2019 Whitestack, LLC |
| # ************************************************************* |
| |
| # This file is part of OSM Monitoring module |
| # All Rights Reserved to Whitestack, LLC |
| |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact: bdiaz@whitestack.com or glavado@whitestack.com |
| ## |
| import logging |
| from os import makedirs, path |
| |
| from keystoneauth1 import session |
| from keystoneauth1.identity import v3 |
| |
| from osm_mon.core.exceptions import CertificateNotCreated |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class OpenstackUtils: |
| @staticmethod |
| def get_session(creds: dict): |
| verify_ssl = True |
| project_domain_name = "Default" |
| user_domain_name = "Default" |
| try: |
| if "config" in creds: |
| vim_config = creds["config"] |
| if "insecure" in vim_config and vim_config["insecure"]: |
| verify_ssl = False |
| if "ca_cert" in vim_config: |
| verify_ssl = vim_config["ca_cert"] |
| elif "ca_cert_content" in vim_config: |
| vim_config = OpenstackUtils._create_file_cert( |
| vim_config, creds["_id"] |
| ) |
| verify_ssl = vim_config["ca_cert"] |
| if "project_domain_name" in vim_config: |
| project_domain_name = vim_config["project_domain_name"] |
| if "user_domain_name" in vim_config: |
| user_domain_name = vim_config["user_domain_name"] |
| auth = v3.Password( |
| auth_url=creds["vim_url"], |
| username=creds["vim_user"], |
| password=creds["vim_password"], |
| project_name=creds["vim_tenant_name"], |
| project_domain_name=project_domain_name, |
| user_domain_name=user_domain_name, |
| ) |
| return session.Session(auth=auth, verify=verify_ssl, timeout=10) |
| except CertificateNotCreated as e: |
| log.error(e) |
| |
| @staticmethod |
| def _create_file_cert(vim_config: dict, target_id: str) -> dict: |
| """ |
| Process vim config, creating vim configuration files as ca_cert |
| Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside |
| :param target_id: vim-id |
| :param db_vim: Vim dictionary obtained from database |
| :return: Modified vim configuration dictionary. |
| """ |
| |
| work_dir = f"/app/osm_mon/certs/{target_id}" |
| file_name = "" |
| |
| try: |
| if vim_config.get("ca_cert_content"): |
| if not path.isdir(work_dir): |
| makedirs(work_dir) |
| |
| file_name = f"{work_dir}/ca_cert" |
| with open(file_name, "w") as f: |
| f.write(vim_config["ca_cert_content"]) |
| del vim_config["ca_cert_content"] |
| vim_config["ca_cert"] = file_name |
| return vim_config |
| except Exception as e: |
| if file_name: |
| raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}") |
| else: |
| raise CertificateNotCreated( |
| f"Error creating the directory '{work_dir}': {e}" |
| ) |