| ## |
| # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of OSM |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| import asyncio |
| from typing import Union |
| from shlex import quote |
| import os |
| import yaml |
| |
| from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector |
| from n2vc.exceptions import K8sException |
| |
| |
| class K8sHelmConnector(K8sHelmBaseConnector): |
| |
| """ |
| #################################################################################### |
| ################################### P U B L I C #################################### |
| #################################################################################### |
| """ |
| |
| def __init__( |
| self, |
| fs: object, |
| db: object, |
| kubectl_command: str = "/usr/bin/kubectl", |
| helm_command: str = "/usr/bin/helm", |
| log: object = None, |
| on_update_db=None, |
| ): |
| """ |
| Initializes helm connector for helm v2 |
| |
| :param fs: file system for kubernetes and helm configuration |
| :param db: database object to write current operation status |
| :param kubectl_command: path to kubectl executable |
| :param helm_command: path to helm executable |
| :param log: logger |
| :param on_update_db: callback called when k8s connector updates database |
| """ |
| |
| # parent class |
| K8sHelmBaseConnector.__init__( |
| self, |
| db=db, |
| log=log, |
| fs=fs, |
| kubectl_command=kubectl_command, |
| helm_command=helm_command, |
| on_update_db=on_update_db, |
| ) |
| |
| self.log.info("Initializing K8S Helm2 connector") |
| |
| # initialize helm client-only |
| self.log.debug("Initializing helm client-only...") |
| command = "{} init --client-only {} ".format( |
| self._helm_command, |
| "--stable-repo-url {}".format(quote(self._stable_repo_url)) |
| if self._stable_repo_url |
| else "--skip-repos", |
| ) |
| try: |
| asyncio.create_task( |
| self._local_async_exec(command=command, raise_exception_on_error=False) |
| ) |
| except Exception as e: |
| self.warning( |
| msg="helm init failed (it was already initialized): {}".format(e) |
| ) |
| |
| self.log.info("K8S Helm2 connector initialized") |
| |
| async def install( |
| self, |
| cluster_uuid: str, |
| kdu_model: str, |
| kdu_instance: str, |
| atomic: bool = True, |
| timeout: float = 300, |
| params: dict = None, |
| db_dict: dict = None, |
| kdu_name: str = None, |
| namespace: str = None, |
| **kwargs, |
| ): |
| """ |
| Deploys of a new KDU instance. It would implicitly rely on the `install` call |
| to deploy the Chart/Bundle properly parametrized (in practice, this call would |
| happen before any _initial-config-primitive_of the VNF is called). |
| |
| :param cluster_uuid: UUID of a K8s cluster known by OSM |
| :param kdu_model: chart/reference (string), which can be either |
| of these options: |
| - a name of chart available via the repos known by OSM |
| (e.g. stable/openldap, stable/openldap:1.2.4) |
| - a path to a packaged chart (e.g. mychart.tgz) |
| - a path to an unpacked chart directory or a URL (e.g. mychart) |
| :param kdu_instance: Kdu instance name |
| :param atomic: If set, installation process purges chart/bundle on fail, also |
| will wait until all the K8s objects are active |
| :param timeout: Time in seconds to wait for the install of the chart/bundle |
| (defaults to Helm default timeout: 300s) |
| :param params: dictionary of key-value pairs for instantiation parameters |
| (overriding default values) |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with {collection: <str>, filter: {}, |
| path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.K8S.3"} |
| :param kdu_name: Name of the KDU instance to be installed |
| :param namespace: K8s namespace to use for the KDU instance |
| :param kwargs: Additional parameters (None yet) |
| :return: True if successful |
| """ |
| self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) |
| |
| # sync local dir |
| self.fs.sync(from_path=cluster_uuid) |
| |
| # init env, paths |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_uuid, create_if_not_exist=True |
| ) |
| |
| await self._install_impl( |
| cluster_uuid, |
| kdu_model, |
| paths, |
| env, |
| kdu_instance, |
| atomic=atomic, |
| timeout=timeout, |
| params=params, |
| db_dict=db_dict, |
| kdu_name=kdu_name, |
| namespace=namespace, |
| ) |
| |
| # sync fs |
| self.fs.reverse_sync(from_path=cluster_uuid) |
| |
| self.log.debug("Returning kdu_instance {}".format(kdu_instance)) |
| return True |
| |
| async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
| self.log.debug( |
| "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) |
| ) |
| |
| return await self._exec_inspect_command( |
| inspect_command="", kdu_model=kdu_model, repo_url=repo_url |
| ) |
| |
| """ |
| #################################################################################### |
| ################################### P R I V A T E ################################## |
| #################################################################################### |
| """ |
| |
| def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True): |
| """ |
| Creates and returns base cluster and kube dirs and returns them. |
| Also created helm3 dirs according to new directory specification, paths are |
| returned and also environment variables that must be provided to execute commands |
| |
| Helm 2 directory specification uses helm_home dir: |
| |
| The variables assigned for this paths are: |
| - Helm hone: $HELM_HOME |
| - helm kubeconfig: $KUBECONFIG |
| |
| :param cluster_name: cluster_name |
| :return: Dictionary with config_paths and dictionary with helm environment variables |
| """ |
| base = self.fs.path |
| if base.endswith("/") or base.endswith("\\"): |
| base = base[:-1] |
| |
| # base dir for cluster |
| cluster_dir = base + "/" + cluster_name |
| |
| # kube dir |
| kube_dir = cluster_dir + "/" + ".kube" |
| if create_if_not_exist and not os.path.exists(kube_dir): |
| self.log.debug("Creating dir {}".format(kube_dir)) |
| os.makedirs(kube_dir) |
| |
| # helm home dir |
| helm_dir = cluster_dir + "/" + ".helm" |
| if create_if_not_exist and not os.path.exists(helm_dir): |
| self.log.debug("Creating dir {}".format(helm_dir)) |
| os.makedirs(helm_dir) |
| |
| config_filename = kube_dir + "/config" |
| |
| # 2 - Prepare dictionary with paths |
| paths = { |
| "kube_dir": kube_dir, |
| "kube_config": config_filename, |
| "cluster_dir": cluster_dir, |
| "helm_dir": helm_dir, |
| } |
| |
| for file_name, file in paths.items(): |
| if "dir" in file_name and not os.path.exists(file): |
| err_msg = "{} dir does not exist".format(file) |
| self.log.error(err_msg) |
| raise K8sException(err_msg) |
| |
| # 3 - Prepare environment variables |
| env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename} |
| |
| return paths, env |
| |
| async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): |
| # init config, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| command1 = "env KUBECONFIG={} {} get manifest {} ".format( |
| kubeconfig, self._helm_command, quote(kdu_instance) |
| ) |
| command2 = "{} get --namespace={} -f -".format( |
| self.kubectl_command, quote(namespace) |
| ) |
| output, _rc = await self._local_async_exec_pipe( |
| command1, command2, env=env, raise_exception_on_error=True |
| ) |
| services = self._parse_services(output) |
| |
| return services |
| |
| async def _cluster_init( |
| self, cluster_id: str, namespace: str, paths: dict, env: dict |
| ): |
| """ |
| Implements the helm version dependent cluster initialization: |
| For helm2 it initialized tiller environment if needed |
| """ |
| |
| # check if tiller pod is up in cluster |
| command = "{} --kubeconfig={} --namespace={} get deployments".format( |
| self.kubectl_command, paths["kube_config"], quote(namespace) |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| |
| output_table = self._output_to_table(output=output) |
| |
| # find 'tiller' pod in all pods |
| already_initialized = False |
| try: |
| for row in output_table: |
| if row[0].startswith("tiller-deploy"): |
| already_initialized = True |
| break |
| except Exception: |
| pass |
| |
| # helm init |
| n2vc_installed_sw = False |
| if not already_initialized: |
| self.log.info( |
| "Initializing helm in client and server: {}".format(cluster_id) |
| ) |
| command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format( |
| self.kubectl_command, paths["kube_config"], quote(self.service_account) |
| ) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| command = ( |
| "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule " |
| "--clusterrole=cluster-admin --serviceaccount=kube-system:{}" |
| ).format( |
| self.kubectl_command, paths["kube_config"], quote(self.service_account) |
| ) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| command = ( |
| "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} " |
| " {} init" |
| ).format( |
| self._helm_command, |
| paths["kube_config"], |
| quote(namespace), |
| quote(paths["helm_dir"]), |
| quote(self.service_account), |
| "--stable-repo-url {}".format(quote(self._stable_repo_url)) |
| if self._stable_repo_url |
| else "--skip-repos", |
| ) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| n2vc_installed_sw = True |
| else: |
| # check client helm installation |
| check_file = paths["helm_dir"] + "/repository/repositories.yaml" |
| if not self._check_file_exists( |
| filename=check_file, exception_if_not_exists=False |
| ): |
| self.log.info("Initializing helm in client: {}".format(cluster_id)) |
| command = ( |
| "{} --kubeconfig={} --tiller-namespace={} " |
| "--home={} init --client-only {} " |
| ).format( |
| self._helm_command, |
| paths["kube_config"], |
| quote(namespace), |
| quote(paths["helm_dir"]), |
| "--stable-repo-url {}".format(quote(self._stable_repo_url)) |
| if self._stable_repo_url |
| else "--skip-repos", |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| else: |
| self.log.info("Helm client already initialized") |
| |
| repo_list = await self.repo_list(cluster_id) |
| for repo in repo_list: |
| if repo["name"] == "stable" and repo["url"] != self._stable_repo_url: |
| self.log.debug("Add new stable repo url: {}") |
| await self.repo_remove(cluster_id, "stable") |
| if self._stable_repo_url: |
| await self.repo_add(cluster_id, "stable", self._stable_repo_url) |
| break |
| |
| return n2vc_installed_sw |
| |
| async def _uninstall_sw(self, cluster_id: str, namespace: str): |
| # uninstall Tiller if necessary |
| |
| self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) |
| |
| # init paths, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| if not namespace: |
| # find namespace for tiller pod |
| command = "{} --kubeconfig={} get deployments --all-namespaces".format( |
| self.kubectl_command, quote(paths["kube_config"]) |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| output_table = self._output_to_table(output=output) |
| namespace = None |
| for r in output_table: |
| try: |
| if "tiller-deploy" in r[1]: |
| namespace = r[0] |
| break |
| except Exception: |
| pass |
| else: |
| msg = "Tiller deployment not found in cluster {}".format(cluster_id) |
| self.log.error(msg) |
| |
| self.log.debug("namespace for tiller: {}".format(namespace)) |
| |
| if namespace: |
| # uninstall tiller from cluster |
| self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) |
| command = "{} --kubeconfig={} --home={} reset".format( |
| self._helm_command, |
| quote(paths["kube_config"]), |
| quote(paths["helm_dir"]), |
| ) |
| self.log.debug("resetting: {}".format(command)) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| # Delete clusterrolebinding and serviceaccount. |
| # Ignore if errors for backward compatibility |
| command = ( |
| "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s." |
| "io/osm-tiller-cluster-rule" |
| ).format(self.kubectl_command, quote(paths["kube_config"])) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| command = ( |
| "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format( |
| self.kubectl_command, |
| quote(paths["kube_config"]), |
| quote(namespace), |
| quote(self.service_account), |
| ) |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| else: |
| self.log.debug("namespace not found") |
| |
| async def _instances_list(self, cluster_id): |
| # init paths, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| command = "{} list --output yaml".format(self._helm_command) |
| |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| |
| if output and len(output) > 0: |
| # parse yaml and update keys to lower case to unify with helm3 |
| instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") |
| new_instances = [] |
| for instance in instances: |
| new_instance = dict((k.lower(), v) for k, v in instance.items()) |
| new_instances.append(new_instance) |
| return new_instances |
| else: |
| return [] |
| |
| def _get_inspect_command( |
| self, show_command: str, kdu_model: str, repo_str: str, version: str |
| ): |
| inspect_command = "{} inspect {} {}{} {}".format( |
| self._helm_command, show_command, quote(kdu_model), repo_str, version |
| ) |
| return inspect_command |
| |
| def _get_get_command( |
| self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str |
| ): |
| get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format( |
| kubeconfig, self._helm_command, get_command, quote(kdu_instance) |
| ) |
| return get_command |
| |
| async def _status_kdu( |
| self, |
| cluster_id: str, |
| kdu_instance: str, |
| namespace: str = None, |
| yaml_format: bool = False, |
| show_error_log: bool = False, |
| ) -> Union[str, dict]: |
| self.log.debug( |
| "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) |
| ) |
| |
| # init config, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| command = ("env KUBECONFIG={} {} status {} --output yaml").format( |
| paths["kube_config"], self._helm_command, quote(kdu_instance) |
| ) |
| output, rc = await self._local_async_exec( |
| command=command, |
| raise_exception_on_error=True, |
| show_error_log=show_error_log, |
| env=env, |
| ) |
| |
| if yaml_format: |
| return str(output) |
| |
| if rc != 0: |
| return None |
| |
| data = yaml.load(output, Loader=yaml.SafeLoader) |
| |
| # remove field 'notes' |
| try: |
| del data.get("info").get("status")["notes"] |
| except KeyError: |
| pass |
| |
| # parse the manifest to a list of dictionaries |
| if "manifest" in data: |
| manifest_str = data.get("manifest") |
| manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader) |
| |
| data["manifest"] = [] |
| for doc in manifest_docs: |
| data["manifest"].append(doc) |
| |
| # parse field 'resources' |
| try: |
| resources = str(data.get("info").get("status").get("resources")) |
| resource_table = self._output_to_table(resources) |
| data.get("info").get("status")["resources"] = resource_table |
| except Exception: |
| pass |
| |
| # set description to lowercase (unify with helm3) |
| try: |
| data.get("info")["description"] = data.get("info").pop("Description") |
| except KeyError: |
| pass |
| |
| return data |
| |
| def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: |
| repo_ids = [] |
| cluster_filter = {"_admin.helm-chart.id": cluster_uuid} |
| cluster = self.db.get_one("k8sclusters", cluster_filter) |
| if cluster: |
| repo_ids = cluster.get("_admin").get("helm_chart_repos") or [] |
| return repo_ids |
| else: |
| raise K8sException( |
| "k8cluster with helm-id : {} not found".format(cluster_uuid) |
| ) |
| |
| async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool: |
| # init config, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| status = await self._status_kdu( |
| cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False |
| ) |
| |
| # extract info.status.resources-> str |
| # format: |
| # ==> v1/Deployment |
| # NAME READY UP-TO-DATE AVAILABLE AGE |
| # halting-horse-mongodb 0/1 1 0 0s |
| # halting-petit-mongodb 1/1 1 0 0s |
| # blank line |
| resources = K8sHelmBaseConnector._get_deep( |
| status, ("info", "status", "resources") |
| ) |
| |
| # convert to table |
| resources = K8sHelmBaseConnector._output_to_table(resources) |
| |
| num_lines = len(resources) |
| index = 0 |
| ready = True |
| while index < num_lines: |
| try: |
| line1 = resources[index] |
| index += 1 |
| # find '==>' in column 0 |
| if line1[0] == "==>": |
| line2 = resources[index] |
| index += 1 |
| # find READY in column 1 |
| if line2[1] == "READY": |
| # read next lines |
| line3 = resources[index] |
| index += 1 |
| while len(line3) > 1 and index < num_lines: |
| ready_value = line3[1] |
| parts = ready_value.split(sep="/") |
| current = int(parts[0]) |
| total = int(parts[1]) |
| if current < total: |
| self.log.debug("NOT READY:\n {}".format(line3)) |
| ready = False |
| line3 = resources[index] |
| index += 1 |
| |
| except Exception: |
| pass |
| |
| return ready |
| |
| def _get_install_command( |
| self, |
| kdu_model, |
| kdu_instance, |
| namespace, |
| params_str, |
| version, |
| atomic, |
| timeout, |
| kubeconfig, |
| ) -> str: |
| timeout_str = "" |
| if timeout: |
| timeout_str = "--timeout {}".format(timeout) |
| |
| # atomic |
| atomic_str = "" |
| if atomic: |
| atomic_str = "--atomic" |
| # namespace |
| namespace_str = "" |
| if namespace: |
| namespace_str = "--namespace {}".format(quote(namespace)) |
| |
| # version |
| version_str = "" |
| if version: |
| version_str = "--version {}".format(version) |
| |
| command = ( |
| "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml " |
| "{params} {timeout} --name={name} {ns} {model} {ver}".format( |
| kubeconfig=kubeconfig, |
| helm=self._helm_command, |
| atomic=atomic_str, |
| params=params_str, |
| timeout=timeout_str, |
| name=quote(kdu_instance), |
| ns=namespace_str, |
| model=quote(kdu_model), |
| ver=version_str, |
| ) |
| ) |
| return command |
| |
| def _get_upgrade_scale_command( |
| self, |
| kdu_model: str, |
| kdu_instance: str, |
| namespace: str, |
| scale: int, |
| version: str, |
| atomic: bool, |
| replica_str: str, |
| timeout: float, |
| resource_name: str, |
| kubeconfig: str, |
| ) -> str: |
| """Generates the command to scale a Helm Chart release |
| |
| Args: |
| kdu_model (str): Kdu model name, corresponding to the Helm local location or repository |
| kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question |
| namespace (str): Namespace where this KDU instance is deployed |
| scale (int): Scale count |
| version (str): Constraint with specific version of the Chart to use |
| atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. |
| The --wait flag will be set automatically if --atomic is used |
| replica_str (str): The key under resource_name key where the scale count is stored |
| timeout (float): The time, in seconds, to wait |
| resource_name (str): The KDU's resource to scale |
| kubeconfig (str): Kubeconfig file path |
| |
| Returns: |
| str: command to scale a Helm Chart release |
| """ |
| |
| # scale |
| if resource_name: |
| scale_dict = {"{}.{}".format(resource_name, replica_str): scale} |
| else: |
| scale_dict = {replica_str: scale} |
| |
| scale_str = self._params_to_set_option(scale_dict) |
| |
| return self._get_upgrade_command( |
| kdu_model=kdu_model, |
| kdu_instance=kdu_instance, |
| namespace=namespace, |
| params_str=scale_str, |
| version=version, |
| atomic=atomic, |
| timeout=timeout, |
| kubeconfig=kubeconfig, |
| ) |
| |
| def _get_upgrade_command( |
| self, |
| kdu_model, |
| kdu_instance, |
| namespace, |
| params_str, |
| version, |
| atomic, |
| timeout, |
| kubeconfig, |
| force: bool = False, |
| ) -> str: |
| """Generates the command to upgrade a Helm Chart release |
| |
| Args: |
| kdu_model (str): Kdu model name, corresponding to the Helm local location or repository |
| kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question |
| namespace (str): Namespace where this KDU instance is deployed |
| params_str (str): Params used to upgrade the Helm Chart release |
| version (str): Constraint with specific version of the Chart to use |
| atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. |
| The --wait flag will be set automatically if --atomic is used |
| timeout (float): The time, in seconds, to wait |
| kubeconfig (str): Kubeconfig file path |
| force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. |
| Returns: |
| str: command to upgrade a Helm Chart release |
| """ |
| |
| timeout_str = "" |
| if timeout: |
| timeout_str = "--timeout {}".format(timeout) |
| |
| # atomic |
| atomic_str = "" |
| if atomic: |
| atomic_str = "--atomic" |
| |
| # force |
| force_str = "" |
| if force: |
| force_str = "--force " |
| |
| # version |
| version_str = "" |
| if version: |
| version_str = "--version {}".format(quote(version)) |
| |
| # namespace |
| namespace_str = "" |
| if namespace: |
| namespace_str = "--namespace {}".format(quote(namespace)) |
| |
| command = ( |
| "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}" |
| "--reuse-values {name} {model} {ver}" |
| ).format( |
| kubeconfig=kubeconfig, |
| helm=self._helm_command, |
| namespace=namespace_str, |
| atomic=atomic_str, |
| force=force_str, |
| params=params_str, |
| timeout=timeout_str, |
| name=quote(kdu_instance), |
| model=quote(kdu_model), |
| ver=version_str, |
| ) |
| return command |
| |
| def _get_rollback_command( |
| self, kdu_instance, namespace, revision, kubeconfig |
| ) -> str: |
| return "env KUBECONFIG={} {} rollback {} {} --wait".format( |
| kubeconfig, self._helm_command, quote(kdu_instance), revision |
| ) |
| |
| def _get_uninstall_command( |
| self, kdu_instance: str, namespace: str, kubeconfig: str |
| ) -> str: |
| return "env KUBECONFIG={} {} delete --purge {}".format( |
| kubeconfig, self._helm_command, quote(kdu_instance) |
| ) |