blob: 6afadbf042d65215403817381bee847d5d3238fe [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import os
import yaml
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException
class K8sHelm3Connector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm3",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes helm connector for helm v3

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :param vca_config: optional dictionary with VCA configuration
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("K8S Helm3 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Deploys a new KDU instance in the cluster.

        Syncs the cluster configuration from the file system, creates the target
        namespace if needed (helm3 does not create it automatically), delegates
        the actual installation to the base class and syncs the configuration back.

        :param cluster_uuid: cluster id (may be prefixed with a namespace)
        :param kdu_model: chart/model to install
        :param kdu_instance: release name to use
        :param atomic: if True the installation is rolled back on failure
        :param timeout: maximum time in seconds to wait for the operation
        :param params: optional values to override chart defaults
        :param db_dict: optional dictionary to write operation status to database
        :param kdu_name: optional KDU name
        :param namespace: optional target namespace (created if missing)
        :return: True when the installation completes
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # for helm3 if namespace does not exist must create it
        if namespace and namespace != "kube-system":
            namespaces = await self._get_namespaces(cluster_id)
            if namespace not in namespaces:
                await self._create_namespace(cluster_id, namespace)

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """
        Returns the full chart information (`helm show all`) for a KDU model.

        :param kdu_model: chart/model to inspect
        :param repo_url: optional repository URL where the chart is hosted
        :return: inspection output as text
        """
        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also created helm3 dirs according to new directory specification, paths are
        returned and also environment variables that must be provided to execute commands

        Helm 3 directory specification uses XDG categories for variable support:
        - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
        - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
        - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

        The variables assigned for this paths are:
        (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
        $HELM_PATH_DATA but looking and helm env the variable names are different)
        - Cache: $HELM_CACHE_HOME
        - Config: $HELM_CONFIG_HOME
        - Data: $HELM_DATA_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        helm_path_cache = cluster_dir + "/.cache/helm"
        if create_if_not_exist and not os.path.exists(helm_path_cache):
            self.log.debug("Creating dir {}".format(helm_path_cache))
            os.makedirs(helm_path_cache)

        helm_path_config = cluster_dir + "/.config/helm"
        if create_if_not_exist and not os.path.exists(helm_path_config):
            self.log.debug("Creating dir {}".format(helm_path_config))
            os.makedirs(helm_path_config)

        helm_path_data = cluster_dir + "/.local/share/helm"
        if create_if_not_exist and not os.path.exists(helm_path_data):
            self.log.debug("Creating dir {}".format(helm_path_data))
            os.makedirs(helm_path_data)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
        }

        # 3 - Prepare environment variables
        env = {
            "HELM_CACHE_HOME": helm_path_cache,
            "HELM_CONFIG_HOME": helm_path_config,
            "HELM_DATA_HOME": helm_path_data,
            "KUBECONFIG": config_filename,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        return paths, env

    async def _get_namespaces(self, cluster_id: str):
        """
        Obtains the list of namespace names defined in the cluster.

        :param cluster_id: cluster id
        :return: list of namespace names
        """

        self.log.debug("get namespaces cluster_id {}".format(cluster_id))

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} get namespaces -o=yaml".format(
            self.kubectl_command, paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)
        namespaces = [item["metadata"]["name"] for item in data["items"]]
        self.log.debug(f"namespaces {namespaces}")

        return namespaces

    async def _create_namespace(self, cluster_id: str, namespace: str):
        """
        Creates a namespace in the cluster.

        :param cluster_id: cluster id
        :param namespace: name of the namespace to create
        :return: return code of the kubectl command
        """

        # Fixed: the original log message had the two values swapped
        # ("create namespace: {cluster_id} for cluster_id: {namespace}")
        self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} create namespace {}".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        self.log.debug(f"namespace {namespace} created")

        return _rc

    async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
        """
        Returns the services deployed by a release, obtained by piping the
        release manifest (helm get manifest) into kubectl get.

        :param cluster_id: cluster id
        :param kdu_instance: release name
        :param namespace: namespace of the release
        :return: parsed list of services
        """

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} --namespace={}".format(
            self._helm_command, kdu_instance, namespace
        )
        command2 = "{} get --namespace={} -f -".format(
            self.kubectl_command, namespace
        )
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization:
        For helm3 it creates the namespace if it is not created
        """
        if namespace != "kube-system":
            namespaces = await self._get_namespaces(cluster_id)
            if namespace not in namespaces:
                await self._create_namespace(cluster_id, namespace)

        # If default repo is not included add
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            # Fixed: original logged the constant string "repo"; log the name instead
            self.log.debug("repo: {}".format(repo["name"]))
            if repo["name"] == "stable":
                self.log.debug("Default repo already present")
                break
        else:
            # for/else: executed only when no "stable" repo was found
            await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)

        # Returns False as no software needs to be uninstalled
        return False

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # nothing to do to uninstall sw
        pass

    async def _instances_list(self, cluster_id: str):
        """
        Lists the releases deployed in the cluster across all namespaces.

        :param cluster_id: cluster id
        :return: parsed yaml output of `helm list`, or [] when there is no output
        """

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --all-namespaces --output yaml".format(
            self._helm_command
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            self.log.debug("instances list output: {}".format(output))
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
                             version: str):
        """
        Builds the `helm show` command for the given inspect target.

        :param inspect_command: helm show subject (e.g. "all", "values", "readme")
        :param kdu_model: chart/model to inspect
        :param repo_str: repo option string (may be empty)
        :param version: version option string (may be empty)
        :return: full command line to execute
        """
        inspect_command = "{} show {} {}{} {}".format(
            self._helm_command, inspect_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):
        """
        Obtains the status of a release via `helm status`.

        :param cluster_id: cluster id
        :param kdu_instance: release name
        :param namespace: namespace of the release (defaults to "kube-system")
        :param show_error_log: log errors raised by the command execution
        :param return_text: if True return the raw command output as str
        :return: parsed status dict (without 'notes'/'manifest'), raw text, or
            None when the command return code is not 0
        """

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        if not namespace:
            namespace = "kube-system"

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --namespace={} --output yaml".format(
            self._helm_command, kdu_instance, namespace
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove 'notes' and 'manifest' fields (noise for status consumers).
        # Fixed: the original `del data.get("info")["notes"]` raised an
        # uncaught TypeError when "info" was missing/None, and a KeyError on
        # "notes" skipped the deletion of "manifest" altogether.
        info = data.get("info")
        if isinstance(info, dict):
            info.pop("notes", None)
        data.pop("manifest", None)

        # unable to parse 'resources' as currently it is not included in helm3
        return data

    def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                             params_str: str, version: str, atomic: bool, timeout: float) -> str:
        """
        Builds the `helm install` command line.

        :param kdu_model: chart/model to install
        :param kdu_instance: release name
        :param namespace: target namespace (option omitted when falsy)
        :param params_str: already-formatted values options
        :param version: chart version (option omitted when falsy)
        :param atomic: add --atomic to roll back on failure
        :param timeout: timeout in seconds (option omitted when falsy)
        :return: full command line to execute
        """

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {name} {atomic} --output yaml "
            "{params} {timeout} {ns} {model} {ver}".format(
                helm=self._helm_command,
                name=kdu_instance,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                             params_str: str, version: str, atomic: bool, timeout: float) -> str:
        """
        Builds the `helm upgrade` command line.

        :param kdu_model: chart/model to upgrade to
        :param kdu_instance: release name
        :param namespace: namespace of the release (option omitted when falsy)
        :param params_str: already-formatted values options
        :param version: chart version (option omitted when falsy)
        :param atomic: add --atomic to roll back on failure
        :param timeout: timeout in seconds (option omitted when falsy)
        :return: full command line to execute
        """

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        command = (
            "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
            "{timeout} {ver}".format(
                helm=self._helm_command,
                name=kdu_instance,
                namespace=namespace_str,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
        """
        Builds the `helm rollback` command line (waits for completion).
        """
        return "{} rollback {} {} --namespace={} --wait".format(
            self._helm_command, kdu_instance, revision, namespace
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        """
        Builds the `helm uninstall` command line.
        """
        return "{} uninstall {} --namespace={}".format(
            self._helm_command, kdu_instance, namespace)

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the helm chart repo ids associated to a cluster from the database.

        :param cluster_uuid: cluster id registered under helm-chart-v3
        :return: list of repo ids (possibly empty)
        :raises K8sException: when the cluster is not found in the database
        """
        repo_ids = []
        cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            # Fixed typo in error message: "k8cluster" -> "k8scluster"
            raise K8sException(
                "k8scluster with helm-id : {} not found".format(cluster_uuid)
            )