| ## |
| # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of OSM |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| import asyncio |
| import os |
| import yaml |
| |
| from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector |
| from n2vc.exceptions import K8sException |
| |
| |
| class K8sHelmConnector(K8sHelmBaseConnector): |
| |
| """ |
| #################################################################################### |
| ################################### P U B L I C #################################### |
| #################################################################################### |
| """ |
| |
| def __init__( |
| self, |
| fs: object, |
| db: object, |
| kubectl_command: str = "/usr/bin/kubectl", |
| helm_command: str = "/usr/bin/helm", |
| log: object = None, |
| on_update_db=None, |
| vca_config: dict = None, |
| ): |
| """ |
| Initializes helm connector for helm v2 |
| |
| :param fs: file system for kubernetes and helm configuration |
| :param db: database object to write current operation status |
| :param kubectl_command: path to kubectl executable |
| :param helm_command: path to helm executable |
| :param log: logger |
| :param on_update_db: callback called when k8s connector updates database |
| """ |
| |
| # parent class |
| K8sHelmBaseConnector.__init__( |
| self, |
| db=db, |
| log=log, |
| fs=fs, |
| kubectl_command=kubectl_command, |
| helm_command=helm_command, |
| on_update_db=on_update_db, |
| vca_config=vca_config, |
| ) |
| |
| self.log.info("Initializing K8S Helm2 connector") |
| |
| # initialize helm client-only |
| self.log.debug("Initializing helm client-only...") |
| command = "{} init --client-only --stable-repo-url {} ".format( |
| self._helm_command, self._stable_repo_url) |
| try: |
| asyncio.ensure_future( |
| self._local_async_exec(command=command, raise_exception_on_error=False) |
| ) |
| # loop = asyncio.get_event_loop() |
| # loop.run_until_complete(self._local_async_exec(command=command, |
| # raise_exception_on_error=False)) |
| except Exception as e: |
| self.warning( |
| msg="helm init failed (it was already initialized): {}".format(e) |
| ) |
| |
| self.log.info("K8S Helm2 connector initialized") |
| |
| async def install( |
| self, |
| cluster_uuid: str, |
| kdu_model: str, |
| kdu_instance: str, |
| atomic: bool = True, |
| timeout: float = 300, |
| params: dict = None, |
| db_dict: dict = None, |
| kdu_name: str = None, |
| namespace: str = None, |
| ): |
| _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
| self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) |
| |
| # sync local dir |
| self.fs.sync(from_path=cluster_id) |
| |
| # init env, paths |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| await self._install_impl( |
| cluster_id, |
| kdu_model, |
| paths, |
| env, |
| kdu_instance, |
| atomic=atomic, |
| timeout=timeout, |
| params=params, |
| db_dict=db_dict, |
| kdu_name=kdu_name, |
| namespace=namespace, |
| ) |
| |
| # sync fs |
| self.fs.reverse_sync(from_path=cluster_id) |
| |
| self.log.debug("Returning kdu_instance {}".format(kdu_instance)) |
| return True |
| |
| async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
| |
| self.log.debug( |
| "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) |
| ) |
| |
| return await self._exec_inspect_comand( |
| inspect_command="", kdu_model=kdu_model, repo_url=repo_url |
| ) |
| |
| """ |
| #################################################################################### |
| ################################### P R I V A T E ################################## |
| #################################################################################### |
| """ |
| |
| def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True): |
| """ |
| Creates and returns base cluster and kube dirs and returns them. |
| Also created helm3 dirs according to new directory specification, paths are |
| returned and also environment variables that must be provided to execute commands |
| |
| Helm 2 directory specification uses helm_home dir: |
| |
| The variables assigned for this paths are: |
| - Helm hone: $HELM_HOME |
| - helm kubeconfig: $KUBECONFIG |
| |
| :param cluster_name: cluster_name |
| :return: Dictionary with config_paths and dictionary with helm environment variables |
| """ |
| base = self.fs.path |
| if base.endswith("/") or base.endswith("\\"): |
| base = base[:-1] |
| |
| # base dir for cluster |
| cluster_dir = base + "/" + cluster_name |
| |
| # kube dir |
| kube_dir = cluster_dir + "/" + ".kube" |
| if create_if_not_exist and not os.path.exists(kube_dir): |
| self.log.debug("Creating dir {}".format(kube_dir)) |
| os.makedirs(kube_dir) |
| |
| # helm home dir |
| helm_dir = cluster_dir + "/" + ".helm" |
| if create_if_not_exist and not os.path.exists(helm_dir): |
| self.log.debug("Creating dir {}".format(helm_dir)) |
| os.makedirs(helm_dir) |
| |
| config_filename = kube_dir + "/config" |
| |
| # 2 - Prepare dictionary with paths |
| paths = { |
| "kube_dir": kube_dir, |
| "kube_config": config_filename, |
| "cluster_dir": cluster_dir, |
| "helm_dir": helm_dir, |
| } |
| |
| for file_name, file in paths.items(): |
| if "dir" in file_name and not os.path.exists(file): |
| err_msg = "{} dir does not exist".format(file) |
| self.log.error(err_msg) |
| raise K8sException(err_msg) |
| |
| # 3 - Prepare environment variables |
| env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename} |
| |
| return paths, env |
| |
| async def _get_services(self, cluster_id, kdu_instance, namespace): |
| |
| # init config, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance) |
| command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace) |
| output, _rc = await self._local_async_exec_pipe( |
| command1, command2, env=env, raise_exception_on_error=True |
| ) |
| services = self._parse_services(output) |
| |
| return services |
| |
| async def _cluster_init(self, cluster_id: str, namespace: str, |
| paths: dict, env: dict): |
| """ |
| Implements the helm version dependent cluster initialization: |
| For helm2 it initialized tiller environment if needed |
| """ |
| |
| # check if tiller pod is up in cluster |
| command = "{} --kubeconfig={} --namespace={} get deployments".format( |
| self.kubectl_command, paths["kube_config"], namespace |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| |
| output_table = self._output_to_table(output=output) |
| |
| # find 'tiller' pod in all pods |
| already_initialized = False |
| try: |
| for row in output_table: |
| if row[0].startswith("tiller-deploy"): |
| already_initialized = True |
| break |
| except Exception: |
| pass |
| |
| # helm init |
| n2vc_installed_sw = False |
| if not already_initialized: |
| self.log.info( |
| "Initializing helm in client and server: {}".format(cluster_id) |
| ) |
| command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format( |
| self.kubectl_command, paths["kube_config"], self.service_account |
| ) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| command = ( |
| "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule " |
| "--clusterrole=cluster-admin --serviceaccount=kube-system:{}" |
| ).format(self.kubectl_command, paths["kube_config"], self.service_account) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| command = ( |
| "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} " |
| " --stable-repo-url {} init" |
| ).format( |
| self._helm_command, |
| paths["kube_config"], |
| namespace, |
| paths["helm_dir"], |
| self.service_account, |
| self._stable_repo_url |
| ) |
| _, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| n2vc_installed_sw = True |
| else: |
| # check client helm installation |
| check_file = paths["helm_dir"] + "/repository/repositories.yaml" |
| if not self._check_file_exists( |
| filename=check_file, exception_if_not_exists=False |
| ): |
| self.log.info("Initializing helm in client: {}".format(cluster_id)) |
| command = ( |
| "{} --kubeconfig={} --tiller-namespace={} " |
| "--home={} init --client-only --stable-repo-url {} " |
| ).format( |
| self._helm_command, |
| paths["kube_config"], |
| namespace, |
| paths["helm_dir"], |
| self._stable_repo_url, |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| else: |
| self.log.info("Helm client already initialized") |
| |
| # remove old stable repo and add new one |
| cluster_uuid = "{}:{}".format(namespace, cluster_id) |
| repo_list = await self.repo_list(cluster_uuid) |
| for repo in repo_list: |
| if repo["name"] == "stable" and repo["url"] != self._stable_repo_url: |
| self.log.debug("Add new stable repo url: {}") |
| await self.repo_remove(cluster_uuid, |
| "stable") |
| await self.repo_add(cluster_uuid, |
| "stable", |
| self._stable_repo_url) |
| break |
| |
| return n2vc_installed_sw |
| |
| async def _uninstall_sw(self, cluster_id: str, namespace: str): |
| # uninstall Tiller if necessary |
| |
| self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) |
| |
| # init paths, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| if not namespace: |
| # find namespace for tiller pod |
| command = "{} --kubeconfig={} get deployments --all-namespaces".format( |
| self.kubectl_command, paths["kube_config"] |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| output_table = self._output_to_table(output=output) |
| namespace = None |
| for r in output_table: |
| try: |
| if "tiller-deploy" in r[1]: |
| namespace = r[0] |
| break |
| except Exception: |
| pass |
| else: |
| msg = "Tiller deployment not found in cluster {}".format(cluster_id) |
| self.log.error(msg) |
| |
| self.log.debug("namespace for tiller: {}".format(namespace)) |
| |
| if namespace: |
| # uninstall tiller from cluster |
| self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) |
| command = "{} --kubeconfig={} --home={} reset".format( |
| self._helm_command, paths["kube_config"], paths["helm_dir"] |
| ) |
| self.log.debug("resetting: {}".format(command)) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| # Delete clusterrolebinding and serviceaccount. |
| # Ignore if errors for backward compatibility |
| command = ( |
| "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s." |
| "io/osm-tiller-cluster-rule" |
| ).format(self.kubectl_command, paths["kube_config"]) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format( |
| self.kubectl_command, paths["kube_config"], self.service_account |
| ) |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=False, env=env |
| ) |
| |
| else: |
| self.log.debug("namespace not found") |
| |
| async def _instances_list(self, cluster_id): |
| |
| # init paths, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| |
| command = "{} list --output yaml".format(self._helm_command) |
| |
| output, _rc = await self._local_async_exec( |
| command=command, raise_exception_on_error=True, env=env |
| ) |
| |
| if output and len(output) > 0: |
| # parse yaml and update keys to lower case to unify with helm3 |
| instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") |
| new_instances = [] |
| for instance in instances: |
| new_instance = dict((k.lower(), v) for k, v in instance.items()) |
| new_instances.append(new_instance) |
| return new_instances |
| else: |
| return [] |
| |
| def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str, |
| version: str): |
| inspect_command = "{} inspect {} {}{} {}".format( |
| self._helm_command, show_command, kdu_model, repo_str, version |
| ) |
| return inspect_command |
| |
| async def _status_kdu( |
| self, |
| cluster_id: str, |
| kdu_instance: str, |
| namespace: str = None, |
| show_error_log: bool = False, |
| return_text: bool = False, |
| ): |
| |
| self.log.debug( |
| "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) |
| ) |
| |
| # init config, env |
| paths, env = self._init_paths_env( |
| cluster_name=cluster_id, create_if_not_exist=True |
| ) |
| command = "{} status {} --output yaml".format(self._helm_command, kdu_instance) |
| output, rc = await self._local_async_exec( |
| command=command, |
| raise_exception_on_error=True, |
| show_error_log=show_error_log, |
| env=env, |
| ) |
| |
| if return_text: |
| return str(output) |
| |
| if rc != 0: |
| return None |
| |
| data = yaml.load(output, Loader=yaml.SafeLoader) |
| |
| # remove field 'notes' |
| try: |
| del data.get("info").get("status")["notes"] |
| except KeyError: |
| pass |
| |
| # parse field 'resources' |
| try: |
| resources = str(data.get("info").get("status").get("resources")) |
| resource_table = self._output_to_table(resources) |
| data.get("info").get("status")["resources"] = resource_table |
| except Exception: |
| pass |
| |
| # set description to lowercase (unify with helm3) |
| try: |
| data.get("info")["description"] = data.get("info").pop("Description") |
| except KeyError: |
| pass |
| |
| return data |
| |
| def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: |
| repo_ids = [] |
| cluster_filter = {"_admin.helm-chart.id": cluster_uuid} |
| cluster = self.db.get_one("k8sclusters", cluster_filter) |
| if cluster: |
| repo_ids = cluster.get("_admin").get("helm_chart_repos") or [] |
| return repo_ids |
| else: |
| raise K8sException( |
| "k8cluster with helm-id : {} not found".format(cluster_uuid) |
| ) |
| |
| async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool: |
| |
| status = await self._status_kdu( |
| cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False |
| ) |
| |
| # extract info.status.resources-> str |
| # format: |
| # ==> v1/Deployment |
| # NAME READY UP-TO-DATE AVAILABLE AGE |
| # halting-horse-mongodb 0/1 1 0 0s |
| # halting-petit-mongodb 1/1 1 0 0s |
| # blank line |
| resources = K8sHelmBaseConnector._get_deep( |
| status, ("info", "status", "resources") |
| ) |
| |
| # convert to table |
| resources = K8sHelmBaseConnector._output_to_table(resources) |
| |
| num_lines = len(resources) |
| index = 0 |
| ready = True |
| while index < num_lines: |
| try: |
| line1 = resources[index] |
| index += 1 |
| # find '==>' in column 0 |
| if line1[0] == "==>": |
| line2 = resources[index] |
| index += 1 |
| # find READY in column 1 |
| if line2[1] == "READY": |
| # read next lines |
| line3 = resources[index] |
| index += 1 |
| while len(line3) > 1 and index < num_lines: |
| ready_value = line3[1] |
| parts = ready_value.split(sep="/") |
| current = int(parts[0]) |
| total = int(parts[1]) |
| if current < total: |
| self.log.debug("NOT READY:\n {}".format(line3)) |
| ready = False |
| line3 = resources[index] |
| index += 1 |
| |
| except Exception: |
| pass |
| |
| return ready |
| |
| def _get_install_command( |
| self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout |
| ) -> str: |
| |
| timeout_str = "" |
| if timeout: |
| timeout_str = "--timeout {}".format(timeout) |
| |
| # atomic |
| atomic_str = "" |
| if atomic: |
| atomic_str = "--atomic" |
| # namespace |
| namespace_str = "" |
| if namespace: |
| namespace_str = "--namespace {}".format(namespace) |
| |
| # version |
| version_str = "" |
| if version: |
| version_str = version_str = "--version {}".format(version) |
| |
| command = ( |
| "{helm} install {atomic} --output yaml " |
| "{params} {timeout} --name={name} {ns} {model} {ver}".format( |
| helm=self._helm_command, |
| atomic=atomic_str, |
| params=params_str, |
| timeout=timeout_str, |
| name=kdu_instance, |
| ns=namespace_str, |
| model=kdu_model, |
| ver=version_str, |
| ) |
| ) |
| return command |
| |
| def _get_upgrade_command( |
| self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout |
| ) -> str: |
| |
| timeout_str = "" |
| if timeout: |
| timeout_str = "--timeout {}".format(timeout) |
| |
| # atomic |
| atomic_str = "" |
| if atomic: |
| atomic_str = "--atomic" |
| |
| # version |
| version_str = "" |
| if version: |
| version_str = "--version {}".format(version) |
| |
| command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\ |
| .format(helm=self._helm_command, |
| atomic=atomic_str, |
| params=params_str, |
| timeout=timeout_str, |
| name=kdu_instance, |
| model=kdu_model, |
| ver=version_str |
| ) |
| return command |
| |
| def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: |
| return "{} rollback {} {} --wait".format( |
| self._helm_command, kdu_instance, revision |
| ) |
| |
| def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: |
| return "{} delete --purge {}".format(self._helm_command, kdu_instance) |