From: cubag Date: Wed, 29 Nov 2023 21:07:12 +0000 (+0200) Subject: Revert "Feature 11002: Deprecate helmv2" X-Git-Tag: release-v15.0-start~2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=dfb624e236597b96658da80fe7436b0f92416cc3;p=osm%2FN2VC.git Revert "Feature 11002: Deprecate helmv2" This reverts commit c81293be8ba0656a5bc7994e4333fedf73b527ff. Change-Id: I89c7d1009c4f059ba497a76557f045434a1d2186 Signed-off-by: Gabriel Cuba --- diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py new file mode 100644 index 0000000..17e960f --- /dev/null +++ b/n2vc/k8s_helm_conn.py @@ -0,0 +1,776 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## +import asyncio +from typing import Union +from shlex import quote +import os +import yaml + +from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector +from n2vc.exceptions import K8sException + + +class K8sHelmConnector(K8sHelmBaseConnector): + + """ + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### + """ + + def __init__( + self, + fs: object, + db: object, + kubectl_command: str = "/usr/bin/kubectl", + helm_command: str = "/usr/bin/helm", + log: object = None, + on_update_db=None, + ): + """ + Initializes helm connector for helm v2 + + :param fs: file system for kubernetes and helm configuration + :param db: database object to write current operation status + :param kubectl_command: path to kubectl executable + :param helm_command: path to helm executable + :param log: logger + :param on_update_db: callback called when k8s connector updates database + """ + + # parent class + K8sHelmBaseConnector.__init__( + self, + db=db, + log=log, + fs=fs, + kubectl_command=kubectl_command, + helm_command=helm_command, + on_update_db=on_update_db, + ) + + self.log.info("Initializing K8S Helm2 connector") + + # initialize helm client-only + self.log.debug("Initializing helm client-only...") + command = "{} init --client-only {} ".format( + self._helm_command, + "--stable-repo-url {}".format(quote(self._stable_repo_url)) + if self._stable_repo_url + else "--skip-repos", + ) + try: + asyncio.create_task( + self._local_async_exec(command=command, raise_exception_on_error=False) + ) + except Exception as e: + self.warning( + msg="helm init failed (it was already initialized): {}".format(e) + ) + + self.log.info("K8S Helm2 connector initialized") + + async def install( + self, + cluster_uuid: str, + kdu_model: str, + kdu_instance: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None, + **kwargs, + ): + """ + Deploys of a new KDU instance. It would implicitly rely on the `install` call + to deploy the Chart/Bundle properly parametrized (in practice, this call would + happen before any _initial-config-primitive_of the VNF is called). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_model: chart/reference (string), which can be either + of these options: + - a name of chart available via the repos known by OSM + (e.g. stable/openldap, stable/openldap:1.2.4) + - a path to a packaged chart (e.g. mychart.tgz) + - a path to an unpacked chart directory or a URL (e.g. mychart) + :param kdu_instance: Kdu instance name + :param atomic: If set, installation process purges chart/bundle on fail, also + will wait until all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters + (overriding default values) + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} + :param kdu_name: Name of the KDU instance to be installed + :param namespace: K8s namespace to use for the KDU instance + :param kwargs: Additional parameters (None yet) + :return: True if successful + """ + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + await self._install_impl( + cluster_uuid, + kdu_model, + paths, + env, + kdu_instance, + atomic=atomic, + timeout=timeout, + params=params, + db_dict=db_dict, + kdu_name=kdu_name, + namespace=namespace, + ) + + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + self.log.debug("Returning kdu_instance {}".format(kdu_instance)) + return True + + async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: + self.log.debug( + "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) + ) + + return await self._exec_inspect_command( + inspect_command="", kdu_model=kdu_model, repo_url=repo_url + ) + + """ + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### + """ + + def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True): + """ + Creates and returns base cluster and kube dirs and returns them. + Also created helm3 dirs according to new directory specification, paths are + returned and also environment variables that must be provided to execute commands + + Helm 2 directory specification uses helm_home dir: + + The variables assigned for this paths are: + - Helm hone: $HELM_HOME + - helm kubeconfig: $KUBECONFIG + + :param cluster_name: cluster_name + :return: Dictionary with config_paths and dictionary with helm environment variables + """ + base = self.fs.path + if base.endswith("/") or base.endswith("\\"): + base = base[:-1] + + # base dir for cluster + cluster_dir = base + "/" + cluster_name + + # kube dir + kube_dir = cluster_dir + "/" + ".kube" + if create_if_not_exist and not os.path.exists(kube_dir): + self.log.debug("Creating dir {}".format(kube_dir)) + os.makedirs(kube_dir) + + # helm home dir + helm_dir = cluster_dir + "/" + ".helm" + if create_if_not_exist and not os.path.exists(helm_dir): + self.log.debug("Creating dir {}".format(helm_dir)) + os.makedirs(helm_dir) + + config_filename = kube_dir + "/config" + + # 2 - Prepare dictionary with paths + paths = { + "kube_dir": kube_dir, + "kube_config": config_filename, + "cluster_dir": cluster_dir, + "helm_dir": helm_dir, + } + + for file_name, file in paths.items(): + if "dir" in file_name and not os.path.exists(file): + err_msg = "{} dir does not exist".format(file) + self.log.error(err_msg) + raise K8sException(err_msg) + + # 3 - Prepare environment variables + env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename} + + return paths, env + + async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): + # init config, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + command1 = "env KUBECONFIG={} {} get manifest {} ".format( + kubeconfig, self._helm_command, quote(kdu_instance) + ) + command2 = "{} get --namespace={} -f -".format( + self.kubectl_command, quote(namespace) + ) + output, _rc = await self._local_async_exec_pipe( + command1, command2, env=env, raise_exception_on_error=True + ) + services = self._parse_services(output) + + return services + + async def _cluster_init( + self, cluster_id: str, namespace: str, paths: dict, env: dict + ): + """ + Implements the helm version dependent cluster initialization: + For helm2 it initialized tiller environment if needed + """ + + # check if tiller pod is up in cluster + command = "{} --kubeconfig={} --namespace={} get deployments".format( + self.kubectl_command, paths["kube_config"], quote(namespace) + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + + output_table = self._output_to_table(output=output) + + # find 'tiller' pod in all pods + already_initialized = False + try: + for row in output_table: + if row[0].startswith("tiller-deploy"): + already_initialized = True + break + except Exception: + pass + + # helm init + n2vc_installed_sw = False + if not already_initialized: + self.log.info( + "Initializing helm in client and server: {}".format(cluster_id) + ) + command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format( + self.kubectl_command, paths["kube_config"], quote(self.service_account) + ) + _, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + command = ( + "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule " + "--clusterrole=cluster-admin --serviceaccount=kube-system:{}" + ).format( + self.kubectl_command, paths["kube_config"], quote(self.service_account) + ) + _, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + command = ( + "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} " + " {} init" + ).format( + self._helm_command, + paths["kube_config"], + quote(namespace), + quote(paths["helm_dir"]), + quote(self.service_account), + "--stable-repo-url {}".format(quote(self._stable_repo_url)) + if self._stable_repo_url + else "--skip-repos", + ) + _, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + n2vc_installed_sw = True + else: + # check client helm installation + check_file = paths["helm_dir"] + "/repository/repositories.yaml" + if not self._check_file_exists( + filename=check_file, exception_if_not_exists=False + ): + self.log.info("Initializing helm in client: {}".format(cluster_id)) + command = ( + "{} --kubeconfig={} --tiller-namespace={} " + "--home={} init --client-only {} " + ).format( + self._helm_command, + paths["kube_config"], + quote(namespace), + quote(paths["helm_dir"]), + "--stable-repo-url {}".format(quote(self._stable_repo_url)) + if self._stable_repo_url + else "--skip-repos", + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + else: + self.log.info("Helm client already initialized") + + repo_list = await self.repo_list(cluster_id) + for repo in repo_list: + if repo["name"] == "stable" and repo["url"] != self._stable_repo_url: + self.log.debug("Add new stable repo url: {}") + await self.repo_remove(cluster_id, "stable") + if self._stable_repo_url: + await self.repo_add(cluster_id, "stable", self._stable_repo_url) + break + + return n2vc_installed_sw + + async def _uninstall_sw(self, cluster_id: str, namespace: str): + # uninstall Tiller if necessary + + self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) + + # init paths, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + if not namespace: + # find namespace for tiller pod + command = "{} --kubeconfig={} get deployments --all-namespaces".format( + self.kubectl_command, quote(paths["kube_config"]) + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + output_table = self._output_to_table(output=output) + namespace = None + for r in output_table: + try: + if "tiller-deploy" in r[1]: + namespace = r[0] + break + except Exception: + pass + else: + msg = "Tiller deployment not found in cluster {}".format(cluster_id) + self.log.error(msg) + + self.log.debug("namespace for tiller: {}".format(namespace)) + + if namespace: + # uninstall tiller from cluster + self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) + command = "{} --kubeconfig={} --home={} reset".format( + self._helm_command, + quote(paths["kube_config"]), + quote(paths["helm_dir"]), + ) + self.log.debug("resetting: {}".format(command)) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + # Delete clusterrolebinding and serviceaccount. + # Ignore if errors for backward compatibility + command = ( + "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s." + "io/osm-tiller-cluster-rule" + ).format(self.kubectl_command, quote(paths["kube_config"])) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + command = ( + "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format( + self.kubectl_command, + quote(paths["kube_config"]), + quote(namespace), + quote(self.service_account), + ) + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + else: + self.log.debug("namespace not found") + + async def _instances_list(self, cluster_id): + # init paths, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + command = "{} list --output yaml".format(self._helm_command) + + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + + if output and len(output) > 0: + # parse yaml and update keys to lower case to unify with helm3 + instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") + new_instances = [] + for instance in instances: + new_instance = dict((k.lower(), v) for k, v in instance.items()) + new_instances.append(new_instance) + return new_instances + else: + return [] + + def _get_inspect_command( + self, show_command: str, kdu_model: str, repo_str: str, version: str + ): + inspect_command = "{} inspect {} {}{} {}".format( + self._helm_command, show_command, quote(kdu_model), repo_str, version + ) + return inspect_command + + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format( + kubeconfig, self._helm_command, get_command, quote(kdu_instance) + ) + return get_command + + async def _status_kdu( + self, + cluster_id: str, + kdu_instance: str, + namespace: str = None, + yaml_format: bool = False, + show_error_log: bool = False, + ) -> Union[str, dict]: + self.log.debug( + "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) + ) + + # init config, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + command = ("env KUBECONFIG={} {} status {} --output yaml").format( + paths["kube_config"], self._helm_command, quote(kdu_instance) + ) + output, rc = await self._local_async_exec( + command=command, + raise_exception_on_error=True, + show_error_log=show_error_log, + env=env, + ) + + if yaml_format: + return str(output) + + if rc != 0: + return None + + data = yaml.load(output, Loader=yaml.SafeLoader) + + # remove field 'notes' + try: + del data.get("info").get("status")["notes"] + except KeyError: + pass + + # parse the manifest to a list of dictionaries + if "manifest" in data: + manifest_str = data.get("manifest") + manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader) + + data["manifest"] = [] + for doc in manifest_docs: + data["manifest"].append(doc) + + # parse field 'resources' + try: + resources = str(data.get("info").get("status").get("resources")) + resource_table = self._output_to_table(resources) + data.get("info").get("status")["resources"] = resource_table + except Exception: + pass + + # set description to lowercase (unify with helm3) + try: + data.get("info")["description"] = data.get("info").pop("Description") + except KeyError: + pass + + return data + + def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: + repo_ids = [] + cluster_filter = {"_admin.helm-chart.id": cluster_uuid} + cluster = self.db.get_one("k8sclusters", cluster_filter) + if cluster: + repo_ids = cluster.get("_admin").get("helm_chart_repos") or [] + return repo_ids + else: + raise K8sException( + "k8cluster with helm-id : {} not found".format(cluster_uuid) + ) + + async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool: + # init config, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + status = await self._status_kdu( + cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False + ) + + # extract info.status.resources-> str + # format: + # ==> v1/Deployment + # NAME READY UP-TO-DATE AVAILABLE AGE + # halting-horse-mongodb 0/1 1 0 0s + # halting-petit-mongodb 1/1 1 0 0s + # blank line + resources = K8sHelmBaseConnector._get_deep( + status, ("info", "status", "resources") + ) + + # convert to table + resources = K8sHelmBaseConnector._output_to_table(resources) + + num_lines = len(resources) + index = 0 + ready = True + while index < num_lines: + try: + line1 = resources[index] + index += 1 + # find '==>' in column 0 + if line1[0] == "==>": + line2 = resources[index] + index += 1 + # find READY in column 1 + if line2[1] == "READY": + # read next lines + line3 = resources[index] + index += 1 + while len(line3) > 1 and index < num_lines: + ready_value = line3[1] + parts = ready_value.split(sep="/") + current = int(parts[0]) + total = int(parts[1]) + if current < total: + self.log.debug("NOT READY:\n {}".format(line3)) + ready = False + line3 = resources[index] + index += 1 + + except Exception: + pass + + return ready + + def _get_install_command( + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, + ) -> str: + timeout_str = "" + if timeout: + timeout_str = "--timeout {}".format(timeout) + + # atomic + atomic_str = "" + if atomic: + atomic_str = "--atomic" + # namespace + namespace_str = "" + if namespace: + namespace_str = "--namespace {}".format(quote(namespace)) + + # version + version_str = "" + if version: + version_str = "--version {}".format(version) + + command = ( + "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml " + "{params} {timeout} --name={name} {ns} {model} {ver}".format( + kubeconfig=kubeconfig, + helm=self._helm_command, + atomic=atomic_str, + params=params_str, + timeout=timeout_str, + name=quote(kdu_instance), + ns=namespace_str, + model=quote(kdu_model), + ver=version_str, + ) + ) + return command + + def _get_upgrade_scale_command( + self, + kdu_model: str, + kdu_instance: str, + namespace: str, + scale: int, + version: str, + atomic: bool, + replica_str: str, + timeout: float, + resource_name: str, + kubeconfig: str, + ) -> str: + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release + """ + + # scale + if resource_name: + scale_dict = {"{}.{}".format(resource_name, replica_str): scale} + else: + scale_dict = {replica_str: scale} + + scale_str = self._params_to_set_option(scale_dict) + + return self._get_upgrade_command( + kdu_model=kdu_model, + kdu_instance=kdu_instance, + namespace=namespace, + params_str=scale_str, + version=version, + atomic=atomic, + timeout=timeout, + kubeconfig=kubeconfig, + ) + + def _get_upgrade_command( + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, + force: bool = False, + ) -> str: + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. + Returns: + str: command to upgrade a Helm Chart release + """ + + timeout_str = "" + if timeout: + timeout_str = "--timeout {}".format(timeout) + + # atomic + atomic_str = "" + if atomic: + atomic_str = "--atomic" + + # force + force_str = "" + if force: + force_str = "--force " + + # version + version_str = "" + if version: + version_str = "--version {}".format(quote(version)) + + # namespace + namespace_str = "" + if namespace: + namespace_str = "--namespace {}".format(quote(namespace)) + + command = ( + "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}" + "--reuse-values {name} {model} {ver}" + ).format( + kubeconfig=kubeconfig, + helm=self._helm_command, + namespace=namespace_str, + atomic=atomic_str, + force=force_str, + params=params_str, + timeout=timeout_str, + name=quote(kdu_instance), + model=quote(kdu_model), + ver=version_str, + ) + return command + + def _get_rollback_command( + self, kdu_instance, namespace, revision, kubeconfig + ) -> str: + return "env KUBECONFIG={} {} rollback {} {} --wait".format( + kubeconfig, self._helm_command, quote(kdu_instance), revision + ) + + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: + return "env KUBECONFIG={} {} delete --purge {}".format( + kubeconfig, self._helm_command, quote(kdu_instance) + ) diff --git a/n2vc/tests/unit/test_k8s_helm_conn.py b/n2vc/tests/unit/test_k8s_helm_conn.py new file mode 100644 index 0000000..161471a --- /dev/null +++ b/n2vc/tests/unit/test_k8s_helm_conn.py @@ -0,0 +1,740 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: alfonso.tiernosepulveda@telefonica.com +## + +import asynctest +import logging + +from asynctest.mock import Mock +from osm_common.dbmemory import DbMemory +from osm_common.fslocal import FsLocal +from n2vc.k8s_helm_conn import K8sHelmConnector + +__author__ = "Isabel Lloret " + + +class TestK8sHelmConn(asynctest.TestCase): + logging.basicConfig(level=logging.DEBUG) + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + + async def setUp(self): + self.db = Mock(DbMemory()) + self.fs = asynctest.Mock(FsLocal()) + self.fs.path = "./tmp/" + self.namespace = "testk8s" + self.service_account = "osm" + self.cluster_id = "helm_cluster_id" + self.cluster_uuid = self.cluster_id + # pass fake kubectl and helm commands to make sure it does not call actual commands + K8sHelmConnector._check_file_exists = asynctest.Mock(return_value=True) + K8sHelmConnector._local_async_exec = asynctest.CoroutineMock( + return_value=(0, "") + ) + cluster_dir = self.fs.path + self.cluster_id + self.kube_config = self.fs.path + self.cluster_id + "/.kube/config" + self.helm_home = self.fs.path + self.cluster_id + "/.helm" + self.env = { + "HELM_HOME": "{}/.helm".format(cluster_dir), + "KUBECONFIG": "{}/.kube/config".format(cluster_dir), + } + self.helm_conn = K8sHelmConnector(self.fs, self.db, log=self.logger) + self.logger.debug("Set up executed") + + @asynctest.fail_on(active_handles=True) + async def test_init_env(self): + # TODO + pass + + @asynctest.fail_on(active_handles=True) + async def test_repo_add(self): + repo_name = "bitnami" + repo_url = "https://charts.bitnami.com/bitnami" + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url) + + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + self.assertEqual( + self.helm_conn._local_async_exec.call_count, + 2, + "local_async_exec expected 2 calls, called {}".format( + self.helm_conn._local_async_exec.call_count + ), + ) + + repo_update_command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo update {}" + ).format(repo_name) + repo_add_command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo add {} {}" + ).format(repo_name, repo_url) + calls = self.helm_conn._local_async_exec.call_args_list + call0_kargs = calls[0][1] + self.assertEqual( + call0_kargs.get("command"), + repo_add_command, + "Invalid repo add command: {}".format(call0_kargs.get("command")), + ) + self.assertEqual( + call0_kargs.get("env"), + self.env, + "Invalid env for add command: {}".format(call0_kargs.get("env")), + ) + call1_kargs = calls[1][1] + self.assertEqual( + call1_kargs.get("command"), + repo_update_command, + "Invalid repo update command: {}".format(call1_kargs.get("command")), + ) + self.assertEqual( + call1_kargs.get("env"), + self.env, + "Invalid env for update command: {}".format(call1_kargs.get("env")), + ) + + @asynctest.fail_on(active_handles=True) + async def test_repo_list(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + await self.helm_conn.repo_list(self.cluster_uuid) + + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo list --output yaml" + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + @asynctest.fail_on(active_handles=True) + async def test_repo_remove(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + repo_name = "bitnami" + await self.helm_conn.repo_remove(self.cluster_uuid, repo_name) + + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo remove {}".format( + repo_name + ) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=True + ) + + @asynctest.fail_on(active_handles=True) + async def test_install(self): + kdu_model = "stable/openldap:1.2.2" + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.generate_kdu_instance_name = Mock(return_value=kdu_instance) + + await self.helm_conn.install( + self.cluster_uuid, + kdu_model, + kdu_instance, + atomic=True, + namespace=self.namespace, + db_dict=db_dict, + ) + + self.helm_conn.fs.sync.assert_has_calls( + [ + asynctest.call(from_path=self.cluster_id), + asynctest.call(from_path=self.cluster_id), + ] + ) + self.helm_conn.fs.reverse_sync.assert_has_calls( + [ + asynctest.call(from_path=self.cluster_id), + asynctest.call(from_path=self.cluster_id), + ] + ) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="install", + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm install " + "--atomic --output yaml --timeout 300 " + "--name=stable-openldap-0005399828 --namespace testk8s stable/openldap " + "--version 1.2.2" + ) + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + @asynctest.fail_on(active_handles=True) + async def test_upgrade_force_true(self): + kdu_model = "stable/openldap:1.2.3" + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + instance_info = { + "chart": "openldap-1.2.2", + "name": kdu_instance, + "namespace": self.namespace, + "revision": 1, + "status": "DEPLOYED", + } + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.get_instance_info = asynctest.CoroutineMock( + return_value=instance_info + ) + # TEST-1 (--force true) + await self.helm_conn.upgrade( + self.cluster_uuid, + kdu_instance, + kdu_model, + atomic=True, + db_dict=db_dict, + force=True, + ) + self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_has_calls( + [ + asynctest.call(from_path=self.cluster_id), + asynctest.call(from_path=self.cluster_id), + ] + ) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="upgrade", + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace testk8s " + "--atomic --output yaml --timeout 300 --force --reuse-values stable-openldap-0005399828 stable/openldap " + "--version 1.2.3" + ) + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + # TEST-2 (--force false) + await self.helm_conn.upgrade( + self.cluster_uuid, + kdu_instance, + kdu_model, + atomic=True, + db_dict=db_dict, + ) + self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_has_calls( + [ + asynctest.call(from_path=self.cluster_id), + asynctest.call(from_path=self.cluster_id), + ] + ) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="upgrade", + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace testk8s " + "--atomic --output yaml --timeout 300 --reuse-values stable-openldap-0005399828 stable/openldap " + "--version 1.2.3" + ) + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + @asynctest.fail_on(active_handles=True) + async def test_upgrade_namespace(self): + kdu_model = "stable/openldap:1.2.3" + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + instance_info = { + "chart": "openldap-1.2.2", + "name": kdu_instance, + "namespace": self.namespace, + "revision": 1, + "status": "DEPLOYED", + } + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.get_instance_info = asynctest.CoroutineMock( + return_value=instance_info + ) + + await self.helm_conn.upgrade( + self.cluster_uuid, + kdu_instance, + kdu_model, + atomic=True, + db_dict=db_dict, + namespace="default", + ) + self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_has_calls( + [ + asynctest.call(from_path=self.cluster_id), + asynctest.call(from_path=self.cluster_id), + ] + ) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace="default", + db_dict=db_dict, + operation="upgrade", + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace default " + "--atomic --output yaml --timeout 300 --reuse-values stable-openldap-0005399828 stable/openldap " + "--version 1.2.3" + ) + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + @asynctest.fail_on(active_handles=True) + async def test_scale(self): + kdu_model = "stable/openldap:1.2.3" + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + instance_info = { + "chart": "openldap-1.2.3", + "name": kdu_instance, + "namespace": self.namespace, + "revision": 1, + "status": "DEPLOYED", + } + repo_list = [ + { + "name": "stable", + "url": "https://kubernetes-charts.storage.googleapis.com/", + } + ] + kdu_values = """ + # Default values for openldap. + # This is a YAML-formatted file. + # Declare variables to be passed into your templates. + + replicaCount: 1 + dummy-app: + replicas: 2 + """ + + self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list) + self.helm_conn.values_kdu = asynctest.CoroutineMock(return_value=kdu_values) + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.get_instance_info = asynctest.CoroutineMock( + return_value=instance_info + ) + + # TEST-1 + await self.helm_conn.scale( + kdu_instance, + 2, + "", + kdu_model=kdu_model, + cluster_uuid=self.cluster_uuid, + atomic=True, + db_dict=db_dict, + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config " + "/usr/bin/helm upgrade --namespace testk8s --atomic --output yaml --set replicaCount=2 " + "--timeout 1800 --reuse-values stable-openldap-0005399828 stable/openldap " + "--version 1.2.3" + ) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + # TEST-2 + await self.helm_conn.scale( + kdu_instance, + 3, + "dummy-app", + kdu_model=kdu_model, + cluster_uuid=self.cluster_uuid, + atomic=True, + db_dict=db_dict, + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config " + "/usr/bin/helm upgrade --namespace testk8s --atomic --output yaml --set dummy-app.replicas=3 " + "--timeout 1800 --reuse-values stable-openldap-0005399828 stable/openldap " + "--version 1.2.3" + ) + self.helm_conn._local_async_exec.assert_called_with( + command=command, env=self.env, raise_exception_on_error=False + ) + self.helm_conn.fs.reverse_sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="scale", + ) + + @asynctest.fail_on(active_handles=True) + async def test_rollback(self): + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + instance_info = { + "chart": "openldap-1.2.3", + "name": kdu_instance, + "namespace": self.namespace, + "revision": 2, + "status": "DEPLOYED", + } + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.get_instance_info = asynctest.CoroutineMock( + return_value=instance_info + ) + + await self.helm_conn.rollback( + self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict + ) + self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + self.helm_conn._store_status.assert_called_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="rollback", + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config " + "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait" + ) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=False + ) + + @asynctest.fail_on(active_handles=True) + async def test_uninstall(self): + kdu_instance = "stable-openldap-0005399828" + instance_info = { + "chart": "openldap-1.2.2", + "name": kdu_instance, + "namespace": self.namespace, + "revision": 3, + "status": "DEPLOYED", + } + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + self.helm_conn._store_status = asynctest.CoroutineMock() + self.helm_conn.get_instance_info = asynctest.CoroutineMock( + return_value=instance_info + ) + + await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance) + self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm delete --purge {}".format( + kdu_instance + ) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=True + ) + + @asynctest.fail_on(active_handles=True) + async def test_get_services(self): + kdu_instance = "test_services_1" + service = {"name": "testservice", "type": "LoadBalancer"} + self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock( + return_value=("", 0) + ) + self.helm_conn._parse_services = Mock(return_value=["testservice"]) + self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service) + + services = await self.helm_conn.get_services( + self.cluster_uuid, kdu_instance, self.namespace + ) + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + self.helm_conn._parse_services.assert_called_once() + command1 = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format( + kdu_instance + ) + command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace) + self.helm_conn._local_async_exec_pipe.assert_called_once_with( + command1, command2, env=self.env, raise_exception_on_error=True + ) + self.assertEqual( + services, [service], "Invalid service returned from get_service" + ) + + @asynctest.fail_on(active_handles=True) + async def test_get_service(self): + service_name = "service1" + + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + await self.helm_conn.get_service( + self.cluster_uuid, service_name, self.namespace + ) + + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + command = ( + "/usr/bin/kubectl --kubeconfig=./tmp/helm_cluster_id/.kube/config " + "--namespace=testk8s get service service1 -o=yaml" + ) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=True + ) + + @asynctest.fail_on(active_handles=True) + async def test_inspect_kdu(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + kdu_model = "stable/openldap:1.2.4" + repo_url = "https://kubernetes-charts.storage.googleapis.com/" + await self.helm_conn.inspect_kdu(kdu_model, repo_url) + + command = ( + "/usr/bin/helm inspect openldap --repo " + "https://kubernetes-charts.storage.googleapis.com/ " + "--version 1.2.4" + ) + self.helm_conn._local_async_exec.assert_called_with(command=command) + + @asynctest.fail_on(active_handles=True) + async def test_help_kdu(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + kdu_model = "stable/openldap:1.2.4" + repo_url = "https://kubernetes-charts.storage.googleapis.com/" + await self.helm_conn.help_kdu(kdu_model, repo_url) + + command = ( + "/usr/bin/helm inspect readme openldap --repo " + "https://kubernetes-charts.storage.googleapis.com/ " + "--version 1.2.4" + ) + self.helm_conn._local_async_exec.assert_called_with(command=command) + + @asynctest.fail_on(active_handles=True) + async def test_values_kdu(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + kdu_model = "stable/openldap:1.2.4" + repo_url = "https://kubernetes-charts.storage.googleapis.com/" + await self.helm_conn.values_kdu(kdu_model, repo_url) + + command = ( + "/usr/bin/helm inspect values openldap --repo " + "https://kubernetes-charts.storage.googleapis.com/ " + "--version 1.2.4" + ) + self.helm_conn._local_async_exec.assert_called_with(command=command) + + @asynctest.fail_on(active_handles=True) + async def test_get_values_kdu(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + kdu_instance = "stable-openldap-0005399828" + await self.helm_conn.get_values_kdu( + kdu_instance, self.namespace, self.env["KUBECONFIG"] + ) + + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get values " + "stable-openldap-0005399828 --output yaml" + ) + self.helm_conn._local_async_exec.assert_called_with(command=command) + + @asynctest.fail_on(active_handles=True) + async def test_instances_list(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + await self.helm_conn.instances_list(self.cluster_uuid) + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.reverse_sync.assert_called_once_with( + from_path=self.cluster_id + ) + command = "/usr/bin/helm list --output yaml" + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, env=self.env, raise_exception_on_error=True + ) + + @asynctest.fail_on(active_handles=True) + async def test_status_kdu(self): + kdu_instance = "stable-openldap-0005399828" + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + await self.helm_conn._status_kdu( + self.cluster_id, kdu_instance, self.namespace, yaml_format=True + ) + command = ( + "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm status {} --output yaml" + ).format(kdu_instance) + self.helm_conn._local_async_exec.assert_called_once_with( + command=command, + env=self.env, + raise_exception_on_error=True, + show_error_log=False, + ) + + @asynctest.fail_on(active_handles=True) + async def test_store_status(self): + kdu_instance = "stable-openldap-0005399828" + db_dict = {} + status = { + "info": { + "description": "Install complete", + "status": { + "code": "1", + "notes": "The openldap helm chart has been installed", + }, + } + } + self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status) + self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock( + return_value=status + ) + + await self.helm_conn._store_status( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + db_dict=db_dict, + operation="install", + ) + self.helm_conn._status_kdu.assert_called_once_with( + cluster_id=self.cluster_id, + kdu_instance=kdu_instance, + namespace=self.namespace, + yaml_format=False, + ) + self.helm_conn.write_app_status_to_db.assert_called_once_with( + db_dict=db_dict, + status="Install complete", + detailed_status=str(status), + operation="install", + ) + + @asynctest.fail_on(active_handles=True) + async def test_reset_uninstall_false(self): + self.helm_conn._uninstall_sw = asynctest.CoroutineMock() + + await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False) + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.file_delete.assert_called_once_with( + self.cluster_id, ignore_non_exist=True + ) + self.helm_conn._uninstall_sw.assert_not_called() + + @asynctest.fail_on(active_handles=True) + async def test_reset_uninstall(self): + kdu_instance = "stable-openldap-0021099429" + instances = [ + { + "app_version": "2.4.48", + "chart": "openldap-1.2.3", + "name": kdu_instance, + "namespace": self.namespace, + "revision": "1", + "status": "deployed", + "updated": "2020-10-30 11:11:20.376744191 +0000 UTC", + } + ] + self.helm_conn._get_namespace = Mock(return_value=self.namespace) + self.helm_conn._uninstall_sw = asynctest.CoroutineMock() + self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances) + self.helm_conn.uninstall = asynctest.CoroutineMock() + + await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True) + self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id) + self.helm_conn.fs.file_delete.assert_called_once_with( + self.cluster_id, ignore_non_exist=True + ) + self.helm_conn._get_namespace.assert_called_once_with( + cluster_uuid=self.cluster_uuid + ) + self.helm_conn.instances_list.assert_called_once_with( + cluster_uuid=self.cluster_uuid + ) + self.helm_conn.uninstall.assert_called_once_with( + cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance + ) + self.helm_conn._uninstall_sw.assert_called_once_with( + cluster_id=self.cluster_id, namespace=self.namespace + ) + + @asynctest.fail_on(active_handles=True) + async def test_uninstall_sw_namespace(self): + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) + + await self.helm_conn._uninstall_sw(self.cluster_id, self.namespace) + calls = self.helm_conn._local_async_exec.call_args_list + self.assertEqual( + len(calls), 3, "To uninstall should have executed three commands" + ) + call0_kargs = calls[0][1] + command_0 = "/usr/bin/helm --kubeconfig={} --home={} reset".format( + self.kube_config, self.helm_home + ) + self.assertEqual( + call0_kargs, + {"command": command_0, "raise_exception_on_error": True, "env": self.env}, + "Invalid args for first call to local_exec", + ) + call1_kargs = calls[1][1] + command_1 = ( + "/usr/bin/kubectl --kubeconfig={} delete " + "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".format( + self.kube_config + ) + ) + self.assertEqual( + call1_kargs, + {"command": command_1, "raise_exception_on_error": False, "env": self.env}, + "Invalid args for second call to local_exec", + ) + call2_kargs = calls[2][1] + command_2 = ( + "/usr/bin/kubectl --kubeconfig={} --namespace {} delete " + "serviceaccount/{}".format( + self.kube_config, self.namespace, self.service_account + ) + ) + self.assertEqual( + call2_kargs, + {"command": command_2, "raise_exception_on_error": False, "env": self.env}, + "Invalid args for third call to local_exec", + )