--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+import os
+import yaml
+
+from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
+from n2vc.exceptions import K8sException
+
+
+class K8sHelm3Connector(K8sHelmBaseConnector):
+
+ """
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
+ """
+
+ def __init__(
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm3",
+ log: object = None,
+ on_update_db=None,
+ ):
+ """
+ Initializes helm connector for helm v3
+
+ :param fs: file system for kubernetes and helm configuration
+ :param db: database object to write current operation status
+ :param kubectl_command: path to kubectl executable
+ :param helm_command: path to helm executable
+ :param log: logger
+ :param on_update_db: callback called when k8s connector updates database
+ """
+
+ # parent class
+ K8sHelmBaseConnector.__init__(self,
+ db=db,
+ log=log,
+ fs=fs,
+ kubectl_command=kubectl_command,
+ helm_command=helm_command,
+ on_update_db=on_update_db)
+
+ self.log.info("K8S Helm3 connector initialized")
+
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+ self.log.debug(
+ "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+ )
+
+ return await self._exec_inspect_comand(
+ inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
+ )
+
+ """
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
+ """
+
+ def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
+ """
+ Creates and returns base cluster and kube dirs and returns them.
+ Also created helm3 dirs according to new directory specification, paths are
+ returned and also environment variables that must be provided to execute commands
+
+ Helm 3 directory specification uses XDG categories for variable support:
+ - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
+ - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
+ - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
+
+ The variables assigned for this paths are:
+ (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
+ $HELM_PATH_DATA but looking and helm env the variable names are different)
+ - Cache: $HELM_CACHE_HOME
+ - Config: $HELM_CONFIG_HOME
+ - Data: $HELM_DATA_HOME
+ - helm kubeconfig: $KUBECONFIG
+
+ :param cluster_name: cluster_name
+ :return: Dictionary with config_paths and dictionary with helm environment variables
+ """
+
+ base = self.fs.path
+ if base.endswith("/") or base.endswith("\\"):
+ base = base[:-1]
+
+ # base dir for cluster
+ cluster_dir = base + "/" + cluster_name
+
+ # kube dir
+ kube_dir = cluster_dir + "/" + ".kube"
+ if create_if_not_exist and not os.path.exists(kube_dir):
+ self.log.debug("Creating dir {}".format(kube_dir))
+ os.makedirs(kube_dir)
+
+ helm_path_cache = cluster_dir + "/.cache/helm"
+ if create_if_not_exist and not os.path.exists(helm_path_cache):
+ self.log.debug("Creating dir {}".format(helm_path_cache))
+ os.makedirs(helm_path_cache)
+
+ helm_path_config = cluster_dir + "/.config/helm"
+ if create_if_not_exist and not os.path.exists(helm_path_config):
+ self.log.debug("Creating dir {}".format(helm_path_config))
+ os.makedirs(helm_path_config)
+
+ helm_path_data = cluster_dir + "/.local/share/helm"
+ if create_if_not_exist and not os.path.exists(helm_path_data):
+ self.log.debug("Creating dir {}".format(helm_path_data))
+ os.makedirs(helm_path_data)
+
+ config_filename = kube_dir + "/config"
+
+ # 2 - Prepare dictionary with paths
+ paths = {
+ "kube_dir": kube_dir,
+ "kube_config": config_filename,
+ "cluster_dir": cluster_dir
+ }
+
+ # 3 - Prepare environment variables
+ env = {
+ "HELM_CACHE_HOME": helm_path_cache,
+ "HELM_CONFIG_HOME": helm_path_config,
+ "HELM_DATA_HOME": helm_path_data,
+ "KUBECONFIG": config_filename
+ }
+
+ for file_name, file in paths.items():
+ if "dir" in file_name and not os.path.exists(file):
+ err_msg = "{} dir does not exist".format(file)
+ self.log.error(err_msg)
+ raise K8sException(err_msg)
+
+ return paths, env
+
+ async def _get_namespaces(self,
+ cluster_id: str):
+
+ self.log.debug("get namespaces cluster_id {}".format(cluster_id))
+
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} --kubeconfig={} get namespaces -o=yaml".format(
+ self.kubectl_command, paths["kube_config"]
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+ namespaces = [item["metadata"]["name"] for item in data["items"]]
+ self.log.debug(f"namespaces {namespaces}")
+
+ return namespaces
+
+ async def _create_namespace(self,
+ cluster_id: str,
+ namespace: str):
+
+ self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
+
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} --kubeconfig={} create namespace {}".format(
+ self.kubectl_command, paths["kube_config"], namespace
+ )
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+ self.log.debug(f"namespace {namespace} created")
+
+ return _rc
+
+ async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
+
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command1 = "{} get manifest {} --namespace={}".format(
+ self._helm_command, kdu_instance, namespace
+ )
+ command2 = "{} get --namespace={} -f -".format(
+ self.kubectl_command, namespace
+ )
+ output, _rc = await self._local_async_exec_pipe(
+ command1, command2, env=env, raise_exception_on_error=True
+ )
+ services = self._parse_services(output)
+
+ return services
+
+ async def _cluster_init(self, cluster_id, namespace, paths, env):
+ """
+ Implements the helm version dependent cluster initialization:
+ For helm3 it creates the namespace if it is not created
+ """
+ if namespace != "kube-system":
+ namespaces = await self._get_namespaces(cluster_id)
+ if namespace not in namespaces:
+ await self._create_namespace(cluster_id, namespace)
+
+ # If default repo is not included add
+ cluster_uuid = "{}:{}".format(namespace, cluster_id)
+ repo_list = await self.repo_list(cluster_uuid)
+ for repo in repo_list:
+ self.log.debug("repo")
+ if repo["name"] == "stable":
+ self.log.debug("Default repo already present")
+ break
+ else:
+ await self.repo_add(cluster_uuid,
+ "stable",
+ "https://kubernetes-charts.storage.googleapis.com/")
+
+ # Returns False as no software needs to be uninstalled
+ return False
+
+ async def _uninstall_sw(self, cluster_id: str, namespace: str):
+ # nothing to do to uninstall sw
+ pass
+
+ async def _instances_list(self, cluster_id: str):
+
+ # init paths, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} list --all-namespaces --output yaml".format(
+ self._helm_command
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+
+ if output and len(output) > 0:
+ self.log.debug("instances list output: {}".format(output))
+ return yaml.load(output, Loader=yaml.SafeLoader)
+ else:
+ return []
+
+ def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
+ version: str):
+ inspect_command = "{} show {} {}{} {}".format(
+ self._helm_command, inspect_command, kdu_model, repo_str, version
+ )
+ return inspect_command
+
+ async def _status_kdu(
+ self,
+ cluster_id: str,
+ kdu_instance: str,
+ namespace: str = None,
+ show_error_log: bool = False,
+ return_text: bool = False,
+ ):
+
+ self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
+
+ if not namespace:
+ namespace = "kube-system"
+
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+ command = "{} status {} --namespace={} --output yaml".format(
+ self._helm_command, kdu_instance, namespace
+ )
+
+ output, rc = await self._local_async_exec(
+ command=command,
+ raise_exception_on_error=True,
+ show_error_log=show_error_log,
+ env=env
+ )
+
+ if return_text:
+ return str(output)
+
+ if rc != 0:
+ return None
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ # remove field 'notes' and manifest
+ try:
+ del data.get("info")["notes"]
+ del data["manifest"]
+ except KeyError:
+ pass
+
+ # unable to parse 'resources' as currently it is not included in helm3
+ return data
+
+ def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
+ params_str: str, version: str, atomic: bool, timeout: float) -> str:
+
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}s".format(timeout)
+
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
+ # namespace
+ namespace_str = ""
+ if namespace:
+ namespace_str = "--namespace {}".format(namespace)
+
+ # version
+ version_str = ""
+ if version:
+ version_str = "--version {}".format(version)
+
+ command = (
+ "{helm} install {name} {atomic} --output yaml "
+ "{params} {timeout} {ns} {model} {ver}".format(
+ helm=self._helm_command,
+ name=kdu_instance,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ ns=namespace_str,
+ model=kdu_model,
+ ver=version_str,
+ )
+ )
+ return command
+
+ def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
+ params_str: str, version: str, atomic: bool, timeout: float) -> str:
+
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}s".format(timeout)
+
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
+
+ # version
+ version_str = ""
+ if version:
+ version_str = "--version {}".format(version)
+
+ # namespace
+ namespace_str = ""
+ if namespace:
+ namespace_str = "--namespace {}".format(namespace)
+
+ command = (
+ "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
+ "{timeout} {ver}".format(
+ helm=self._helm_command,
+ name=kdu_instance,
+ namespace=namespace_str,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ model=kdu_model,
+ ver=version_str,
+ )
+ )
+ return command
+
+ def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
+ return "{} rollback {} {} --namespace={} --wait".format(
+ self._helm_command, kdu_instance, revision, namespace
+ )
+
+ def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+
+ return "{} uninstall {} --namespace={}".format(
+ self._helm_command, kdu_instance, namespace)
--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+import abc
+import asyncio
+import random
+import time
+import shlex
+import shutil
+import stat
+import subprocess
+import os
+import yaml
+from uuid import uuid4
+
+from n2vc.exceptions import K8sException
+from n2vc.k8s_conn import K8sConnector
+
+
+class K8sHelmBaseConnector(K8sConnector):
+
+ """
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
+ """
+ service_account = "osm"
+
+ def __init__(
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm",
+ log: object = None,
+ on_update_db=None,
+ ):
+ """
+
+ :param fs: file system for kubernetes and helm configuration
+ :param db: database object to write current operation status
+ :param kubectl_command: path to kubectl executable
+ :param helm_command: path to helm executable
+ :param log: logger
+ :param on_update_db: callback called when k8s connector updates database
+ """
+
+ # parent class
+ K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
+
+ self.log.info("Initializing K8S Helm connector")
+
+ # random numbers for release name generation
+ random.seed(time.time())
+
+ # the file system
+ self.fs = fs
+
+ # exception if kubectl is not installed
+ self.kubectl_command = kubectl_command
+ self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
+
+ # exception if helm is not installed
+ self._helm_command = helm_command
+ self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+
+ @staticmethod
+ def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+ """
+ Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
+ cluster_id for backward compatibility
+ """
+ namespace, _, cluster_id = cluster_uuid.rpartition(':')
+ return namespace, cluster_id
+
+ async def init_env(
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+ ) -> (str, bool):
+ """
+ It prepares a given K8s cluster environment to run Charts
+
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for helm. By default,
+ 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
+ (on error, an exception will be raised)
+ """
+
+ if reuse_cluster_uuid:
+ namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+ namespace = namespace_ or namespace
+ else:
+ cluster_id = str(uuid4())
+ cluster_uuid = "{}:{}".format(namespace, cluster_id)
+
+ self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+ mode = stat.S_IRUSR | stat.S_IWUSR
+ with open(paths["kube_config"], "w", mode) as f:
+ f.write(k8s_creds)
+ os.chmod(paths["kube_config"], 0o600)
+
+ # Code with initialization specific of helm version
+ n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
+
+ # sync fs with local data
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ self.log.info("Cluster {} initialized".format(cluster_id))
+
+ return cluster_uuid, n2vc_installed_sw
+
+ async def repo_add(
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+ ):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_id, repo_type, name, url))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # init_env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ # helm repo update
+ command = "{} repo update".format(
+ self._helm_command
+ )
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
+
+ # helm repo add name url
+ command = "{} repo add {} {}".format(
+ self._helm_command, name, url
+ )
+ self.log.debug("adding repo: {}".format(command))
+ await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ async def repo_list(self, cluster_uuid: str) -> list:
+ """
+ Get the list of registered repositories
+
+ :return: list of registered repositories: [ (name, url) .... ]
+ """
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # config filename
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} repo list --output yaml".format(
+ self._helm_command
+ )
+
+ # Set exception to false because if there are no repos just want an empty list
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ if _rc == 0:
+ if output and len(output) > 0:
+ repos = yaml.load(output, Loader=yaml.SafeLoader)
+ # unify format between helm2 and helm3 setting all keys lowercase
+ return self._lower_keys_list(repos)
+ else:
+ return []
+ else:
+ return []
+
+ async def repo_remove(self, cluster_uuid: str, name: str):
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} repo remove {}".format(
+ self._helm_command, name
+ )
+ await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ async def reset(
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ ) -> bool:
+
+ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
+ .format(cluster_id, uninstall_sw))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # uninstall releases if needed.
+ if uninstall_sw:
+ releases = await self.instances_list(cluster_uuid=cluster_uuid)
+ if len(releases) > 0:
+ if force:
+ for r in releases:
+ try:
+ kdu_instance = r.get("name")
+ chart = r.get("chart")
+ self.log.debug(
+ "Uninstalling {} -> {}".format(chart, kdu_instance)
+ )
+ await self.uninstall(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ except Exception as e:
+ # will not raise exception as it was found
+ # that in some cases of previously installed helm releases it
+ # raised an error
+ self.log.warn(
+ "Error uninstalling release {}: {}".format(kdu_instance, e)
+ )
+ else:
+ msg = (
+ "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
+ ).format(cluster_id)
+ self.log.warn(msg)
+ uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
+
+ if uninstall_sw:
+ await self._uninstall_sw(cluster_id, namespace)
+
+ # delete cluster directory
+ self.log.debug("Removing directory {}".format(cluster_id))
+ self.fs.file_delete(cluster_id, ignore_non_exist=True)
+ # Remove also local directorio if still exist
+ direct = self.fs.path + "/" + cluster_id
+ shutil.rmtree(direct, ignore_errors=True)
+
+ return True
+
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ ):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ # params to str
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_id=cluster_id, params=params
+ )
+
+ # version
+ version = None
+ if ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ version = str(parts[1])
+ kdu_model = parts[0]
+
+ # generate a name for the release. Then, check if already exists
+ kdu_instance = None
+ while kdu_instance is None:
+ kdu_instance = self._generate_release_name(kdu_model)
+ try:
+ result = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ show_error_log=False,
+ )
+ if result is not None:
+ # instance already exists: generate a new one
+ kdu_instance = None
+ except K8sException:
+ pass
+
+ command = self._get_install_command(kdu_model, kdu_instance, namespace,
+ params_str, version, atomic, timeout)
+
+ self.log.debug("installing: {}".format(command))
+
+ if atomic:
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ )
+
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=False,
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+
+ output, rc = exec_task.result()
+
+ else:
+
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # remove temporal values yaml file
+ if file_to_delete:
+ os.remove(file_to_delete)
+
+ # write final status
+ await self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0,
+ )
+
+ if rc != 0:
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
+ return kdu_instance
+
+ async def upgrade(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # look for instance to obtain namespace
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ # params to str
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_id=cluster_id, params=params
+ )
+
+ # version
+ version = None
+ if ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ version = str(parts[1])
+ kdu_model = parts[0]
+
+ command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
+ params_str, version, atomic, timeout)
+
+ self.log.debug("upgrading: {}".format(command))
+
+ if atomic:
+
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=False,
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+ output, rc = exec_task.result()
+
+ else:
+
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # remove temporal values yaml file
+ if file_to_delete:
+ os.remove(file_to_delete)
+
+ # write final status
+ await self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=True,
+ check_every=0,
+ )
+
+ if rc != 0:
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ # return new revision number
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ if instance:
+ revision = int(instance.get("revision"))
+ self.log.debug("New revision: {}".format(revision))
+ return revision
+ else:
+ return 0
+
+ async def rollback(
+ self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
+ ):
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "rollback kdu_instance {} to revision {} from cluster {}".format(
+ kdu_instance, revision, cluster_id
+ )
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # look for instance to obtain namespace
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
+ revision)
+
+ self.log.debug("rolling_back: {}".format(command))
+
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=False,
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+
+ output, rc = exec_task.result()
+
+ # write final status
+ await self._store_status(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=True,
+ check_every=0,
+ )
+
+ if rc != 0:
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ # return new revision number
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ if instance:
+ revision = int(instance.get("revision"))
+ self.log.debug("New revision: {}".format(revision))
+ return revision
+ else:
+ return 0
+
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+ """
+ Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
+ (this call should happen after all _terminate-config-primitive_ of the VNF
+ are invoked).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
+ :param kdu_instance: unique name for the KDU instance to be deleted
+ :return: True if successful
+ """
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "uninstall kdu_instance {} from cluster {}".format(
+ kdu_instance, cluster_id
+ )
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # look for instance to obtain namespace
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return self._output_to_table(output)
+
+ async def instances_list(self, cluster_uuid: str) -> list:
+ """
+ returns a list of deployed releases in a cluster
+
+ :param cluster_uuid: the 'cluster' or 'namespace:cluster'
+ :return:
+ """
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list releases for cluster {}".format(cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # execute internal command
+ result = await self._instances_list(cluster_id)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return result
+
+ async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
+ instances = await self.instances_list(cluster_uuid=cluster_uuid)
+ for instance in instances:
+ if instance.get("name") == kdu_instance:
+ return instance
+ self.log.debug("Instance {} not found".format(kdu_instance))
+ return None
+
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid: The UUID of the cluster or namespace:cluster
+ :param kdu_instance: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+ raise K8sException(
+ "KDUs deployed with Helm don't support actions "
+ "different from rollback, upgrade and status"
+ )
+
+ async def get_services(self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ namespace: str) -> list:
+ """
+ Returns a list of services defined for the specified kdu instance.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param namespace: K8s namespace used by the KDU instance
+ :return: If successful, it will return a list of services, Each service
+ can have the following data:
+ - `name` of the service
+ - `type` type of service in the k8 cluster
+ - `ports` List of ports offered by the service, for each port includes at least
+ name, port, protocol
+ - `cluster_ip` Internal ip to be used inside k8s cluster
+ - `external_ip` List of external ips (in case they are available)
+ """
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+ cluster_uuid, kdu_instance
+ )
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # get list of services names for kdu
+ service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+
+ service_list = []
+ for service in service_names:
+ service = await self._get_service(cluster_id, service, namespace)
+ service_list.append(service)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return service_list
+
+ async def get_service(self,
+ cluster_uuid: str,
+ service_name: str,
+ namespace: str) -> object:
+
+ self.log.debug(
+ "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+ service_name, namespace, cluster_uuid)
+ )
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ service = await self._get_service(cluster_id, service_name, namespace)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return service
+
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+
+ self.log.debug(
+ "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
+ cluster_uuid, kdu_instance
+ )
+ )
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # get instance: needed to obtain namespace
+ instances = await self._instances_list(cluster_id=cluster_id)
+ for instance in instances:
+ if instance.get("name") == kdu_instance:
+ break
+ else:
+ # instance does not exist
+ raise K8sException("Instance name: {} not found in cluster: {}".format(
+ kdu_instance, cluster_id))
+
+ status = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=instance["namespace"],
+ show_error_log=True,
+ return_text=True,
+ )
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return status
+
+ async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+ self.log.debug(
+ "inspect kdu_model values {} from (optional) repo: {}".format(
+ kdu_model, repo_url
+ )
+ )
+
+ return await self._exec_inspect_comand(
+ inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
+ )
+
+ async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+ self.log.debug(
+ "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
+ )
+
+ return await self._exec_inspect_comand(
+ inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
+ )
+
+ async def synchronize_repos(self, cluster_uuid: str):
+
+ self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
+ try:
+ db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
+ db_repo_dict = self._get_db_repos_dict(db_repo_ids)
+
+ local_repo_list = await self.repo_list(cluster_uuid)
+ local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
+
+ deleted_repo_list = []
+ added_repo_dict = {}
+
+ # iterate over the list of repos in the database that should be
+ # added if not present
+ for repo_name, db_repo in db_repo_dict.items():
+ try:
+ # check if it is already present
+ curr_repo_url = local_repo_dict.get(db_repo["name"])
+ repo_id = db_repo.get("_id")
+ if curr_repo_url != db_repo["url"]:
+ if curr_repo_url:
+ self.log.debug("repo {} url changed, delete and and again".format(
+ db_repo["url"]))
+ await self.repo_remove(cluster_uuid, db_repo["name"])
+ deleted_repo_list.append(repo_id)
+
+ # add repo
+ self.log.debug("add repo {}".format(db_repo["name"]))
+ await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+ added_repo_dict[repo_id] = db_repo["name"]
+ except Exception as e:
+ raise K8sException(
+ "Error adding repo id: {}, err_msg: {} ".format(
+ repo_id, repr(e)
+ )
+ )
+
+ # Delete repos that are present but not in nbi_list
+ for repo_name in local_repo_dict:
+ if not db_repo_dict.get(repo_name) and repo_name != "stable":
+ self.log.debug("delete repo {}".format(repo_name))
+ try:
+ await self.repo_remove(cluster_uuid, repo_name)
+ deleted_repo_list.append(repo_name)
+ except Exception as e:
+ self.warning(
+ "Error deleting repo, name: {}, err_msg: {}".format(
+ repo_name, str(e)
+ )
+ )
+
+ return deleted_repo_list, added_repo_dict
+
+ except K8sException:
+ raise
+ except Exception as e:
+ # Do not raise errors synchronizing repos
+ self.log.error("Error synchronizing repos: {}".format(e))
+ raise Exception("Error synchronizing repos: {}".format(e))
+
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ repo_ids = []
+ cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
+ cluster = self.db.get_one("k8sclusters", cluster_filter)
+ if cluster:
+ repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+ return repo_ids
+ else:
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
+
+ def _get_db_repos_dict(self, repo_ids: list):
+ db_repos_dict = {}
+ for repo_id in repo_ids:
+ db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+ db_repos_dict[db_repo["name"]] = db_repo
+ return db_repos_dict
+
+ """
+ ####################################################################################
+ ################################### TO BE IMPLEMENTED SUBCLASSES ###################
+ ####################################################################################
+ """
+
+ @abc.abstractmethod
+ def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
+ """
+ Creates and returns base cluster and kube dirs and returns them.
+ Also created helm3 dirs according to new directory specification, paths are
+ not returned but assigned to helm environment variables
+
+ :param cluster_name: cluster_name
+ :return: Dictionary with config_paths and dictionary with helm environment variables
+ """
+
+ @abc.abstractmethod
+ async def _cluster_init(self, cluster_id, namespace, paths, env):
+ """
+ Implements the helm version dependent cluster initialization
+ """
+
+ @abc.abstractmethod
+ async def _instances_list(self, cluster_id):
+ """
+ Implements the helm version dependent helm instances list
+ """
+
+ @abc.abstractmethod
+ async def _get_services(self, cluster_id, kdu_instance, namespace):
+ """
+ Implements the helm version dependent method to obtain services from a helm instance
+ """
+
+ @abc.abstractmethod
+ async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
+ show_error_log: bool = False, return_text: bool = False):
+ """
+ Implements the helm version dependent method to obtain status of a helm instance
+ """
+
+ @abc.abstractmethod
+ def _get_install_command(self, kdu_model, kdu_instance, namespace,
+ params_str, version, atomic, timeout) -> str:
+ """
+ Obtain command to be executed to delete the indicated instance
+ """
+
+ @abc.abstractmethod
+ def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
+ params_str, version, atomic, timeout) -> str:
+ """
+ Obtain command to be executed to upgrade the indicated instance
+ """
+
+ @abc.abstractmethod
+ def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ """
+ Obtain command to be executed to rollback the indicated instance
+ """
+
+ @abc.abstractmethod
+ def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ """
+ Obtain command to be executed to delete the indicated instance
+ """
+
+ @abc.abstractmethod
+ def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
+ version: str):
+ """
+ Obtain command to be executed to obtain information about the kdu
+ """
+
+ @abc.abstractmethod
+ async def _uninstall_sw(self, cluster_id: str, namespace: str):
+ """
+ Method call to uninstall cluster software for helm. This method is dependent
+ of helm version
+ For Helm v2 it will be called when Tiller must be uninstalled
+ For Helm v3 it does nothing and does not need to be callled
+ """
+
+ """
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
+ """
+
+ @staticmethod
+ def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
+ if os.path.exists(filename):
+ return True
+ else:
+ msg = "File {} does not exist".format(filename)
+ if exception_if_not_exists:
+ raise K8sException(msg)
+
+ @staticmethod
+ def _remove_multiple_spaces(strobj):
+ strobj = strobj.strip()
+ while " " in strobj:
+ strobj = strobj.replace(" ", " ")
+ return strobj
+
+ @staticmethod
+ def _output_to_lines(output: str) -> list:
+ output_lines = list()
+ lines = output.splitlines(keepends=False)
+ for line in lines:
+ line = line.strip()
+ if len(line) > 0:
+ output_lines.append(line)
+ return output_lines
+
+ @staticmethod
+ def _output_to_table(output: str) -> list:
+ output_table = list()
+ lines = output.splitlines(keepends=False)
+ for line in lines:
+ line = line.replace("\t", " ")
+ line_list = list()
+ output_table.append(line_list)
+ cells = line.split(sep=" ")
+ for cell in cells:
+ cell = cell.strip()
+ if len(cell) > 0:
+ line_list.append(cell)
+ return output_table
+
+ @staticmethod
+ def _parse_services(output: str) -> list:
+ lines = output.splitlines(keepends=False)
+ services = []
+ for line in lines:
+ line = line.replace("\t", " ")
+ cells = line.split(sep=" ")
+ if len(cells) > 0 and cells[0].startswith("service/"):
+ elems = cells[0].split(sep="/")
+ if len(elems) > 1:
+ services.append(elems[1])
+ return services
+
+ @staticmethod
+ def _get_deep(dictionary: dict, members: tuple):
+ target = dictionary
+ value = None
+ try:
+ for m in members:
+ value = target.get(m)
+ if not value:
+ return None
+ else:
+ target = value
+ except Exception:
+ pass
+ return value
+
+ # find key:value in several lines
+ @staticmethod
+ def _find_in_lines(p_lines: list, p_key: str) -> str:
+ for line in p_lines:
+ try:
+ if line.startswith(p_key + ":"):
+ parts = line.split(":")
+ the_value = parts[1].strip()
+ return the_value
+ except Exception:
+ # ignore it
+ pass
+ return None
+
+ @staticmethod
+ def _lower_keys_list(input_list: list):
+ """
+ Transform the keys in a list of dictionaries to lower case and returns a new list
+ of dictionaries
+ """
+ new_list = []
+ for dictionary in input_list:
+ new_dict = dict((k.lower(), v) for k, v in dictionary.items())
+ new_list.append(new_dict)
+ return new_list
+
+ def _local_exec(self, command: str) -> (str, int):
+ command = self._remove_multiple_spaces(command)
+ self.log.debug("Executing sync local command: {}".format(command))
+ # raise exception if fails
+ output = ""
+ try:
+ output = subprocess.check_output(
+ command, shell=True, universal_newlines=True
+ )
+ return_code = 0
+ self.log.debug(output)
+ except Exception:
+ return_code = 1
+
+ return output, return_code
+
+ async def _local_async_exec(
+ self,
+ command: str,
+ raise_exception_on_error: bool = False,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
+ env: dict = None
+ ) -> (str, int):
+
+ command = K8sHelmBaseConnector._remove_multiple_spaces(command)
+ self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+
+ # split command
+ command = shlex.split(command)
+
+ environ = os.environ.copy()
+ if env:
+ environ.update(env)
+
+ try:
+ process = await asyncio.create_subprocess_exec(
+ *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ env=environ
+ )
+
+ # wait for command terminate
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ output = ""
+ if stdout:
+ output = stdout.decode("utf-8").strip()
+ # output = stdout.decode()
+ if stderr:
+ output = stderr.decode("utf-8").strip()
+ # output = stderr.decode()
+
+ if return_code != 0 and show_error_log:
+ self.log.debug(
+ "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
+ )
+ else:
+ self.log.debug("Return code: {}".format(return_code))
+
+ if raise_exception_on_error and return_code != 0:
+ raise K8sException(output)
+
+ if encode_utf8:
+ output = output.encode("utf-8").strip()
+ output = str(output).replace("\\n", "\n")
+
+ return output, return_code
+
+ except asyncio.CancelledError:
+ raise
+ except K8sException:
+ raise
+ except Exception as e:
+ msg = "Exception executing command: {} -> {}".format(command, e)
+ self.log.error(msg)
+ if raise_exception_on_error:
+ raise K8sException(e) from e
+ else:
+ return "", -1
+
+ async def _local_async_exec_pipe(self,
+ command1: str,
+ command2: str,
+ raise_exception_on_error: bool = True,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
+ env: dict = None):
+
+ command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
+ command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
+ command = "{} | {}".format(command1, command2)
+ self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+
+ # split command
+ command1 = shlex.split(command1)
+ command2 = shlex.split(command2)
+
+ environ = os.environ.copy()
+ if env:
+ environ.update(env)
+
+ try:
+ read, write = os.pipe()
+ await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
+ os.close(write)
+ process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
+ stdout=asyncio.subprocess.PIPE,
+ env=environ)
+ os.close(read)
+ stdout, stderr = await process_2.communicate()
+
+ return_code = process_2.returncode
+
+ output = ""
+ if stdout:
+ output = stdout.decode("utf-8").strip()
+ # output = stdout.decode()
+ if stderr:
+ output = stderr.decode("utf-8").strip()
+ # output = stderr.decode()
+
+ if return_code != 0 and show_error_log:
+ self.log.debug(
+ "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
+ )
+ else:
+ self.log.debug("Return code: {}".format(return_code))
+
+ if raise_exception_on_error and return_code != 0:
+ raise K8sException(output)
+
+ if encode_utf8:
+ output = output.encode("utf-8").strip()
+ output = str(output).replace("\\n", "\n")
+
+ return output, return_code
+ except asyncio.CancelledError:
+ raise
+ except K8sException:
+ raise
+ except Exception as e:
+ msg = "Exception executing command: {} -> {}".format(command, e)
+ self.log.error(msg)
+ if raise_exception_on_error:
+ raise K8sException(e) from e
+ else:
+ return "", -1
+
+ async def _get_service(self, cluster_id, service_name, namespace):
+ """
+ Obtains the data of the specified service in the k8cluster.
+
+ :param cluster_id: id of a K8s cluster known by OSM
+ :param service_name: name of the K8s service in the specified namespace
+ :param namespace: K8s namespace used by the KDU instance
+ :return: If successful, it will return a service with the following data:
+ - `name` of the service
+ - `type` type of service in the k8 cluster
+ - `ports` List of ports offered by the service, for each port includes at least
+ name, port, protocol
+ - `cluster_ip` Internal ip to be used inside k8s cluster
+ - `external_ip` List of external ips (in case they are available)
+ """
+
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+ self.kubectl_command, paths["kube_config"], namespace, service_name
+ )
+
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ service = {
+ "name": service_name,
+ "type": self._get_deep(data, ("spec", "type")),
+ "ports": self._get_deep(data, ("spec", "ports")),
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ }
+ if service["type"] == "LoadBalancer":
+ ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+ ip_list = [elem["ip"] for elem in ip_map_list]
+ service["external_ip"] = ip_list
+
+ return service
+
+ async def _exec_inspect_comand(
+ self, inspect_command: str, kdu_model: str, repo_url: str = None
+ ):
+ """
+ Obtains information about a kdu, no cluster (no env)
+ """
+
+ repo_str = ""
+ if repo_url:
+ repo_str = " --repo {}".format(repo_url)
+
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ idx += 1
+ kdu_model = kdu_model[idx:]
+
+ version = ""
+ if ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ version = "--version {}".format(str(parts[1]))
+ kdu_model = parts[0]
+
+ full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
+ output, _rc = await self._local_async_exec(
+ command=full_command, encode_utf8=True
+ )
+
+ return output
+
+ async def _store_status(
+ self,
+ cluster_id: str,
+ operation: str,
+ kdu_instance: str,
+ namespace: str = None,
+ check_every: float = 10,
+ db_dict: dict = None,
+ run_once: bool = False,
+ ):
+ while True:
+ try:
+ await asyncio.sleep(check_every)
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
+ return_text=False
+ )
+ status = detailed_status.get("info").get("description")
+ self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation,
+ )
+ if not result:
+ self.log.info("Error writing in database. Task exiting...")
+ return
+ except asyncio.CancelledError:
+ self.log.debug("Task cancelled")
+ return
+ except Exception as e:
+ self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
+ pass
+ finally:
+ if run_once:
+ return
+
+ # params for use in -f file
+ # returns values file option and filename (in order to delete it at the end)
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
+
+ if params and len(params) > 0:
+ self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ def get_random_number():
+ r = random.randrange(start=1, stop=99999999)
+ s = str(r)
+ while len(s) < 10:
+ s = "0" + s
+ return s
+
+ params2 = dict()
+ for key in params:
+ value = params.get(key)
+ if "!!yaml" in str(value):
+ value = yaml.load(value[7:])
+ params2[key] = value
+
+ values_file = get_random_number() + ".yaml"
+ with open(values_file, "w") as stream:
+ yaml.dump(params2, stream, indent=4, default_flow_style=False)
+
+ return "-f {}".format(values_file), values_file
+
+ return "", None
+
+ # params for use in --set option
+ @staticmethod
+ def _params_to_set_option(params: dict) -> str:
+ params_str = ""
+ if params and len(params) > 0:
+ start = True
+ for key in params:
+ value = params.get(key, None)
+ if value is not None:
+ if start:
+ params_str += "--set "
+ start = False
+ else:
+ params_str += ","
+ params_str += "{}={}".format(key, value)
+ return params_str
+
+ @staticmethod
+ def _generate_release_name(chart_name: str):
+ # check embeded chart (file or dir)
+ if chart_name.startswith("/"):
+ # extract file or directory name
+ chart_name = chart_name[chart_name.rfind("/") + 1:]
+ # check URL
+ elif "://" in chart_name:
+ # extract last portion of URL
+ chart_name = chart_name[chart_name.rfind("/") + 1:]
+
+ name = ""
+ for c in chart_name:
+ if c.isalpha() or c.isnumeric():
+ name += c
+ else:
+ name += "-"
+ if len(name) > 35:
+ name = name[0:35]
+
+ # if does not start with alpha character, prefix 'a'
+ if not name[0].isalpha():
+ name = "a" + name
+
+ name += "-"
+
+ def get_random_number():
+ r = random.randrange(start=1, stop=99999999)
+ s = str(r)
+ s = s.rjust(10, "0")
+ return s
+
+ name = name + get_random_number()
+ return name.lower()
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
-
import asyncio
import os
-import random
-import shutil
-import subprocess
-import time
-from uuid import uuid4
+import yaml
+from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException
-from n2vc.k8s_conn import K8sConnector
-import yaml
-class K8sHelmConnector(K8sConnector):
+class K8sHelmConnector(K8sHelmBaseConnector):
"""
####################################################################################
################################### P U B L I C ####################################
####################################################################################
"""
- service_account = "osm"
def __init__(
self,
on_update_db=None,
):
"""
+ Initializes helm connector for helm v2
:param fs: file system for kubernetes and helm configuration
:param db: database object to write current operation status
"""
# parent class
- K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
-
- self.log.info("Initializing K8S Helm connector")
-
- # random numbers for release name generation
- random.seed(time.time())
-
- # the file system
- self.fs = fs
-
- # exception if kubectl is not installed
- self.kubectl_command = kubectl_command
- self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
+ K8sHelmBaseConnector.__init__(
+ self,
+ db=db,
+ log=log,
+ fs=fs,
+ kubectl_command=kubectl_command,
+ helm_command=helm_command,
+ on_update_db=on_update_db,
+ )
- # exception if helm is not installed
- self._helm_command = helm_command
- self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+ self.log.info("Initializing K8S Helm2 connector")
# initialize helm client-only
self.log.debug("Initializing helm client-only...")
msg="helm init failed (it was already initialized): {}".format(e)
)
- self.log.info("K8S Helm connector initialized")
+ self.log.info("K8S Helm2 connector initialized")
- @staticmethod
- def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
- """
- Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
- cluster_id for backward compatibility
- """
- namespace, _, cluster_id = cluster_uuid.rpartition(':')
- return namespace, cluster_id
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+ self.log.debug(
+ "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+ )
- async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
- ) -> (str, bool):
+ return await self._exec_inspect_comand(
+ inspect_command="", kdu_model=kdu_model, repo_url=repo_url
+ )
+
+ """
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
+ """
+
+ def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
"""
- It prepares a given K8s cluster environment to run Charts on both sides:
- client (OSM)
- server (Tiller)
-
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
- '.kube/config'
- :param namespace: optional namespace to be used for helm. By default,
- 'kube-system' will be used
- :param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some
- software in the cluster
- (on error, an exception will be raised)
+ Creates and returns base cluster and kube dirs and returns them.
+ Also created helm3 dirs according to new directory specification, paths are
+ returned and also environment variables that must be provided to execute commands
+
+ Helm 2 directory specification uses helm_home dir:
+
+ The variables assigned for this paths are:
+ - Helm hone: $HELM_HOME
+ - helm kubeconfig: $KUBECONFIG
+
+ :param cluster_name: cluster_name
+ :return: Dictionary with config_paths and dictionary with helm environment variables
"""
+ base = self.fs.path
+ if base.endswith("/") or base.endswith("\\"):
+ base = base[:-1]
- if reuse_cluster_uuid:
- namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
- namespace = namespace_ or namespace
- else:
- cluster_id = str(uuid4())
- cluster_uuid = "{}:{}".format(namespace, cluster_id)
+ # base dir for cluster
+ cluster_dir = base + "/" + cluster_name
+
+ # kube dir
+ kube_dir = cluster_dir + "/" + ".kube"
+ if create_if_not_exist and not os.path.exists(kube_dir):
+ self.log.debug("Creating dir {}".format(kube_dir))
+ os.makedirs(kube_dir)
+
+ # helm home dir
+ helm_dir = cluster_dir + "/" + ".helm"
+ if create_if_not_exist and not os.path.exists(helm_dir):
+ self.log.debug("Creating dir {}".format(helm_dir))
+ os.makedirs(helm_dir)
- self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+ config_filename = kube_dir + "/config"
- # create config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ # 2 - Prepare dictionary with paths
+ paths = {
+ "kube_dir": kube_dir,
+ "kube_config": config_filename,
+ "cluster_dir": cluster_dir,
+ "helm_dir": helm_dir,
+ }
+
+ for file_name, file in paths.items():
+ if "dir" in file_name and not os.path.exists(file):
+ err_msg = "{} dir does not exist".format(file)
+ self.log.error(err_msg)
+ raise K8sException(err_msg)
+
+ # 3 - Prepare environment variables
+ env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
+
+ return paths, env
+
+ async def _get_services(self, cluster_id, kdu_instance, namespace):
+
+ # init config, env
+ paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- with open(config_filename, "w") as f:
- f.write(k8s_creds)
+
+ command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
+ command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
+ output, _rc = await self._local_async_exec_pipe(
+ command1, command2, env=env, raise_exception_on_error=True
+ )
+ services = self._parse_services(output)
+
+ return services
+
+ async def _cluster_init(self, cluster_id: str, namespace: str,
+ paths: dict, env: dict):
+ """
+ Implements the helm version dependent cluster initialization:
+ For helm2 it initialized tiller environment if needed
+ """
# check if tiller pod is up in cluster
command = "{} --kubeconfig={} --namespace={} get deployments".format(
- self.kubectl_command, config_filename, namespace
+ self.kubectl_command, paths["kube_config"], namespace
)
output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
+ command=command, raise_exception_on_error=True, env=env
)
output_table = self._output_to_table(output=output)
"Initializing helm in client and server: {}".format(cluster_id)
)
command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
- self.kubectl_command, config_filename, self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
- "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
- ).format(self.kubectl_command, config_filename, self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
- "init").format(self._helm_command, config_filename, namespace, helm_dir,
- self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ self.kubectl_command, paths["kube_config"], self.service_account
+ )
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ command = (
+ "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
+ "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
+ ).format(self.kubectl_command, paths["kube_config"], self.service_account)
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ command = (
+ "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+ "init"
+ ).format(
+ self._helm_command,
+ paths["kube_config"],
+ namespace,
+ paths["helm_dir"],
+ self.service_account,
+ )
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
n2vc_installed_sw = True
else:
# check client helm installation
- check_file = helm_dir + "/repository/repositories.yaml"
- if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
+ check_file = paths["helm_dir"] + "/repository/repositories.yaml"
+ if not self._check_file_exists(
+ filename=check_file, exception_if_not_exists=False
+ ):
self.log.info("Initializing helm in client: {}".format(cluster_id))
command = (
"{} --kubeconfig={} --tiller-namespace={} "
"--home={} init --client-only"
- ).format(self._helm_command, config_filename, namespace, helm_dir)
+ ).format(
+ self._helm_command,
+ paths["kube_config"],
+ namespace,
+ paths["helm_dir"],
+ )
output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
+ command=command, raise_exception_on_error=True, env=env
)
else:
self.log.info("Helm client already initialized")
- # sync fs with local data
- self.fs.reverse_sync(from_path=cluster_id)
-
- self.log.info("Cluster {} initialized".format(cluster_id))
+ return n2vc_installed_sw
- return cluster_uuid, n2vc_installed_sw
+ async def _uninstall_sw(self, cluster_id: str, namespace: str):
+ # uninstall Tiller if necessary
- async def repo_add(
- self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
- ):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
- cluster_id, repo_type, name, url))
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ # init paths, env
+ paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- # helm repo update
- command = "{} --kubeconfig={} --home={} repo update".format(
- self._helm_command, config_filename, helm_dir
- )
- self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- # helm repo add name url
- command = "{} --kubeconfig={} --home={} repo add {} {}".format(
- self._helm_command, config_filename, helm_dir, name, url
- )
- self.log.debug("adding repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- async def repo_list(self, cluster_uuid: str) -> list:
- """
- Get the list of registered repositories
-
- :return: list of registered repositories: [ (name, url) .... ]
- """
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list repositories for cluster {}".format(cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
- self._helm_command, config_filename, helm_dir
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
- if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader)
- else:
- return []
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- async def repo_remove(self, cluster_uuid: str, name: str):
- """
- Remove a repository from OSM
-
- :param cluster_uuid: the cluster or 'namespace:cluster'
- :param name: repo name in OSM
- :return: True if successful
- """
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list repositories for cluster {}".format(cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --home={} repo remove {}".format(
- self._helm_command, config_filename, helm_dir, name
- )
-
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
- ) -> bool:
-
- namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
- .format(cluster_id, uninstall_sw))
-
- # get kube and helm directories
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=False
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- # uninstall releases if needed.
- if uninstall_sw:
- releases = await self.instances_list(cluster_uuid=cluster_uuid)
- if len(releases) > 0:
- if force:
- for r in releases:
- try:
- kdu_instance = r.get("Name")
- chart = r.get("Chart")
- self.log.debug(
- "Uninstalling {} -> {}".format(chart, kdu_instance)
- )
- await self.uninstall(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- except Exception as e:
- self.log.error(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
- )
- else:
- msg = (
- "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
- ).format(cluster_id)
- self.log.warn(msg)
- uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
-
- if uninstall_sw:
-
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
-
- if not namespace:
- # find namespace for tiller pod
- command = "{} --kubeconfig={} get deployments --all-namespaces".format(
- self.kubectl_command, config_filename
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- output_table = K8sHelmConnector._output_to_table(output=output)
- namespace = None
- for r in output_table:
- try:
- if "tiller-deploy" in r[1]:
- namespace = r[0]
- break
- except Exception:
- pass
- else:
- msg = "Tiller deployment not found in cluster {}".format(cluster_id)
- self.log.error(msg)
-
- self.log.debug("namespace for tiller: {}".format(namespace))
-
- if namespace:
- # uninstall tiller from cluster
- self.log.debug(
- "Uninstalling tiller from cluster {}".format(cluster_id)
- )
- command = "{} --kubeconfig={} --home={} reset".format(
- self._helm_command, config_filename, helm_dir
- )
- self.log.debug("resetting: {}".format(command))
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
- # Delete clusterrolebinding and serviceaccount.
- # Ignore if errors for backward compatibility
- command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
- "io/osm-tiller-cluster-rule").format(self.kubectl_command,
- config_filename)
- output, _rc = await self._local_async_exec(command=command,
- raise_exception_on_error=False)
- command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
- format(self.kubectl_command, config_filename, self.service_account)
- output, _rc = await self._local_async_exec(command=command,
- raise_exception_on_error=False)
-
+ if not namespace:
+ # find namespace for tiller pod
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, paths["kube_config"]
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ output_table = self._output_to_table(output=output)
+ namespace = None
+ for r in output_table:
+ try:
+ if "tiller-deploy" in r[1]:
+ namespace = r[0]
+ break
+ except Exception:
+ pass
else:
- self.log.debug("namespace not found")
+ msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+ self.log.error(msg)
- # delete cluster directory
- self.log.debug("Removing directory {}".format(cluster_id))
- self.fs.file_delete(cluster_id, ignore_non_exist=True)
- # Remove also local directorio if still exist
- direct = self.fs.path + "/" + cluster_id
- shutil.rmtree(direct, ignore_errors=True)
+ self.log.debug("namespace for tiller: {}".format(namespace))
- return True
-
- async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
- ):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.log.debug("sync cluster_id: {}".format(_cluster_dir))
- self.fs.sync(from_path=cluster_id)
-
- # params to str
- # params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(
- cluster_id=cluster_id, params=params
- )
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
- # namespace
- namespace_str = ""
if namespace:
- namespace_str = "--namespace {}".format(namespace)
-
- # version
- version_str = ""
- if ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version_str = "--version {}".format(parts[1])
- kdu_model = parts[0]
-
- # generate a name for the release. Then, check if already exists
- kdu_instance = None
- while kdu_instance is None:
- kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
- try:
- result = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- show_error_log=False,
- )
- if result is not None:
- # instance already exists: generate a new one
- kdu_instance = None
- except K8sException:
- pass
-
- # helm repo install
- command = (
- "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
- "{params} {timeout} --name={name} {ns} {model} {ver}".format(
- helm=self._helm_command,
- atomic=atomic_str,
- config=config_filename,
- dir=helm_dir,
- params=params_str,
- timeout=timeout_str,
- name=kdu_instance,
- ns=namespace_str,
- model=kdu_model,
- ver=version_str,
+ # uninstall tiller from cluster
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
+ command = "{} --kubeconfig={} --home={} reset".format(
+ self._helm_command, paths["kube_config"], paths["helm_dir"]
)
- )
- self.log.debug("installing: {}".format(command))
-
- if atomic:
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
+ self.log.debug("resetting: {}".format(command))
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
-
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="install",
- run_once=False,
- )
+ # Delete clusterrolebinding and serviceaccount.
+ # Ignore if errors for backward compatibility
+ command = (
+ "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+ "io/osm-tiller-cluster-rule"
+ ).format(self.kubectl_command, paths["kube_config"])
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
)
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
-
- output, rc = exec_task.result()
-
- else:
-
- output, rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
+ command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
+ self.kubectl_command, paths["kube_config"], self.service_account
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
)
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
-
- # write final status
- await self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
- return kdu_instance
-
- async def instances_list(self, cluster_uuid: str) -> list:
- """
- returns a list of deployed releases in a cluster
-
- :param cluster_uuid: the 'cluster' or 'namespace:cluster'
- :return:
- """
+ else:
+ self.log.debug("namespace not found")
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list releases for cluster {}".format(cluster_id))
+ async def _instances_list(self, cluster_id):
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ # init paths, env
+ paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --home={} list --output yaml".format(
- self._helm_command, config_filename, helm_dir
- )
+ command = "{} list --output yaml".format(self._helm_command)
output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
+ command=command, raise_exception_on_error=True, env=env
)
if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
+ # parse yaml and update keys to lower case to unify with helm3
+ instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
+ new_instances = []
+ for instance in instances:
+ new_instance = dict((k.lower(), v) for k, v in instance.items())
+ new_instances.append(new_instance)
+ return new_instances
else:
return []
- async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- ):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- # params to str
- # params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(
- cluster_id=cluster_id, params=params
+ def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
+ version: str):
+ inspect_command = "{} inspect {} {}{} {}".format(
+ self._helm_command, show_command, kdu_model, repo_str, version
)
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
-
- # version
- version_str = ""
- if kdu_model and ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version_str = "--version {}".format(parts[1])
- kdu_model = parts[0]
-
- # helm repo upgrade
- command = (
- "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
- ).format(
- self._helm_command,
- atomic_str,
- config_filename,
- helm_dir,
- params_str,
- timeout_str,
- kdu_instance,
- kdu_model,
- version_str,
- )
- self.log.debug("upgrading: {}".format(command))
-
- if atomic:
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
- output, rc = exec_task.result()
-
- else:
-
- output, rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
-
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
-
- # write final status
- await self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def rollback(
- self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
- ):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_id
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
- self._helm_command, config_filename, helm_dir, kdu_instance, revision
- )
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
-
- output, rc = exec_task.result()
-
- # write final status
- await self._store_status(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
- """
- Removes an existing KDU instance. It would implicitly use the `delete` call
- (this call would happen after all _terminate-config-primitive_ of the VNF
- are invoked).
-
- :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
- :param kdu_instance: unique name for the KDU instance to be deleted
- :return: True if successful
- """
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_id
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --home={} delete --purge {}".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- return self._output_to_table(output)
-
- async def exec_primitive(
- self,
- cluster_uuid: str = None,
- kdu_instance: str = None,
- primitive_name: str = None,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- ) -> str:
- """Exec primitive (Juju action)
-
- :param cluster_uuid str: The UUID of the cluster or namespace:cluster
- :param kdu_instance str: The unique name of the KDU instance
- :param primitive_name: Name of action that will be executed
- :param timeout: Timeout for action execution
- :param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
-
- :return: Returns the output of the action
- """
- raise K8sException(
- "KDUs deployed with Helm don't support actions "
- "different from rollback, upgrade and status"
- )
-
- async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model values {} from (optional) repo: {}".format(
- kdu_model, repo_url
- )
- )
-
- return await self._exec_inspect_comand(
- inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
-
- # call internal function
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- return await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- show_error_log=True,
- return_text=True,
- )
-
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "get_services: cluster_uuid: {}, kdu_instance: {}".format(
- cluster_uuid, kdu_instance
- )
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- status = await self._status_kdu(
- cluster_id, kdu_instance, return_text=False
- )
-
- service_names = self._parse_helm_status_service_info(status)
- service_list = []
- for service in service_names:
- service = await self.get_service(cluster_uuid, service, namespace)
- service_list.append(service)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- return service_list
-
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str) -> object:
-
- self.log.debug(
- "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
- service_name, namespace, cluster_uuid)
- )
-
- # get paths
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, config_filename, namespace, service_name
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- service = {
- "name": service_name,
- "type": self._get_deep(data, ("spec", "type")),
- "ports": self._get_deep(data, ("spec", "ports")),
- "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
- }
- if service["type"] == "LoadBalancer":
- ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
- ip_list = [elem["ip"] for elem in ip_map_list]
- service["external_ip"] = ip_list
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- return service
-
- async def synchronize_repos(self, cluster_uuid: str):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("syncronize repos for cluster helm-id: {}",)
- try:
- update_repos_timeout = (
- 300 # max timeout to sync a single repos, more than this is too much
- )
- db_k8scluster = self.db.get_one(
- "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
- )
- if db_k8scluster:
- nbi_repo_list = (
- db_k8scluster.get("_admin").get("helm_chart_repos") or []
- )
- cluster_repo_dict = (
- db_k8scluster.get("_admin").get("helm_charts_added") or {}
- )
- # elements that must be deleted
- deleted_repo_list = []
- added_repo_dict = {}
- self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
- self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
-
- # obtain repos to add: registered by nbi but not added
- repos_to_add = [
- repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
- ]
-
- # obtain repos to delete: added by cluster but not in nbi list
- repos_to_delete = [
- repo
- for repo in cluster_repo_dict.keys()
- if repo not in nbi_repo_list
- ]
-
- # delete repos: must delete first then add because there may be
- # different repos with same name but
- # different id and url
- self.log.debug("repos to delete: {}".format(repos_to_delete))
- for repo_id in repos_to_delete:
- # try to delete repos
- try:
- repo_delete_task = asyncio.ensure_future(
- self.repo_remove(
- cluster_uuid=cluster_uuid,
- name=cluster_repo_dict[repo_id],
- )
- )
- await asyncio.wait_for(repo_delete_task, update_repos_timeout)
- except Exception as e:
- self.warning(
- "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
- repo_id, cluster_repo_dict[repo_id], str(e)
- )
- )
- # always add to the list of to_delete if there is an error
- # because if is not there
- # deleting raises error
- deleted_repo_list.append(repo_id)
-
- # add repos
- self.log.debug("repos to add: {}".format(repos_to_add))
- for repo_id in repos_to_add:
- # obtain the repo data from the db
- # if there is an error getting the repo in the database we will
- # ignore this repo and continue
- # because there is a possible race condition where the repo has
- # been deleted while processing
- db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
- self.log.debug(
- "obtained repo: id, {}, name: {}, url: {}".format(
- repo_id, db_repo["name"], db_repo["url"]
- )
- )
- try:
- repo_add_task = asyncio.ensure_future(
- self.repo_add(
- cluster_uuid=cluster_uuid,
- name=db_repo["name"],
- url=db_repo["url"],
- repo_type="chart",
- )
- )
- await asyncio.wait_for(repo_add_task, update_repos_timeout)
- added_repo_dict[repo_id] = db_repo["name"]
- self.log.debug(
- "added repo: id, {}, name: {}".format(
- repo_id, db_repo["name"]
- )
- )
- except Exception as e:
- # deal with error adding repo, adding a repo that already
- # exists does not raise any error
- # will not raise error because a wrong repos added by
- # anyone could prevent instantiating any ns
- self.log.error(
- "Error adding repo id: {}, err_msg: {} ".format(
- repo_id, repr(e)
- )
- )
-
- return deleted_repo_list, added_repo_dict
-
- else: # else db_k8scluster does not exist
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
- except Exception as e:
- self.log.error("Error synchronizing repos: {}".format(str(e)))
- raise K8sException("Error synchronizing repos")
-
- """
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
- """
-
- async def _exec_inspect_comand(
- self, inspect_command: str, kdu_model: str, repo_url: str = None
- ):
-
- repo_str = ""
- if repo_url:
- repo_str = " --repo {}".format(repo_url)
- idx = kdu_model.find("/")
- if idx >= 0:
- idx += 1
- kdu_model = kdu_model[idx:]
-
- inspect_command = "{} inspect {} {}{}".format(
- self._helm_command, inspect_command, kdu_model, repo_str
- )
- output, _rc = await self._local_async_exec(
- command=inspect_command, encode_utf8=True
- )
-
- return output
+ return inspect_command
async def _status_kdu(
self,
cluster_id: str,
kdu_instance: str,
+ namespace: str = None,
show_error_log: bool = False,
return_text: bool = False,
):
- self.log.debug("status of kdu_instance {}".format(kdu_instance))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
+ self.log.debug(
+ "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
- command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
)
-
+ command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
output, rc = await self._local_async_exec(
command=command,
raise_exception_on_error=True,
show_error_log=show_error_log,
+ env=env,
)
if return_text:
except Exception:
pass
- return data
-
- async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
- instances = await self.instances_list(cluster_uuid=cluster_uuid)
- for instance in instances:
- if instance.get("Name") == kdu_instance:
- return instance
- self.log.debug("Instance {} not found".format(kdu_instance))
- return None
-
- @staticmethod
- def _generate_release_name(chart_name: str):
- # check embeded chart (file or dir)
- if chart_name.startswith("/"):
- # extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
- # check URL
- elif "://" in chart_name:
- # extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
-
- name = ""
- for c in chart_name:
- if c.isalpha() or c.isnumeric():
- name += c
- else:
- name += "-"
- if len(name) > 35:
- name = name[0:35]
-
- # if does not start with alpha character, prefix 'a'
- if not name[0].isalpha():
- name = "a" + name
-
- name += "-"
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- s = s.rjust(10, "0")
- return s
-
- name = name + get_random_number()
- return name.lower()
+ # set description to lowercase (unify with helm3)
+ try:
+ data.get("info")["description"] = data.get("info").pop("Description")
+ except KeyError:
+ pass
- async def _store_status(
- self,
- cluster_id: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance,
- return_text=False
- )
- status = detailed_status.get("info").get("Description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
- pass
- finally:
- if run_once:
- return
+ return data
async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
# halting-horse-mongodb 0/1 1 0 0s
# halting-petit-mongodb 1/1 1 0 0s
# blank line
- resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
+ resources = K8sHelmBaseConnector._get_deep(
+ status, ("info", "status", "resources")
+ )
# convert to table
- resources = K8sHelmConnector._output_to_table(resources)
+ resources = K8sHelmBaseConnector._output_to_table(resources)
num_lines = len(resources)
index = 0
+ ready = True
while index < num_lines:
try:
line1 = resources[index]
return ready
- def _parse_helm_status_service_info(self, status):
-
- # extract info.status.resources-> str
- # format:
- # ==> v1/Deployment
- # NAME READY UP-TO-DATE AVAILABLE AGE
- # halting-horse-mongodb 0/1 1 0 0s
- # halting-petit-mongodb 1/1 1 0 0s
- # blank line
- resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
-
- service_list = []
- first_line_skipped = service_found = False
- for line in resources:
- if not service_found:
- if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
- service_found = True
- continue
- else:
- if len(line) >= 2 and line[0] == "==>":
- service_found = first_line_skipped = False
- continue
- if not line:
- continue
- if not first_line_skipped:
- first_line_skipped = True
- continue
- service_list.append(line[0])
-
- return service_list
-
- @staticmethod
- def _get_deep(dictionary: dict, members: tuple):
- target = dictionary
- value = None
- try:
- for m in members:
- value = target.get(m)
- if not value:
- return None
- else:
- target = value
- except Exception:
- pass
- return value
-
- # find key:value in several lines
- @staticmethod
- def _find_in_lines(p_lines: list, p_key: str) -> str:
- for line in p_lines:
- try:
- if line.startswith(p_key + ":"):
- parts = line.split(":")
- the_value = parts[1].strip()
- return the_value
- except Exception:
- # ignore it
- pass
- return None
-
- # params for use in -f file
- # returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
- if params and len(params) > 0:
- self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- while len(s) < 10:
- s = "0" + s
- return s
-
- params2 = dict()
- for key in params:
- value = params.get(key)
- if "!!yaml" in str(value):
- value = yaml.load(value[7:])
- params2[key] = value
-
- values_file = get_random_number() + ".yaml"
- with open(values_file, "w") as stream:
- yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
- return "-f {}".format(values_file), values_file
-
- return "", None
-
- # params for use in --set option
- @staticmethod
- def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
-
- @staticmethod
- def _output_to_lines(output: str) -> list:
- output_lines = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.strip()
- if len(line) > 0:
- output_lines.append(line)
- return output_lines
-
- @staticmethod
- def _output_to_table(output: str) -> list:
- output_table = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.replace("\t", " ")
- line_list = list()
- output_table.append(line_list)
- cells = line.split(sep=" ")
- for cell in cells:
- cell = cell.strip()
- if len(cell) > 0:
- line_list.append(cell)
- return output_table
-
- def _get_paths(
- self, cluster_name: str, create_if_not_exist: bool = False
- ) -> (str, str, str, str):
- """
- Returns kube and helm directories
-
- :param cluster_name:
- :param create_if_not_exist:
- :return: kube, helm directories, config filename and cluster dir.
- Raises exception if not exist and cannot create
- """
-
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + "/" + cluster_name
- if create_if_not_exist and not os.path.exists(cluster_dir):
- self.log.debug("Creating dir {}".format(cluster_dir))
- os.makedirs(cluster_dir)
- if not os.path.exists(cluster_dir):
- msg = "Base cluster dir {} does not exist".format(cluster_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # kube dir
- kube_dir = cluster_dir + "/" + ".kube"
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug("Creating dir {}".format(kube_dir))
- os.makedirs(kube_dir)
- if not os.path.exists(kube_dir):
- msg = "Kube config dir {} does not exist".format(kube_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # helm home dir
- helm_dir = cluster_dir + "/" + ".helm"
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug("Creating dir {}".format(helm_dir))
- os.makedirs(helm_dir)
- if not os.path.exists(helm_dir):
- msg = "Helm config dir {} does not exist".format(helm_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- config_filename = kube_dir + "/config"
- return kube_dir, helm_dir, config_filename, cluster_dir
-
- @staticmethod
- def _remove_multiple_spaces(strobj):
- strobj = strobj.strip()
- while " " in strobj:
- strobj = strobj.replace(" ", " ")
- return strobj
-
- def _local_exec(self, command: str) -> (str, int):
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
+ def _get_install_command(
+ self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ ) -> str:
- async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- ) -> (str, int):
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}".format(timeout)
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug("Executing async local command: {}".format(command))
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
+ # namespace
+ namespace_str = ""
+ if namespace:
+ namespace_str = "--namespace {}".format(namespace)
- # split command
- command = command.split(sep=" ")
+ # version
+ version_str = ""
+ if version:
+ version_str = version_str = "--version {}".format(version)
- try:
- process = await asyncio.create_subprocess_exec(
- *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+ command = (
+ "{helm} install {atomic} --output yaml "
+ "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+ helm=self._helm_command,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ ns=namespace_str,
+ model=kdu_model,
+ ver=version_str,
)
+ )
+ return command
- # wait for command terminate
- stdout, stderr = await process.communicate()
-
- return_code = process.returncode
-
- output = ""
- if stdout:
- output = stdout.decode("utf-8").strip()
- # output = stdout.decode()
- if stderr:
- output = stderr.decode("utf-8").strip()
- # output = stderr.decode()
-
- if return_code != 0 and show_error_log:
- self.log.debug(
- "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
- )
- else:
- self.log.debug("Return code: {}".format(return_code))
+ def _get_upgrade_command(
+ self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ ) -> str:
- if raise_exception_on_error and return_code != 0:
- raise K8sException(output)
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}".format(timeout)
- if encode_utf8:
- output = output.encode("utf-8").strip()
- output = str(output).replace("\\n", "\n")
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
- return output, return_code
+ # version
+ version_str = ""
+ if version:
+ version_str = "--version {}".format(version)
+
+ command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
+ .format(helm=self._helm_command,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ model=kdu_model,
+ ver=version_str
+ )
+ return command
- except asyncio.CancelledError:
- raise
- except K8sException:
- raise
- except Exception as e:
- msg = "Exception executing command: {} -> {}".format(command, e)
- self.log.error(msg)
- if raise_exception_on_error:
- raise K8sException(e) from e
- else:
- return "", -1
+ def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ return "{} rollback {} {} --wait".format(
+ self._helm_command, kdu_instance, revision
+ )
- def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
- # self.log.debug('Checking if file {} exists...'.format(filename))
- if os.path.exists(filename):
- return True
- else:
- msg = "File {} does not exist".format(filename)
- if exception_if_not_exists:
- # self.log.error(msg)
- raise K8sException(msg)
+ def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ return "{} delete --purge {}".format(self._helm_command, kdu_instance)
--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: alfonso.tiernosepulveda@telefonica.com
+##
+
+import asynctest
+import logging
+
+from asynctest.mock import Mock
+from osm_common.dbmemory import DbMemory
+from osm_common.fslocal import FsLocal
+from n2vc.k8s_helm3_conn import K8sHelm3Connector
+
+__author__ = "Isabel Lloret <illoret@indra.es>"
+
+
+class TestK8sHelm3Conn(asynctest.TestCase):
+ logging.basicConfig(level=logging.DEBUG)
+ logger = logging.getLogger(__name__)
+ logger.setLevel(logging.DEBUG)
+
+ async def setUp(self):
+ self.db = Mock(DbMemory())
+ self.fs = asynctest.Mock(FsLocal())
+ self.fs.path = "./tmp/"
+ self.namespace = "testk8s"
+ self.cluster_id = "helm3_cluster_id"
+ self.cluster_uuid = "{}:{}".format(self.namespace, self.cluster_id)
+ # pass fake kubectl and helm commands to make sure it does not call actual commands
+ K8sHelm3Connector._check_file_exists = asynctest.Mock(return_value=True)
+ cluster_dir = self.fs.path + self.cluster_id
+ self.env = {
+ "HELM_CACHE_HOME": "{}/.cache/helm".format(cluster_dir),
+ "HELM_CONFIG_HOME": "{}/.config/helm".format(cluster_dir),
+ "HELM_DATA_HOME": "{}/.local/share/helm".format(cluster_dir),
+ "KUBECONFIG": "{}/.kube/config".format(cluster_dir)
+ }
+ self.helm_conn = K8sHelm3Connector(self.fs, self.db,
+ log=self.logger)
+ self.logger.debug("Set up executed")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_init_env(self):
+ k8s_creds = "false_credentials_string"
+ self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
+ self.helm_conn._create_namespace = asynctest.CoroutineMock()
+ self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=[])
+ self.helm_conn.repo_add = asynctest.CoroutineMock()
+
+ k8scluster_uuid, installed = await self.helm_conn.init_env(
+ k8s_creds, namespace=self.namespace, reuse_cluster_uuid=self.cluster_id)
+
+ self.assertEqual(k8scluster_uuid, "{}:{}".format(self.namespace, self.cluster_id),
+ "Check cluster_uuid format: <namespace>.<cluster_id>")
+ self.helm_conn._get_namespaces.assert_called_once_with(self.cluster_id)
+ self.helm_conn._create_namespace.assert_called_once_with(self.cluster_id, self.namespace)
+ self.helm_conn.repo_list.assert_called_once_with(k8scluster_uuid)
+ self.helm_conn.repo_add.assert_called_once_with(
+ k8scluster_uuid, "stable", "https://kubernetes-charts.storage.googleapis.com/")
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.logger.debug(f"cluster_uuid: {k8scluster_uuid}")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_add(self):
+ repo_name = "bitnami"
+ repo_url = "https://charts.bitnami.com/bitnami"
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.assertEqual(self.helm_conn._local_async_exec.call_count, 2,
+ "local_async_exec expected 2 calls, called {}".format(
+ self.helm_conn._local_async_exec.call_count))
+
+ repo_update_command = "/usr/bin/helm3 repo update"
+ repo_add_command = "/usr/bin/helm3 repo add {} {}".format(repo_name, repo_url)
+ calls = self.helm_conn._local_async_exec.call_args_list
+ call0_kargs = calls[0][1]
+ self.assertEqual(call0_kargs.get("command"), repo_update_command,
+ "Invalid repo update command: {}".format(call0_kargs.get("command")))
+ self.assertEqual(call0_kargs.get("env"), self.env,
+ "Invalid env for update command: {}".format(call0_kargs.get("env")))
+ call1_kargs = calls[1][1]
+ self.assertEqual(call1_kargs.get("command"), repo_add_command,
+ "Invalid repo add command: {}".format(call1_kargs.get("command")))
+ self.assertEqual(call1_kargs.get("env"), self.env,
+ "Invalid env for add command: {}".format(call1_kargs.get("env")))
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_list(self):
+
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.repo_list(self.cluster_uuid)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm3 repo list --output yaml"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_remove(self):
+
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ repo_name = "bitnami"
+ await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm3 repo remove {}".format(repo_name)
+ self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_install(self):
+ kdu_model = "stable/openldap:1.2.2"
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None)
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._generate_release_name = Mock(return_value="stable-openldap-0005399828")
+
+ kdu_instance = await self.helm_conn.install(self.cluster_uuid,
+ kdu_model,
+ atomic=True,
+ namespace=self.namespace,
+ db_dict=db_dict)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._generate_release_name.assert_called_once_with("stable/openldap")
+ self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ show_error_log=False)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm3 install stable-openldap-0005399828 --atomic --output yaml " \
+ "--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_upgrade(self):
+ kdu_model = "stable/openldap:1.2.3"
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ instance_info = {
+ "chart": "openldap-1.2.2",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 1,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.upgrade(self.cluster_uuid,
+ kdu_instance,
+ kdu_model,
+ atomic=True,
+ db_dict=db_dict)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap " \
+ "--namespace testk8s --atomic --output yaml --timeout 300s " \
+ "--version 1.2.3"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_rollback(self):
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ instance_info = {
+ "chart": "openldap-1.2.3",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 2,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.rollback(self.cluster_uuid,
+ kdu_instance=kdu_instance,
+ revision=1,
+ db_dict=db_dict)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm3 rollback stable-openldap-0005399828 1 --namespace=testk8s --wait"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_uninstall(self):
+ kdu_instance = "stable-openldap-0005399828"
+ instance_info = {
+ "chart": "openldap-1.2.2",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 3,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm3 uninstall {} --namespace={}".format(
+ kdu_instance, self.namespace)
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_get_services(self):
+ kdu_instance = "test_services_1"
+ service = {
+ "name": "testservice",
+ "type": "LoadBalancer"
+ }
+ self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._parse_services = Mock(return_value=["testservice"])
+ self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
+
+ services = await self.helm_conn.get_services(self.cluster_uuid, kdu_instance,
+ self.namespace)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._parse_services.assert_called_once()
+ command1 = "/usr/bin/helm3 get manifest {} --namespace=testk8s".format(kdu_instance)
+ command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
+ self.helm_conn._local_async_exec_pipe.assert_called_once_with(command1, command2,
+ env=self.env,
+ raise_exception_on_error=True)
+ self.assertEqual(services, [service], "Invalid service returned from get_service")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_get_service(self):
+ service_name = "service1"
+
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ await self.helm_conn.get_service(self.cluster_uuid, service_name, self.namespace)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/kubectl --kubeconfig=./tmp/helm3_cluster_id/.kube/config " \
+ "--namespace=testk8s get service service1 -o=yaml"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_inspect_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.inspect_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm3 show all openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_help_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.help_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm3 show readme openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_values_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.values_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm3 show values openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_instances_list(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.instances_list(self.cluster_uuid)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm3 list --all-namespaces --output yaml"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_status_kdu(self):
+ kdu_instance = "stable-openldap-0005399828"
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn._status_kdu(self.cluster_id, kdu_instance,
+ self.namespace, return_text=True)
+ command = "/usr/bin/helm3 status {} --namespace={} --output yaml".format(
+ kdu_instance, self.namespace
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True,
+ show_error_log=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_store_status(self):
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ status = {
+ "info": {
+ "description": "Install complete",
+ "status": {
+ "code": "1",
+ "notes": "The openldap helm chart has been installed"
+ }
+ }
+ }
+ self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status)
+ self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(return_value=status)
+
+ await self.helm_conn._store_status(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0)
+ self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ return_text=False)
+ self.helm_conn.write_app_status_to_db.assert_called_once_with(db_dict=db_dict,
+ status="Install complete",
+ detailed_status=str(status),
+ operation="install")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_reset_uninstall_false(self):
+ self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
+
+ await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
+ ignore_non_exist=True)
+ self.helm_conn._uninstall_sw.assert_not_called()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_reset_uninstall(self):
+ kdu_instance = 'stable-openldap-0021099429'
+ instances = [
+ {
+ 'app_version': '2.4.48',
+ 'chart': 'openldap-1.2.3',
+ 'name': kdu_instance,
+ 'namespace': self.namespace,
+ 'revision': '1',
+ 'status': 'deployed',
+ 'updated': '2020-10-30 11:11:20.376744191 +0000 UTC'
+ }
+ ]
+ self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
+ self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances)
+ self.helm_conn.uninstall = asynctest.CoroutineMock()
+
+ await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
+ ignore_non_exist=True)
+ self.helm_conn.instances_list.assert_called_once_with(cluster_uuid=self.cluster_uuid)
+ self.helm_conn.uninstall.assert_called_once_with(cluster_uuid=self.cluster_uuid,
+ kdu_instance=kdu_instance)
+ self.helm_conn._uninstall_sw.assert_called_once_with(self.cluster_id, self.namespace)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_sync_repos_add(self):
+ repo_list = [
+ {
+ "name": "stable",
+ "url": "https://kubernetes-charts.storage.googleapis.com/"
+ }
+ ]
+ self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
+
+ def get_one_result(*args, **kwargs):
+ if args[0] == "k8sclusters":
+ return {
+ "_admin": {
+ "helm_chart_repos" : [
+ "4b5550a9-990d-4d95-8a48-1f4614d6ac9c"
+ ]
+ }
+ }
+ elif args[0] == "k8srepos":
+ return {
+ "_id": "4b5550a9-990d-4d95-8a48-1f4614d6ac9c",
+ "type": "helm-chart",
+ "name": "bitnami",
+ "url": "https://charts.bitnami.com/bitnami"
+ }
+ self.helm_conn.db.get_one = asynctest.Mock()
+ self.helm_conn.db.get_one.side_effect = get_one_result
+
+ self.helm_conn.repo_add = asynctest.CoroutineMock()
+ self.helm_conn.repo_remove = asynctest.CoroutineMock()
+
+ deleted_repo_list, added_repo_dict = await self.helm_conn.synchronize_repos(
+ self.cluster_uuid)
+ self.helm_conn.repo_remove.assert_not_called()
+ self.helm_conn.repo_add.assert_called_once_with(self.cluster_uuid, "bitnami",
+ "https://charts.bitnami.com/bitnami")
+ self.assertEqual(deleted_repo_list, [], "Deleted repo list should be empty")
+ self.assertEqual(added_repo_dict,
+ {"4b5550a9-990d-4d95-8a48-1f4614d6ac9c": "bitnami"},
+ "Repos added should include only one bitnami")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_sync_repos_delete(self):
+ repo_list = [
+ {
+ "name": "stable",
+ "url": "https://kubernetes-charts.storage.googleapis.com/"
+ },
+ {
+ "name": "bitnami",
+ "url": "https://charts.bitnami.com/bitnami"
+ }
+ ]
+ self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
+
+ def get_one_result(*args, **kwargs):
+ if args[0] == "k8sclusters":
+ return {
+ "_admin": {
+ "helm_chart_repos": []
+ }
+ }
+
+ self.helm_conn.db.get_one = asynctest.Mock()
+ self.helm_conn.db.get_one.side_effect = get_one_result
+
+ self.helm_conn.repo_add = asynctest.CoroutineMock()
+ self.helm_conn.repo_remove = asynctest.CoroutineMock()
+
+ deleted_repo_list, added_repo_dict = await self.helm_conn.synchronize_repos(
+ self.cluster_uuid)
+ self.helm_conn.repo_add.assert_not_called()
+ self.helm_conn.repo_remove.assert_called_once_with(self.cluster_uuid, "bitnami")
+ self.assertEqual(deleted_repo_list, ["bitnami"], "Deleted repo list should be bitnami")
+ self.assertEqual(added_repo_dict, {}, "No repos should be added")
--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: alfonso.tiernosepulveda@telefonica.com
+##
+
+import asynctest
+import logging
+
+from asynctest.mock import Mock
+from osm_common.dbmemory import DbMemory
+from osm_common.fslocal import FsLocal
+from n2vc.k8s_helm_conn import K8sHelmConnector
+
+__author__ = "Isabel Lloret <illoret@indra.es>"
+
+
+class TestK8sHelmConn(asynctest.TestCase):
+ logging.basicConfig(level=logging.DEBUG)
+ logger = logging.getLogger(__name__)
+ logger.setLevel(logging.DEBUG)
+
+ async def setUp(self):
+ self.db = Mock(DbMemory())
+ self.fs = asynctest.Mock(FsLocal())
+ self.fs.path = "./tmp/"
+ self.namespace = "testk8s"
+ self.service_account = "osm"
+ self.cluster_id = "helm_cluster_id"
+ self.cluster_uuid = "{}:{}".format(self.namespace, self.cluster_id)
+ # pass fake kubectl and helm commands to make sure it does not call actual commands
+ K8sHelmConnector._check_file_exists = asynctest.Mock(return_value=True)
+ K8sHelmConnector._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ cluster_dir = self.fs.path + self.cluster_id
+ self.kube_config = self.fs.path + self.cluster_id + "/.kube/config"
+ self.helm_home = self.fs.path + self.cluster_id + "/.helm"
+ self.env = {
+ "HELM_HOME": "{}/.helm".format(cluster_dir),
+ "KUBECONFIG": "{}/.kube/config".format(cluster_dir)
+ }
+ self.helm_conn = K8sHelmConnector(self.fs, self.db,
+ log=self.logger)
+ self.logger.debug("Set up executed")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_init_env(self):
+ # TODO
+ pass
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_add(self):
+ repo_name = "bitnami"
+ repo_url = "https://charts.bitnami.com/bitnami"
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.assertEqual(self.helm_conn._local_async_exec.call_count, 2,
+ "local_async_exec expected 2 calls, called {}".format(
+ self.helm_conn._local_async_exec.call_count))
+
+ repo_update_command = "/usr/bin/helm repo update"
+ repo_add_command = "/usr/bin/helm repo add {} {}".format(repo_name, repo_url)
+ calls = self.helm_conn._local_async_exec.call_args_list
+ call0_kargs = calls[0][1]
+ self.assertEqual(call0_kargs.get("command"), repo_update_command,
+ "Invalid repo update command: {}".format(call0_kargs.get("command")))
+ self.assertEqual(call0_kargs.get("env"), self.env,
+ "Invalid env for update command: {}".format(call0_kargs.get("env")))
+ call1_kargs = calls[1][1]
+ self.assertEqual(call1_kargs.get("command"), repo_add_command,
+ "Invalid repo add command: {}".format(call1_kargs.get("command")))
+ self.assertEqual(call1_kargs.get("env"), self.env,
+ "Invalid env for add command: {}".format(call1_kargs.get("env")))
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_list(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.repo_list(self.cluster_uuid)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm repo list --output yaml"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_repo_remove(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ repo_name = "bitnami"
+ await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm repo remove {}".format(repo_name)
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command, env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_install(self):
+ kdu_model = "stable/openldap:1.2.2"
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None)
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._generate_release_name = Mock(return_value="stable-openldap-0005399828")
+
+ kdu_instance = await self.helm_conn.install(self.cluster_uuid,
+ kdu_model,
+ atomic=True,
+ namespace=self.namespace,
+ db_dict=db_dict)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._generate_release_name.assert_called_once_with("stable/openldap")
+ self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ show_error_log=False)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm install --atomic --output yaml --timeout 300 " \
+ "--name=stable-openldap-0005399828 --namespace testk8s stable/openldap " \
+ "--version 1.2.2"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_upgrade(self):
+ kdu_model = "stable/openldap:1.2.3"
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ instance_info = {
+ "chart": "openldap-1.2.2",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 1,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.upgrade(self.cluster_uuid,
+ kdu_instance,
+ kdu_model,
+ atomic=True,
+ db_dict=db_dict)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm upgrade --atomic --output yaml --timeout 300 " \
+ "stable-openldap-0005399828 stable/openldap --version 1.2.3"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_rollback(self):
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ instance_info = {
+ "chart": "openldap-1.2.3",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 2,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.rollback(self.cluster_uuid,
+ kdu_instance=kdu_instance,
+ revision=1,
+ db_dict=db_dict)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=True,
+ check_every=0)
+ command = "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_uninstall(self):
+ kdu_instance = "stable-openldap-0005399828"
+ instance_info = {
+ "chart": "openldap-1.2.2",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 3,
+ "status": "DEPLOYED"
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+
+ await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm delete --purge {}".format(kdu_instance)
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_get_services(self):
+ kdu_instance = "test_services_1"
+ service = {
+ "name": "testservice",
+ "type": "LoadBalancer"
+ }
+ self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._parse_services = Mock(return_value=["testservice"])
+ self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
+
+ services = await self.helm_conn.get_services(self.cluster_uuid, kdu_instance,
+ self.namespace)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn._parse_services.assert_called_once()
+ command1 = "/usr/bin/helm get manifest {} ".format(kdu_instance)
+ command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
+ self.helm_conn._local_async_exec_pipe.assert_called_once_with(command1, command2,
+ env=self.env,
+ raise_exception_on_error=True)
+ self.assertEqual(services, [service], "Invalid service returned from get_service")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_get_service(self):
+ service_name = "service1"
+
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ await self.helm_conn.get_service(self.cluster_uuid, service_name, self.namespace)
+
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/kubectl --kubeconfig=./tmp/helm_cluster_id/.kube/config " \
+ "--namespace=testk8s get service service1 -o=yaml"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_inspect_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.inspect_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm inspect openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_help_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.help_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm inspect readme openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_values_kdu(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ kdu_model = "stable/openldap:1.2.4"
+ repo_url = "https://kubernetes-charts.storage.googleapis.com/"
+ await self.helm_conn.values_kdu(kdu_model, repo_url)
+
+ command = "/usr/bin/helm inspect values openldap --repo " \
+ "https://kubernetes-charts.storage.googleapis.com/ " \
+ "--version 1.2.4"
+ self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_instances_list(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn.instances_list(self.cluster_uuid)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ command = "/usr/bin/helm list --output yaml"
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_status_kdu(self):
+ kdu_instance = "stable-openldap-0005399828"
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn._status_kdu(self.cluster_id, kdu_instance,
+ self.namespace, return_text=True)
+ command = "/usr/bin/helm status {} --output yaml".format(kdu_instance)
+ self.helm_conn._local_async_exec.assert_called_once_with(command=command,
+ env=self.env,
+ raise_exception_on_error=True,
+ show_error_log=False)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_store_status(self):
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ status = {
+ "info": {
+ "description": "Install complete",
+ "status": {
+ "code": "1",
+ "notes": "The openldap helm chart has been installed"
+ }
+ }
+ }
+ self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status)
+ self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(return_value=status)
+
+ await self.helm_conn._store_status(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0)
+ self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ return_text=False)
+ self.helm_conn.write_app_status_to_db.assert_called_once_with(db_dict=db_dict,
+ status="Install complete",
+ detailed_status=str(status),
+ operation="install")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_reset_uninstall_false(self):
+ self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
+
+ await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
+ ignore_non_exist=True)
+ self.helm_conn._uninstall_sw.assert_not_called()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_reset_uninstall(self):
+ kdu_instance = 'stable-openldap-0021099429'
+ instances = [
+ {
+ 'app_version': '2.4.48',
+ 'chart': 'openldap-1.2.3',
+ 'name': kdu_instance,
+ 'namespace': self.namespace,
+ 'revision': '1',
+ 'status': 'deployed',
+ 'updated': '2020-10-30 11:11:20.376744191 +0000 UTC'
+ }
+ ]
+ self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
+ self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances)
+ self.helm_conn.uninstall = asynctest.CoroutineMock()
+
+ await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True)
+ self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
+ ignore_non_exist=True)
+ self.helm_conn.instances_list.assert_called_once_with(cluster_uuid=self.cluster_uuid)
+ self.helm_conn.uninstall.assert_called_once_with(cluster_uuid=self.cluster_uuid,
+ kdu_instance=kdu_instance)
+ self.helm_conn._uninstall_sw.assert_called_once_with(self.cluster_id, self.namespace)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_uninstall_sw_namespace(self):
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+ await self.helm_conn._uninstall_sw(self.cluster_id, self.namespace)
+ calls = self.helm_conn._local_async_exec.call_args_list
+ self.assertEqual(len(calls), 3, "To uninstall should have executed three commands")
+ call0_kargs = calls[0][1]
+ command_0 = "/usr/bin/helm --kubeconfig={} --home={} reset".format(self.kube_config,
+ self.helm_home)
+ self.assertEqual(call0_kargs,
+ {"command": command_0,
+ "raise_exception_on_error": True,
+ "env": self.env}, "Invalid args for first call to local_exec")
+ call1_kargs = calls[1][1]
+ command_1 = "/usr/bin/kubectl --kubeconfig={} delete " \
+ "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".\
+ format(self.kube_config)
+ self.assertEqual(call1_kargs,
+ {"command": command_1,
+ "raise_exception_on_error": False,
+ "env": self.env}, "Invalid args for second call to local_exec")
+ call2_kargs = calls[2][1]
+ command_2 = "/usr/bin/kubectl --kubeconfig={} --namespace kube-system delete " \
+ "serviceaccount/{}".\
+ format(self.kube_config, self.service_account)
+ self.assertEqual(call2_kargs,
+ {"command": command_2,
+ "raise_exception_on_error": False,
+ "env": self.env}, "Invalid args for third call to local_exec")