| ## |
| # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of OSM |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| from typing import Union |
| import os |
| import yaml |
| |
| from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector |
| from n2vc.exceptions import K8sException |
| |
| |
| class K8sHelm3Connector(K8sHelmBaseConnector): |
| |
| """ |
| #################################################################################### |
| ################################### P U B L I C #################################### |
| #################################################################################### |
| """ |
| |
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Build the helm v3 connector by delegating to the helm base connector

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # Every argument is handed over unchanged to the parent class
    base_kwargs = {
        "db": db,
        "log": log,
        "fs": fs,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
    }
    K8sHelmBaseConnector.__init__(self, **base_kwargs)

    # NOTE(review): self.log is presumably set by the parent initializer
    self.log.info("K8S Helm3 connector initialized")
| |
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """Install a helm chart

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: chart/reference (string), which can be either
        of these options:
        - a name of chart available via the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
    :param kdu_instance: Kdu instance name
    :param atomic bool: If set, waits until the model is active and resets
        the cluster on failure.
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
    :param db_dict dict: Passed unchanged to the install implementation
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance

    :param kwargs: Additional parameters (None yet)

    :return: True if successful
    :raises K8sException: if the namespace could not be created and still
        does not exist in the cluster
    """

    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_uuid, namespace):
            try:
                await self._create_namespace(cluster_uuid, namespace)
            except Exception as e:
                # A failed creation is only fatal if the namespace is still
                # missing afterwards (it may have been created meanwhile)
                if not await self._namespace_exists(cluster_uuid, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_uuid, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_uuid,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
| |
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "all" inspect command for a chart

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url where the chart is located
    :return: whatever the inspect command execution returns
    """
    message = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    inspected = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspected
| |
| """ |
| #################################################################################### |
| ################################### P R I V A T E ################################## |
| #################################################################################### |
| """ |
| |
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Computes the base cluster and kube dirs and returns them.
    Also computes the helm3 dirs according to the new directory specification;
    paths are returned together with the environment variables that must be
    provided to execute commands

    Helm 3 directory specification uses XDG categories for variable support:
    - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
    - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
    - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

    The variables assigned for these paths are:
    (In the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
    $HELM_PATH_DATA but looking at helm env the variable names are different)
    - Cache: $HELM_CACHE_HOME
    - Config: $HELM_CONFIG_HOME
    - Data: $HELM_DATA_HOME
    - helm kubeconfig: $KUBECONFIG

    :param cluster_name: cluster_name
    :param create_if_not_exist: when True, missing kube and helm dirs are created
    :return: Dictionary with config_paths and dictionary with helm environment variables
    :raises K8sException: if the kube dir or the cluster dir does not exist
    """

    base = self.fs.path
    if base.endswith(("/", "\\")):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name

    kube_dir = cluster_dir + "/" + ".kube"
    helm_path_cache = cluster_dir + "/.cache/helm"
    helm_path_config = cluster_dir + "/.config/helm"
    helm_path_data = cluster_dir + "/.local/share/helm"

    if create_if_not_exist:
        for directory in (kube_dir, helm_path_cache, helm_path_config, helm_path_data):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok avoids failing if the dir appears between the
                # existence check and the creation
                os.makedirs(directory, exist_ok=True)

    config_filename = kube_dir + "/config"

    # Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
    }

    # Prepare environment variables
    env = {
        "HELM_CACHE_HOME": helm_path_cache,
        "HELM_CONFIG_HOME": helm_path_config,
        "HELM_DATA_HOME": helm_path_data,
        "KUBECONFIG": config_filename,
    }

    # Only the entries whose key contains "dir" are checked; the kubeconfig
    # file itself is not required to exist here
    for path_key, path_value in paths.items():
        if "dir" in path_key and not os.path.exists(path_value):
            err_msg = "{} dir does not exist".format(path_value)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    return paths, env
| |
async def _namespace_exists(self, cluster_id, namespace) -> bool:
    """
    Tell whether a namespace is among the namespaces listed for the cluster

    :param cluster_id: cluster whose namespaces are listed
    :param namespace: namespace name to look for
    :return: True if found; False if not found or if the listing is empty
    """
    message = "checking if namespace {} exists cluster_id {}".format(
        namespace, cluster_id
    )
    self.log.debug(message)

    known = await self._get_namespaces(cluster_id)
    if not known:
        return False
    return namespace in known
| |
async def _get_namespaces(self, cluster_id: str):
    """
    List the namespace names reported by kubectl for the cluster

    :param cluster_id: cluster whose kubeconfig is used for the query
    :return: list with the metadata name of every item in the kubectl output
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubeconfig_path = paths["kube_config"]
    kubectl_cmd = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, kubeconfig_path
    )
    output, _rc = await self._local_async_exec(
        command=kubectl_cmd, raise_exception_on_error=True, env=env
    )

    # The command output is a yaml document with an "items" list
    listing = yaml.load(output, Loader=yaml.SafeLoader)
    namespaces = [entry["metadata"]["name"] for entry in listing["items"]]
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
| |
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in the cluster using kubectl

    :param cluster_id: cluster whose kubeconfig is used
    :param namespace: name of the namespace to create
    :return: return code given back by the command execution
    """

    # Placeholders were swapped in the original message (the namespace was
    # logged as the cluster id and vice versa)
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    return _rc
| |
async def _get_services(
    self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Pipe the helm manifest of a kdu instance into kubectl get and parse the result

    :param cluster_id: cluster used to initialize paths and environment
    :param kdu_instance: helm release whose manifest is read
    :param namespace: namespace passed to both helm and kubectl
    :param kubeconfig: kubeconfig path set in the helm command
    :return: result of parsing the piped command output as services
    """

    # Only the environment is needed here; the paths are not used
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_cmd = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
        kubeconfig, self._helm_command, kdu_instance, namespace
    )
    kubectl_cmd = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)

    output, _rc = await self._local_async_exec_pipe(
        manifest_cmd, kubectl_cmd, env=env, raise_exception_on_error=True
    )
    return self._parse_services(output)
| |
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization:
    For helm3 it creates the namespace if it is not created, and adds the
    "stable" repo when it is not registered and a stable repo url is configured

    :param cluster_id: cluster to initialize
    :param namespace: namespace to ensure (skipped for "kube-system")
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: always False, as no software needs to be uninstalled
    """
    needs_namespace = namespace != "kube-system"
    if needs_namespace:
        existing = await self._get_namespaces(cluster_id)
        if namespace not in existing:
            await self._create_namespace(cluster_id, namespace)

    registered = await self.repo_list(cluster_id)
    repo_names = [repo["name"] for repo in registered]
    if "stable" not in repo_names and self._stable_repo_url:
        await self.repo_add(cluster_id, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
| |
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Nothing to do to uninstall sw in this connector

    :param cluster_id: unused
    :param namespace: unused
    :return: None
    """
    return None
| |
async def _instances_list(self, cluster_id: str):
    """
    List the helm releases of all namespaces in the cluster

    :param cluster_id: cluster used to initialize paths and environment
    :return: parsed yaml output of the helm list command, or an empty list
        when the command produced no output
    """

    # Only the environment is needed here; the paths are not used
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} list --all-namespaces --output yaml".format(self._helm_command)
    output, _rc = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=True, env=env
    )

    if not output:
        return []

    self.log.debug("instances list output: {}".format(output))
    return yaml.load(output, Loader=yaml.SafeLoader)
| |
def _get_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Compose the "helm show" command line

    :param inspect_command: show subcommand placed right after "show"
    :param kdu_model: chart reference
    :param repo_str: repo option text, appended to the model without a separator
    :param version: version option text, placed last
    :return: command line string
    """
    show_args = (self._helm_command, inspect_command, kdu_model, repo_str, version)
    return "{} show {} {}{} {}".format(*show_args)
| |
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Compose the "helm get" command line with yaml output

    :param get_command: get subcommand placed right after "get"
    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    get_args = (kubeconfig, self._helm_command, get_command, kdu_instance, namespace)
    return "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
        *get_args
    )
| |
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Run "helm status" for a kdu instance and return its output

    :param cluster_id: cluster used to initialize paths and environment
    :param kdu_instance: helm release name
    :param namespace: namespace of the release; "kube-system" when not given
    :param yaml_format: when True the raw command output is returned as a string
    :param show_error_log: passed unchanged to the command execution
    :return: the raw output string if yaml_format is set; None if the return
        code is not 0 or the output holds no yaml document; otherwise the
        parsed output without the 'notes' field of 'info' and with 'manifest'
        parsed to a list of documents
    """

    self.log.debug(
        "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
    )

    if not namespace:
        namespace = "kube-system"

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
        paths["kube_config"], self._helm_command, kdu_instance, namespace
    )

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env,
    )

    if yaml_format:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)
    if not data:
        # Empty output: nothing to report, same result as a failed command
        return None

    # remove field 'notes'; 'info' may be missing or empty, which previously
    # raised an uncaught TypeError
    info = data.get("info")
    if isinstance(info, dict):
        info.pop("notes", None)

    # parse the manifest to a list of dictionaries
    if "manifest" in data:
        manifest_str = data.get("manifest")
        data["manifest"] = list(
            yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
        )

    return data
| |
def _get_install_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    params_str: str,
    version: str,
    atomic: bool,
    timeout: float,
    kubeconfig: str,
) -> str:
    """
    Compose the "helm install" command line with yaml output

    Every optional flag is rendered as an empty string when its value is
    falsy, so the resulting command may contain consecutive spaces.

    :param kdu_model: chart reference
    :param kdu_instance: helm release name
    :param namespace: rendered as "--namespace <namespace>"
    :param params_str: already formatted parameters text, inserted as is
    :param version: rendered as "--version <version>"
    :param atomic: adds "--atomic"
    :param timeout: rendered as "--timeout <timeout>s"
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    fields = {
        "kubeconfig": kubeconfig,
        "helm": self._helm_command,
        "name": kdu_instance,
        "atomic": "--atomic" if atomic else "",
        "params": params_str,
        "timeout": "--timeout {}s".format(timeout) if timeout else "",
        "ns": "--namespace {}".format(namespace) if namespace else "",
        "model": kdu_model,
        "ver": "--version {}".format(version) if version else "",
    }
    template = (
        "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
        "{params} {timeout} {ns} {model} {ver}"
    )
    return template.format(**fields)
| |
def _get_upgrade_scale_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    scale: int,
    version: str,
    atomic: bool,
    replica_str: str,
    timeout: float,
    resource_name: str,
    kubeconfig: str,
) -> str:
    """
    Compose the "helm upgrade" command line that sets a replica count

    Every optional flag is rendered as an empty string when its value is
    falsy, so the resulting command may contain consecutive spaces.

    :param kdu_model: chart reference
    :param kdu_instance: helm release name
    :param namespace: rendered as "--namespace <namespace>"
    :param scale: value assigned to the replica key
    :param version: rendered as "--version <version>"
    :param atomic: adds "--atomic"
    :param replica_str: name of the replica key
    :param timeout: rendered as "--timeout <timeout>s"
    :param resource_name: when given, the key is "<resource_name>.<replica_str>"
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    # The replica key is prefixed with the resource name when there is one
    scale_key = (
        "{}.{}".format(resource_name, replica_str) if resource_name else replica_str
    )
    scale_option = self._params_to_set_option({scale_key: scale})

    fields = {
        "kubeconfig": kubeconfig,
        "helm": self._helm_command,
        "name": kdu_instance,
        "model": kdu_model,
        "namespace": "--namespace {}".format(namespace) if namespace else "",
        "atomic": "--atomic" if atomic else "",
        "scale": scale_option,
        "timeout": "--timeout {}s".format(timeout) if timeout else "",
        "ver": "--version {}".format(version) if version else "",
    }
    template = (
        "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
        "{timeout} {ver}"
    )
    return template.format(**fields)
| |
def _get_upgrade_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    params_str: str,
    version: str,
    atomic: bool,
    timeout: float,
    kubeconfig: str,
) -> str:
    """
    Compose the "helm upgrade" command line with yaml output

    Every optional flag is rendered as an empty string when its value is
    falsy, so the resulting command may contain consecutive spaces.

    :param kdu_model: chart reference
    :param kdu_instance: helm release name
    :param namespace: rendered as "--namespace <namespace>"
    :param params_str: already formatted parameters text, inserted as is
    :param version: rendered as "--version <version>"
    :param atomic: adds "--atomic"
    :param timeout: rendered as "--timeout <timeout>s"
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    fields = {
        "kubeconfig": kubeconfig,
        "helm": self._helm_command,
        "name": kdu_instance,
        "model": kdu_model,
        "namespace": "--namespace {}".format(namespace) if namespace else "",
        "atomic": "--atomic" if atomic else "",
        "params": params_str,
        "timeout": "--timeout {}s".format(timeout) if timeout else "",
        "ver": "--version {}".format(version) if version else "",
    }
    template = (
        "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
        "--output yaml {params} {timeout} {ver}"
    )
    return template.format(**fields)
| |
def _get_rollback_command(
    self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
) -> str:
    """
    Compose the "helm rollback" command line (with --wait)

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param revision: revision placed after the release name
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    rollback_args = (kubeconfig, self._helm_command, kdu_instance, revision, namespace)
    template = "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait"
    return template.format(*rollback_args)
| |
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Compose the "helm uninstall" command line

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig path set through the KUBECONFIG variable
    :return: command line string
    """
    uninstall_args = (kubeconfig, self._helm_command, kdu_instance, namespace)
    template = "env KUBECONFIG={} {} uninstall {} --namespace={}"
    return template.format(*uninstall_args)
| |
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Read the helm chart repo ids stored for a cluster in the database

    :param cluster_uuid: helm v3 id of the cluster, matched against
        "_admin.helm-chart-v3.id" in the "k8sclusters" collection
    :return: value of "_admin.helm_chart_repos", or an empty list if it is empty
    :raises K8sException: if the database query returns no cluster
    """
    query = {"_admin.helm-chart-v3.id": cluster_uuid}
    cluster = self.db.get_one("k8sclusters", query)

    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    return cluster.get("_admin").get("helm_chart_repos") or []