| ## |
| # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of OSM |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| import os |
| import yaml |
| |
| from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector |
| from n2vc.exceptions import K8sException |
| |
| |
| class K8sHelm3Connector(K8sHelmBaseConnector): |
| |
| """ |
| #################################################################################### |
| ################################### P U B L I C #################################### |
| #################################################################################### |
| """ |
| |
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Build the helm v3 flavour of the K8s helm connector.

    Every argument is forwarded unchanged to the K8sHelmBaseConnector
    initializer; this class adds no state of its own here.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: passed through as-is to the base connector
    """
    base_kwargs = {
        "db": db,
        "log": log,
        "fs": fs,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
        "vca_config": vca_config,
    }

    # Explicit base-class call kept (rather than super()) to match the original
    K8sHelmBaseConnector.__init__(self, **base_kwargs)

    self.log.info("K8S Helm3 connector initialized")
| |
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Install a KDU on the given cluster, creating the target namespace first
    when it is missing.

    :param cluster_uuid: identifier resolved through _get_namespace_cluster_id
    :param kdu_model: chart reference handed to _install_impl
    :param kdu_instance: name of the release to create
    :param atomic: forwarded to _install_impl
    :param timeout: forwarded to _install_impl
    :param params: forwarded to _install_impl
    :param db_dict: forwarded to _install_impl
    :param kdu_name: forwarded to _install_impl
    :param namespace: namespace to install into; created if it is set, is not
        "kube-system" and is not reported by _get_namespaces
    :return: True
    """
    _unused_ns, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(f"installing {kdu_model} in cluster {cluster_id}")

    # bring the local copy of the cluster directory up to date
    self.fs.sync(from_path=cluster_id)

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm3 does not create the namespace by itself
    wants_custom_ns = bool(namespace) and namespace != "kube-system"
    if wants_custom_ns and namespace not in await self._get_namespaces(cluster_id):
        await self._create_namespace(cluster_id, namespace)

    install_options = {
        "atomic": atomic,
        "timeout": timeout,
        "params": params,
        "db_dict": db_dict,
        "kdu_name": kdu_name,
        "namespace": namespace,
    }
    await self._install_impl(
        cluster_id, kdu_model, paths, env, kdu_instance, **install_options
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.debug(f"Returning kdu_instance {kdu_instance}")
    return True
| |
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "all" inspection for a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository the chart is taken from
    :return: whatever _exec_inspect_comand returns for inspect_command="all"
    """
    self.log.debug(
        f"inspect kdu_model {kdu_model} from (optional) repo: {repo_url}"
    )

    inspection = await self._exec_inspect_comand(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
| |
| """ |
| #################################################################################### |
| ################################### P R I V A T E ################################## |
| #################################################################################### |
| """ |
| |
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Compute (and optionally create) the per-cluster kube and helm3 directories.

    Returns the relevant paths together with the environment variables that
    must be provided when executing helm/kubectl commands for this cluster.

    Helm 3 directory specification uses XDG categories for variable support:
    - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
    - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
    - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

    The variables assigned for these paths are:
    (In the documentation the variable names are $HELM_PATH_CACHE,
    $HELM_PATH_CONFIG, $HELM_PATH_DATA, but looking at `helm env` the
    variable names are different)
    - Cache: $HELM_CACHE_HOME
    - Config: $HELM_CONFIG_HOME
    - Data: $HELM_DATA_HOME
    - helm kubeconfig: $KUBECONFIG

    :param cluster_name: name of the directory, under self.fs.path, that holds
        this cluster's files
    :param create_if_not_exist: when True, missing directories are created
    :return: tuple (paths, env): dictionary with "kube_dir", "kube_config" and
        "cluster_dir", and dictionary with the helm environment variables
    :raises K8sException: if kube_dir or cluster_dir does not exist afterwards
    """

    base = self.fs.path
    # drop a single trailing path separator so the joins below do not double it
    if base.endswith("/") or base.endswith("\\"):
        base = base[:-1]

    # 1 - Compute directory layout
    cluster_dir = base + "/" + cluster_name
    kube_dir = cluster_dir + "/" + ".kube"
    helm_path_cache = cluster_dir + "/.cache/helm"
    helm_path_config = cluster_dir + "/.config/helm"
    helm_path_data = cluster_dir + "/.local/share/helm"
    config_filename = kube_dir + "/config"

    if create_if_not_exist:
        for directory in (
            kube_dir,
            helm_path_cache,
            helm_path_config,
            helm_path_data,
        ):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok: the directory may be created by a concurrent
                # operation between the check above and this call
                os.makedirs(directory, exist_ok=True)

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
    }

    # 3 - Prepare environment variables
    env = {
        "HELM_CACHE_HOME": helm_path_cache,
        "HELM_CONFIG_HOME": helm_path_config,
        "HELM_DATA_HOME": helm_path_data,
        "KUBECONFIG": config_filename,
    }

    # Only the directory entries are validated; kube_config is a file that may
    # not have been written yet
    for path_name, path in paths.items():
        if "dir" in path_name and not os.path.exists(path):
            err_msg = "{} dir does not exist".format(path)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    return paths, env
| |
async def _get_namespaces(self, cluster_id: str):
    """
    List the namespace names reported by kubectl for a cluster.

    :param cluster_id: cluster whose kubeconfig is used
    :return: list with the metadata.name of every item in the kubectl output
    """
    self.log.debug(f"get namespaces cluster_id {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kube_config = paths["kube_config"]
    command = f"{self.kubectl_command} --kubeconfig={kube_config} get namespaces -o=yaml"
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # safe_load is yaml.load with the SafeLoader
    parsed = yaml.safe_load(output)
    namespaces = []
    for item in parsed["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
| |
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in the cluster by running `kubectl create namespace`.

    :param cluster_id: cluster whose kubeconfig is used
    :param namespace: name of the namespace to create
    :return: return code reported by _local_async_exec
    """
    # The original message had both values swapped, which made the log
    # report the cluster id as the namespace and vice versa
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    return _rc
| |
async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
    """
    Obtain the services of a helm release.

    Builds a `helm get manifest` command and a `kubectl get -f -` command,
    hands both to _local_async_exec_pipe and parses the resulting output with
    _parse_services.

    :param cluster_id: cluster whose environment is used
    :param kdu_instance: release name
    :param namespace: namespace of the release
    :return: result of _parse_services on the command output
    """
    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_command = (
        f"{self._helm_command} get manifest {kdu_instance} --namespace={namespace}"
    )
    kubectl_get_command = f"{self.kubectl_command} get --namespace={namespace} -f -"

    output, _rc = await self._local_async_exec_pipe(
        manifest_command, kubectl_get_command, env=env, raise_exception_on_error=True
    )

    return self._parse_services(output)
| |
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm-version-dependent part of the cluster initialization.

    For helm3 it creates the namespace when it is not yet present and makes
    sure a repo named "stable" is registered.

    :param cluster_id: cluster being initialized
    :param namespace: namespace to ensure (skipped for "kube-system")
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: False, as no software needs to be uninstalled
    """
    if namespace != "kube-system":
        existing = await self._get_namespaces(cluster_id)
        if namespace not in existing:
            await self._create_namespace(cluster_id, namespace)

    # If default repo is not included add
    cluster_uuid = f"{namespace}:{cluster_id}"
    stable_present = False
    for repo in await self.repo_list(cluster_uuid):
        self.log.debug("repo")
        if repo["name"] == "stable":
            self.log.debug("Default repo already present")
            stable_present = True
            break

    if not stable_present:
        await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
| |
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    No-op: this helm3 implementation has nothing to uninstall.

    :param cluster_id: unused
    :param namespace: unused
    :return: None
    """
    return None
| |
async def _instances_list(self, cluster_id: str):
    """
    List the helm releases of every namespace of a cluster.

    :param cluster_id: cluster whose environment is used
    :return: YAML-parsed output of `helm list --all-namespaces --output yaml`,
        or an empty list when the command produced no output
    """
    # init paths, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = f"{self._helm_command} list --all-namespaces --output yaml"
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # nothing printed -> no releases
    if not output:
        return []

    self.log.debug(f"instances list output: {output}")
    return yaml.load(output, Loader=yaml.SafeLoader)
| |
def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
                         version: str):
    """
    Compose the `helm show` command line.

    :param inspect_command: `helm show` sub-command (e.g. "all")
    :param kdu_model: chart reference
    :param repo_str: repo option text, appended right after kdu_model with no
        separator
    :param version: version option text
    :return: command string
    """
    return (
        f"{self._helm_command} show {inspect_command} "
        f"{kdu_model}{repo_str} {version}"
    )
| |
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Query `helm status` for a release.

    :param cluster_id: cluster whose environment is used
    :param kdu_instance: release name
    :param namespace: namespace of the release; "kube-system" when empty
    :param show_error_log: forwarded to _local_async_exec
    :param return_text: when True the raw command output is returned as str
    :return: raw output (return_text=True), None when the command return code
        is not 0, otherwise the YAML-parsed status without the "notes" entry
        of "info" and without the "manifest" entry
    """

    self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))

    if not namespace:
        namespace = "kube-system"

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = "{} status {} --namespace={} --output yaml".format(
        self._helm_command, kdu_instance, namespace
    )

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env
    )

    if return_text:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # Remove fields 'notes' and 'manifest'. Each one is dropped independently:
    # previously a missing 'notes' skipped the removal of 'manifest', and a
    # missing 'info' raised an uncaught TypeError.
    if isinstance(data, dict):
        info = data.get("info")
        if isinstance(info, dict):
            info.pop("notes", None)
        data.pop("manifest", None)

    # unable to parse 'resources' as currently it is not included in helm3
    return data
| |
def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                         params_str: str, version: str, atomic: bool, timeout: float) -> str:
    """
    Compose the `helm install` command line.

    Optional flags whose value is falsy are rendered as empty strings, so the
    resulting command may contain consecutive spaces.

    :param kdu_model: chart reference
    :param kdu_instance: release name
    :param namespace: target namespace (adds --namespace when set)
    :param params_str: already formatted parameter options
    :param version: chart version (adds --version when set)
    :param atomic: adds --atomic when true
    :param timeout: seconds (adds --timeout <n>s when set)
    :return: command string
    """
    timeout_flag = f"--timeout {timeout}s" if timeout else ""
    atomic_flag = "--atomic" if atomic else ""
    namespace_flag = f"--namespace {namespace}" if namespace else ""
    version_flag = f"--version {version}" if version else ""

    return (
        f"{self._helm_command} install {kdu_instance} {atomic_flag} --output yaml "
        f"{params_str} {timeout_flag} {namespace_flag} {kdu_model} {version_flag}"
    )
| |
def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                         params_str: str, version: str, atomic: bool, timeout: float) -> str:
    """
    Compose the `helm upgrade` command line.

    Optional flags whose value is falsy are rendered as empty strings, so the
    resulting command may contain consecutive spaces.

    :param kdu_model: chart reference
    :param kdu_instance: release name
    :param namespace: target namespace (adds --namespace when set)
    :param params_str: already formatted parameter options
    :param version: chart version (adds --version when set)
    :param atomic: adds --atomic when true
    :param timeout: seconds (adds --timeout <n>s when set)
    :return: command string
    """
    timeout_flag = f"--timeout {timeout}s" if timeout else ""
    atomic_flag = "--atomic" if atomic else ""
    version_flag = f"--version {version}" if version else ""
    namespace_flag = f"--namespace {namespace}" if namespace else ""

    return (
        f"{self._helm_command} upgrade {kdu_instance} {kdu_model} {namespace_flag} "
        f"{atomic_flag} --output yaml {params_str} {timeout_flag} {version_flag}"
    )
| |
def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
    """
    Compose the `helm rollback ... --wait` command line.

    :param kdu_instance: release name
    :param namespace: namespace of the release
    :param revision: revision to roll back to
    :return: command string
    """
    helm = self._helm_command
    return f"{helm} rollback {kdu_instance} {revision} --namespace={namespace} --wait"
| |
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Compose the `helm uninstall` command line.

    :param kdu_instance: release name
    :param namespace: namespace of the release
    :return: command string
    """
    helm = self._helm_command
    return f"{helm} uninstall {kdu_instance} --namespace={namespace}"
| |
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Look up the helm chart repo ids stored for a cluster.

    The cluster is searched in the "k8sclusters" collection by its
    "_admin.helm-chart-v3.id" field.

    :param cluster_uuid: helm-v3 id of the cluster
    :return: value of _admin.helm_chart_repos, or [] when it is empty/missing
    :raises K8sException: when no cluster matches
    """
    cluster = self.db.get_one(
        "k8sclusters", {"_admin.helm-chart-v3.id": cluster_uuid}
    )
    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    admin = cluster.get("_admin")
    return admin.get("helm_chart_repos") or []