blob: 89b13d11d25e981d68a7d71c59fc9dc960bdb70b [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 Whitestack, LLC
# *************************************************************
# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
import logging
from os import makedirs, path
from keystoneauth1 import session
from keystoneauth1.identity import v3
from osm_mon.core.exceptions import CertificateNotCreated
log = logging.getLogger(__name__)
class OpenstackUtils:
@staticmethod
def get_session(creds: dict):
verify_ssl = True
project_domain_name = "Default"
user_domain_name = "Default"
try:
if "config" in creds:
vim_config = creds["config"]
if "insecure" in vim_config and vim_config["insecure"]:
verify_ssl = False
if "ca_cert" in vim_config:
verify_ssl = vim_config["ca_cert"]
elif "ca_cert_content" in vim_config:
vim_config = OpenstackUtils._create_file_cert(
vim_config, creds["_id"]
)
verify_ssl = vim_config["ca_cert"]
if "project_domain_name" in vim_config:
project_domain_name = vim_config["project_domain_name"]
if "user_domain_name" in vim_config:
user_domain_name = vim_config["user_domain_name"]
auth = v3.Password(
auth_url=creds["vim_url"],
username=creds["vim_user"],
password=creds["vim_password"],
project_name=creds["vim_tenant_name"],
project_domain_name=project_domain_name,
user_domain_name=user_domain_name,
)
return session.Session(auth=auth, verify=verify_ssl, timeout=10)
except CertificateNotCreated as e:
log.error(e)
@staticmethod
def _create_file_cert(vim_config: dict, target_id: str) -> dict:
"""
Process vim config, creating vim configuration files as ca_cert
Creates a folder '/app/osm_mon/certs/{target_id}' and the ca_cert inside
:param target_id: vim-id
:param db_vim: Vim dictionary obtained from database
:return: Modified vim configuration dictionary.
"""
work_dir = f"/app/osm_mon/certs/{target_id}"
file_name = ""
try:
if vim_config.get("ca_cert_content"):
if not path.isdir(work_dir):
makedirs(work_dir)
file_name = f"{work_dir}/ca_cert"
with open(file_name, "w") as f:
f.write(vim_config["ca_cert_content"])
del vim_config["ca_cert_content"]
vim_config["ca_cert"] = file_name
return vim_config
except Exception as e:
if file_name:
raise CertificateNotCreated(f"Error writing to file '{file_name}': {e}")
else:
raise CertificateNotCreated(
f"Error creating the directory '{work_dir}': {e}"
)