X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=13a3114c2813590399182ecd2c2ef0d8f8cb9da8;hp=6bbc0fa79b0b69eda28060a981441aafcfb440ae;hb=HEAD;hpb=582b923b8f3f7104411c39ebdba63949d606ecd1

diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
deleted file mode 100644
index 6bbc0fa..0000000
--- a/n2vc/k8s_helm_conn.py
+++ /dev/null
@@ -1,641 +0,0 @@
-##
-# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
-# This file is part of OSM
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: nfvlabs@tid.es
-##
-import asyncio
-import os
-import yaml
-
-from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
-from n2vc.exceptions import K8sException
-
-
-class K8sHelmConnector(K8sHelmBaseConnector):
-
-    """
-    ####################################################################################
-    ################################### P U B L I C ####################################
-    ####################################################################################
-    """
-
-    def __init__(
-        self,
-        fs: object,
-        db: object,
-        kubectl_command: str = "/usr/bin/kubectl",
-        helm_command: str = "/usr/bin/helm",
-        log: object = None,
-        on_update_db=None,
-    ):
-        """
-        Initializes the helm connector for helm v2
-
-        :param fs: file system for kubernetes and helm configuration
-        :param db: database object to write current operation status
-        :param kubectl_command: path to kubectl executable
-        :param helm_command: path to helm executable
-        :param log: logger
-        :param on_update_db: callback called when the k8s connector updates the database
-        """
-
-        # parent class
-        K8sHelmBaseConnector.__init__(
-            self,
-            db=db,
-            log=log,
-            fs=fs,
-            kubectl_command=kubectl_command,
-            helm_command=helm_command,
-            on_update_db=on_update_db,
-        )
-
-        self.log.info("Initializing K8S Helm2 connector")
-
-        # initialize helm client-only
-        self.log.debug("Initializing helm client-only...")
-        command = "{} init --client-only {} ".format(
-            self._helm_command,
-            "--stable-repo-url {}".format(self._stable_repo_url)
-            if self._stable_repo_url
-            else "--skip-repos",
-        )
-        try:
-            asyncio.ensure_future(
-                self._local_async_exec(command=command, raise_exception_on_error=False)
-            )
-            # loop = asyncio.get_event_loop()
-            # loop.run_until_complete(self._local_async_exec(command=command,
-            # raise_exception_on_error=False))
-        except Exception as e:
-            self.log.warning(
-                msg="helm init failed (it was already initialized): {}".format(e)
-            )
-
-        self.log.info("K8S Helm2 connector initialized")
-
-    async def install(
-        self,
-        cluster_uuid: str,
-        kdu_model: str,
-        kdu_instance: str,
-        atomic: bool = True,
-        timeout: float = 300,
-        params: dict = None,
-        db_dict: dict = None,
-        kdu_name: str = None,
-        namespace: str = None,
-        **kwargs,
-    ):
-        """
-        Deploys a new KDU instance. It would implicitly rely on the `install` call
-        to deploy the Chart/Bundle properly parametrized (in practice, this call
-        would happen before any _initial-config-primitive_ of the VNF is called).
-
-        :param cluster_uuid: UUID of a K8s cluster known by OSM
-        :param kdu_model: chart/reference (string), which can be either
-            of these options:
-            - a name of chart available via the repos known by OSM
-            - a path to a packaged chart
-            - a path to an unpacked chart directory or a URL
-        :param kdu_instance: Kdu instance name
-        :param atomic: If set, installation process purges chart/bundle on fail, also
-            will wait until all the K8s objects are active
-        :param timeout: Time in seconds to wait for the install of the chart/bundle
-            (defaults to Helm default timeout: 300s)
-        :param params: dictionary of key-value pairs for instantiation parameters
-            (overriding default values)
-        :param dict db_dict: where to write into database when the status changes.
-            It contains a dict with {collection: <str>, filter: {}, path: <str>},
-            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
-        :param kdu_name: Name of the KDU instance to be installed
-        :param namespace: K8s namespace to use for the KDU instance
-        :param kwargs: Additional parameters (None yet)
-        :return: True if successful
-        """
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
-
-        # sync local dir
-        self.fs.sync(from_path=cluster_id)
-
-        # init env, paths
-        paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
-        )
-
-        await self._install_impl(
-            cluster_id,
-            kdu_model,
-            paths,
-            env,
-            kdu_instance,
-            atomic=atomic,
-            timeout=timeout,
-            params=params,
-            db_dict=db_dict,
-            kdu_name=kdu_name,
-            namespace=namespace,
-        )
-
-        # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
-
-        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
-        return True
-
-    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
-        self.log.debug(
-            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
-        )
-
-        return await self._exec_inspect_comand(
-            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
-        )
-
-    """
-    ####################################################################################
-    ################################### P R I V A T E ##################################
-    ####################################################################################
-    """
-
-    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
-        """
-        Creates the base cluster and kube dirs and returns them.
-        Also creates the helm dirs according to the new directory specification; the
-        paths are returned, together with the environment variables that must be
-        provided to execute commands.
-
-        Helm 2 directory specification uses the helm_home dir.
-
-        The variables assigned for these paths are:
-        - Helm home: $HELM_HOME
-        - helm kubeconfig: $KUBECONFIG
-
-        :param cluster_name: cluster_name
-        :return: Dictionary with config_paths and dictionary with helm environment variables
-        """
-        base = self.fs.path
-        if base.endswith("/") or base.endswith("\\"):
-            base = base[:-1]
-
-        # base dir for cluster
-        cluster_dir = base + "/" + cluster_name
-
-        # kube dir
-        kube_dir = cluster_dir + "/" + ".kube"
-        if create_if_not_exist and not os.path.exists(kube_dir):
-            self.log.debug("Creating dir {}".format(kube_dir))
-            os.makedirs(kube_dir)
-
-        # helm home dir
-        helm_dir = cluster_dir + "/" + ".helm"
-        if create_if_not_exist and not os.path.exists(helm_dir):
-            self.log.debug("Creating dir {}".format(helm_dir))
-            os.makedirs(helm_dir)
-
-        config_filename = kube_dir + "/config"
-
-        # 2 - Prepare dictionary with paths
-        paths = {
-            "kube_dir": kube_dir,
-            "kube_config": config_filename,
-            "cluster_dir": cluster_dir,
-            "helm_dir": helm_dir,
-        }
-
-        for file_name, file in paths.items():
-            if "dir" in file_name and not os.path.exists(file):
-                err_msg = "{} dir does not exist".format(file)
-                self.log.error(err_msg)
-                raise K8sException(err_msg)
-
-        # 3 - Prepare environment variables
-        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
-
-        return paths, env
-
-    async def _get_services(self, cluster_id, kdu_instance, namespace):
-
-        # init config, env
-        paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
-        )
-
-        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
-        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
-        output, _rc = await self._local_async_exec_pipe(
-            command1, command2, env=env, raise_exception_on_error=True
-        )
-        services = self._parse_services(output)
-
-        return services
-
-    async def _cluster_init(
-        self, cluster_id: str, namespace: str, paths: dict, env: dict
-    ):
-        """
-        Implements the helm version dependent cluster initialization:
-        For helm2 it initializes the tiller environment if needed
-        """
-
-        # check if tiller pod is up in cluster
-        command = "{} --kubeconfig={} --namespace={} get deployments".format(
-            self.kubectl_command, paths["kube_config"], namespace
-        )
-        output, _rc = await self._local_async_exec(
-            command=command, raise_exception_on_error=True, env=env
-        )
-
-        output_table = self._output_to_table(output=output)
-
-        # find 'tiller' pod in all pods
-        already_initialized = False
-        try:
-            for row in output_table:
-                if row[0].startswith("tiller-deploy"):
-                    already_initialized = True
-                    break
-        except Exception:
-            pass
-
-        # helm init
-        n2vc_installed_sw = False
-        if not already_initialized:
-            self.log.info(
-                "Initializing helm in client and server: {}".format(cluster_id)
-            )
-            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
-                self.kubectl_command, paths["kube_config"], self.service_account
-            )
-            _, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=False, env=env
-            )
-
-            command = (
-                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
-                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
-            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
-            _, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=False, env=env
-            )
-
-            command = (
-                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
-                " {} init"
-            ).format(
-                self._helm_command,
-                paths["kube_config"],
-                namespace,
-                paths["helm_dir"],
-                self.service_account,
-                "--stable-repo-url {}".format(self._stable_repo_url)
-                if self._stable_repo_url
-                else "--skip-repos",
-            )
-            _, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=True, env=env
-            )
-            n2vc_installed_sw = True
-        else:
-            # check client helm installation
-            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
-            if not self._check_file_exists(
-                filename=check_file, exception_if_not_exists=False
-            ):
-                self.log.info("Initializing helm in client: {}".format(cluster_id))
-                command = (
-                    "{} --kubeconfig={} --tiller-namespace={} "
-                    "--home={} init --client-only {} "
-                ).format(
-                    self._helm_command,
-                    paths["kube_config"],
-                    namespace,
-                    paths["helm_dir"],
-                    "--stable-repo-url {}".format(self._stable_repo_url)
-                    if self._stable_repo_url
-                    else "--skip-repos",
-                )
-                output, _rc = await self._local_async_exec(
-                    command=command, raise_exception_on_error=True, env=env
-                )
-            else:
-                self.log.info("Helm client already initialized")
-
-        # remove old stable repo and add new one
-        cluster_uuid = "{}:{}".format(namespace, cluster_id)
-        repo_list = await self.repo_list(cluster_uuid)
-        for repo in repo_list:
-            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
-                self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
-                await self.repo_remove(cluster_uuid, "stable")
-                if self._stable_repo_url:
-                    await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
-                break
-
-        return n2vc_installed_sw
-
-    async def _uninstall_sw(self, cluster_id: str, namespace: str):
-        # uninstall Tiller if necessary
-
-        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
-
-        # init paths, env
-        paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
-        )
-
-        if not namespace:
-            # find namespace for tiller pod
-            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
-                self.kubectl_command, paths["kube_config"]
-            )
-            output, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=False, env=env
-            )
-            output_table = self._output_to_table(output=output)
-            namespace = None
-            for r in output_table:
-                try:
-                    if "tiller-deploy" in r[1]:
-                        namespace = r[0]
-                        break
-                except Exception:
-                    pass
-            else:
-                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
-                self.log.error(msg)
-
-            self.log.debug("namespace for tiller: {}".format(namespace))
-
-        if namespace:
-            # uninstall tiller from cluster
-            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
-            command = "{} --kubeconfig={} --home={} reset".format(
-                self._helm_command, paths["kube_config"], paths["helm_dir"]
-            )
-            self.log.debug("resetting: {}".format(command))
-            output, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=True, env=env
-            )
-            # Delete clusterrolebinding and serviceaccount.
-            # Ignore errors for backward compatibility
-            command = (
-                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
- "io/osm-tiller-cluster-rule" - ).format(self.kubectl_command, paths["kube_config"]) - output, _rc = await self._local_async_exec( - command=command, raise_exception_on_error=False, env=env - ) - command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format( - self.kubectl_command, paths["kube_config"], self.service_account - ) - output, _rc = await self._local_async_exec( - command=command, raise_exception_on_error=False, env=env - ) - - else: - self.log.debug("namespace not found") - - async def _instances_list(self, cluster_id): - - # init paths, env - paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True - ) - - command = "{} list --output yaml".format(self._helm_command) - - output, _rc = await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env - ) - - if output and len(output) > 0: - # parse yaml and update keys to lower case to unify with helm3 - instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") - new_instances = [] - for instance in instances: - new_instance = dict((k.lower(), v) for k, v in instance.items()) - new_instances.append(new_instance) - return new_instances - else: - return [] - - def _get_inspect_command( - self, show_command: str, kdu_model: str, repo_str: str, version: str - ): - inspect_command = "{} inspect {} {}{} {}".format( - self._helm_command, show_command, kdu_model, repo_str, version - ) - return inspect_command - - async def _status_kdu( - self, - cluster_id: str, - kdu_instance: str, - namespace: str = None, - show_error_log: bool = False, - return_text: bool = False, - ): - - self.log.debug( - "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) - ) - - # init config, env - paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True - ) - command = "{} status {} --output yaml".format(self._helm_command, kdu_instance) - output, rc = await self._local_async_exec( - command=command, - raise_exception_on_error=True, - show_error_log=show_error_log, - env=env, - ) - - if return_text: - return str(output) - - if rc != 0: - return None - - data = yaml.load(output, Loader=yaml.SafeLoader) - - # remove field 'notes' - try: - del data.get("info").get("status")["notes"] - except KeyError: - pass - - # parse field 'resources' - try: - resources = str(data.get("info").get("status").get("resources")) - resource_table = self._output_to_table(resources) - data.get("info").get("status")["resources"] = resource_table - except Exception: - pass - - # set description to lowercase (unify with helm3) - try: - data.get("info")["description"] = data.get("info").pop("Description") - except KeyError: - pass - - return data - - def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: - repo_ids = [] - cluster_filter = {"_admin.helm-chart.id": cluster_uuid} - cluster = self.db.get_one("k8sclusters", cluster_filter) - if cluster: - repo_ids = cluster.get("_admin").get("helm_chart_repos") or [] - return repo_ids - else: - raise K8sException( - "k8cluster with helm-id : {} not found".format(cluster_uuid) - ) - - async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool: - - status = await self._status_kdu( - cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False - ) - - # extract info.status.resources-> str - # format: - # ==> v1/Deployment - # NAME READY UP-TO-DATE AVAILABLE AGE - # halting-horse-mongodb 0/1 1 0 0s - # halting-petit-mongodb 1/1 1 0 0s - # blank line - resources = 
-            status, ("info", "status", "resources")
-        )
-
-        # convert to table
-        resources = K8sHelmBaseConnector._output_to_table(resources)
-
-        num_lines = len(resources)
-        index = 0
-        ready = True
-        while index < num_lines:
-            try:
-                line1 = resources[index]
-                index += 1
-                # find '==>' in column 0
-                if line1[0] == "==>":
-                    line2 = resources[index]
-                    index += 1
-                    # find READY in column 1
-                    if line2[1] == "READY":
-                        # read next lines
-                        line3 = resources[index]
-                        index += 1
-                        while len(line3) > 1 and index < num_lines:
-                            ready_value = line3[1]
-                            parts = ready_value.split(sep="/")
-                            current = int(parts[0])
-                            total = int(parts[1])
-                            if current < total:
-                                self.log.debug("NOT READY:\n    {}".format(line3))
-                                ready = False
-                            line3 = resources[index]
-                            index += 1
-
-            except Exception:
-                pass
-
-        return ready
-
-    def _get_install_command(
-        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
-    ) -> str:
-
-        timeout_str = ""
-        if timeout:
-            timeout_str = "--timeout {}".format(timeout)
-
-        # atomic
-        atomic_str = ""
-        if atomic:
-            atomic_str = "--atomic"
-        # namespace
-        namespace_str = ""
-        if namespace:
-            namespace_str = "--namespace {}".format(namespace)
-
-        # version
-        version_str = ""
-        if version:
-            version_str = "--version {}".format(version)
-
-        command = (
-            "{helm} install {atomic} --output yaml "
-            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
-                helm=self._helm_command,
-                atomic=atomic_str,
-                params=params_str,
-                timeout=timeout_str,
-                name=kdu_instance,
-                ns=namespace_str,
-                model=kdu_model,
-                ver=version_str,
-            )
-        )
-        return command
-
-    def _get_upgrade_command(
-        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
-    ) -> str:
-
-        timeout_str = ""
-        if timeout:
-            timeout_str = "--timeout {}".format(timeout)
-
-        # atomic
-        atomic_str = ""
-        if atomic:
-            atomic_str = "--atomic"
-
-        # version
-        version_str = ""
-        if version:
-            version_str = "--version {}".format(version)
-
-        command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
-            helm=self._helm_command,
-            atomic=atomic_str,
-            params=params_str,
-            timeout=timeout_str,
-            name=kdu_instance,
-            model=kdu_model,
-            ver=version_str,
-        )
-        return command
-
-    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
-        return "{} rollback {} {} --wait".format(
-            self._helm_command, kdu_instance, revision
-        )
-
-    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
-        return "{} delete --purge {}".format(self._helm_command, kdu_instance)
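Note on the command builders at the end of the deleted file: with Helm v2 the release name is passed via --name= and removal maps to "helm delete --purge", which is why these strings differ from their Helm v3 counterparts. As a rough, standalone illustration of the install string that _get_install_command assembles, the sketch below mirrors the same formatting logic outside the class; the helm path, chart reference, release name and namespace shown here are made-up example values (assumptions), not taken from this diff.

# Standalone sketch (assumption-laden) of the Helm v2 install command assembly
# performed by _get_install_command above. Example values are illustrative only.
def build_install_command(
    helm_command="/usr/bin/helm",
    kdu_model="stable/mongodb",  # assumed example chart reference
    kdu_instance="sample-kdu",  # assumed example release name
    namespace="osm",  # assumed example namespace
    params_str="",  # e.g. "--set replicaCount=2"
    version=None,
    atomic=True,
    timeout=300,
):
    # Build each optional flag only when the corresponding argument is set,
    # exactly as the connector method does.
    timeout_str = "--timeout {}".format(timeout) if timeout else ""
    atomic_str = "--atomic" if atomic else ""
    namespace_str = "--namespace {}".format(namespace) if namespace else ""
    version_str = "--version {}".format(version) if version else ""
    return (
        "{helm} install {atomic} --output yaml "
        "{params} {timeout} --name={name} {ns} {model} {ver}".format(
            helm=helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            ns=namespace_str,
            model=kdu_model,
            ver=version_str,
        )
    )

if __name__ == "__main__":
    # Prints a string of the form:
    # /usr/bin/helm install --atomic --output yaml   --timeout 300 --name=sample-kdu --namespace osm stable/mongodb
    print(build_install_command())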