X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=17e960f1a1814e6d92d000e0bc759950909ce40c;hb=de6984b39684c3a8587ea4111c757ab878942aba;hp=13a3114c2813590399182ecd2c2ef0d8f8cb9da8;hpb=7bd5c6affb2805caba1b832b56d0ad5712396306;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 13a3114..17e960f 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -20,6 +20,8 @@ # contact with: nfvlabs@tid.es ## import asyncio +from typing import Union +from shlex import quote import os import yaml @@ -72,17 +74,14 @@ class K8sHelmConnector(K8sHelmBaseConnector): self.log.debug("Initializing helm client-only...") command = "{} init --client-only {} ".format( self._helm_command, - "--stable-repo-url {}".format(self._stable_repo_url) + "--stable-repo-url {}".format(quote(self._stable_repo_url)) if self._stable_repo_url else "--skip-repos", ) try: - asyncio.ensure_future( + asyncio.create_task( self._local_async_exec(command=command, raise_exception_on_error=False) ) - # loop = asyncio.get_event_loop() - # loop.run_until_complete(self._local_async_exec(command=command, - # raise_exception_on_error=False)) except Exception as e: self.warning( msg="helm init failed (it was already initialized): {}".format(e) @@ -109,11 +108,12 @@ class K8sHelmConnector(K8sHelmBaseConnector): happen before any _initial-config-primitive_of the VNF is called). :param cluster_uuid: UUID of a K8s cluster known by OSM - :param kdu_model: chart/ reference (string), which can be either + :param kdu_model: chart/reference (string), which can be either of these options: - a name of chart available via the repos known by OSM - - a path to a packaged chart - - a path to an unpacked chart directory or a URL + (e.g. stable/openldap, stable/openldap:1.2.4) + - a path to a packaged chart (e.g. mychart.tgz) + - a path to an unpacked chart directory or a URL (e.g. mychart) :param kdu_instance: Kdu instance name :param atomic: If set, installation process purges chart/bundle on fail, also will wait until all the K8s objects are active @@ -131,19 +131,18 @@ class K8sHelmConnector(K8sHelmBaseConnector): :param kwargs: Additional parameters (None yet) :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) await self._install_impl( - cluster_id, + cluster_uuid, kdu_model, paths, env, @@ -157,18 +156,17 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return True async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug( "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="", kdu_model=kdu_model, repo_url=repo_url ) @@ -234,16 +232,17 @@ class K8sHelmConnector(K8sHelmBaseConnector): return paths, env async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): - # init config, env paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) command1 = "env KUBECONFIG={} {} get manifest {} ".format( - kubeconfig, self._helm_command, kdu_instance + kubeconfig, self._helm_command, quote(kdu_instance) + ) + command2 = "{} get --namespace={} -f -".format( + self.kubectl_command, quote(namespace) ) - command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace) output, _rc = await self._local_async_exec_pipe( command1, command2, env=env, raise_exception_on_error=True ) @@ -261,7 +260,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): # check if tiller pod is up in cluster command = "{} --kubeconfig={} --namespace={} get deployments".format( - self.kubectl_command, paths["kube_config"], namespace + self.kubectl_command, paths["kube_config"], quote(namespace) ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=True, env=env @@ -286,7 +285,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): "Initializing helm in client and server: {}".format(cluster_id) ) command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format( - self.kubectl_command, paths["kube_config"], self.service_account + self.kubectl_command, paths["kube_config"], quote(self.service_account) ) _, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env @@ -295,7 +294,9 @@ class K8sHelmConnector(K8sHelmBaseConnector): command = ( "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule " "--clusterrole=cluster-admin --serviceaccount=kube-system:{}" - ).format(self.kubectl_command, paths["kube_config"], self.service_account) + ).format( + self.kubectl_command, paths["kube_config"], quote(self.service_account) + ) _, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -306,10 +307,10 @@ class K8sHelmConnector(K8sHelmBaseConnector): ).format( self._helm_command, paths["kube_config"], - namespace, - paths["helm_dir"], - self.service_account, - "--stable-repo-url {}".format(self._stable_repo_url) + quote(namespace), + quote(paths["helm_dir"]), + quote(self.service_account), + "--stable-repo-url {}".format(quote(self._stable_repo_url)) if self._stable_repo_url else "--skip-repos", ) @@ -330,9 +331,9 @@ class K8sHelmConnector(K8sHelmBaseConnector): ).format( self._helm_command, paths["kube_config"], - namespace, - paths["helm_dir"], - "--stable-repo-url {}".format(self._stable_repo_url) + quote(namespace), + quote(paths["helm_dir"]), + "--stable-repo-url {}".format(quote(self._stable_repo_url)) if self._stable_repo_url else "--skip-repos", ) @@ -342,15 +343,13 @@ class K8sHelmConnector(K8sHelmBaseConnector): else: self.log.info("Helm client already initialized") - # remove old stable repo and add new one - cluster_uuid = "{}:{}".format(namespace, cluster_id) - repo_list = await self.repo_list(cluster_uuid) + repo_list = await self.repo_list(cluster_id) for repo in repo_list: if repo["name"] == "stable" and repo["url"] != self._stable_repo_url: self.log.debug("Add new stable repo url: {}") - await self.repo_remove(cluster_uuid, "stable") + await self.repo_remove(cluster_id, "stable") if self._stable_repo_url: - await self.repo_add(cluster_uuid, "stable", self._stable_repo_url) + await self.repo_add(cluster_id, "stable", self._stable_repo_url) break return n2vc_installed_sw @@ -368,7 +367,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): if not namespace: # find namespace for tiller pod command = "{} --kubeconfig={} get deployments --all-namespaces".format( - self.kubectl_command, paths["kube_config"] + self.kubectl_command, quote(paths["kube_config"]) ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env @@ -392,7 +391,9 @@ class K8sHelmConnector(K8sHelmBaseConnector): # uninstall tiller from cluster self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id)) command = "{} --kubeconfig={} --home={} reset".format( - self._helm_command, paths["kube_config"], paths["helm_dir"] + self._helm_command, + quote(paths["kube_config"]), + quote(paths["helm_dir"]), ) self.log.debug("resetting: {}".format(command)) output, _rc = await self._local_async_exec( @@ -403,12 +404,17 @@ class K8sHelmConnector(K8sHelmBaseConnector): command = ( "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s." "io/osm-tiller-cluster-rule" - ).format(self.kubectl_command, paths["kube_config"]) + ).format(self.kubectl_command, quote(paths["kube_config"])) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format( - self.kubectl_command, paths["kube_config"], self.service_account + command = ( + "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format( + self.kubectl_command, + quote(paths["kube_config"]), + quote(namespace), + quote(self.service_account), + ) ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env @@ -418,7 +424,6 @@ class K8sHelmConnector(K8sHelmBaseConnector): self.log.debug("namespace not found") async def _instances_list(self, cluster_id): - # init paths, env paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True @@ -445,19 +450,26 @@ class K8sHelmConnector(K8sHelmBaseConnector): self, show_command: str, kdu_model: str, repo_str: str, version: str ): inspect_command = "{} inspect {} {}{} {}".format( - self._helm_command, show_command, kdu_model, repo_str, version + self._helm_command, show_command, quote(kdu_model), repo_str, version ) return inspect_command + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format( + kubeconfig, self._helm_command, get_command, quote(kdu_instance) + ) + return get_command + async def _status_kdu( self, cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): - + ) -> Union[str, dict]: self.log.debug( "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) ) @@ -467,7 +479,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): cluster_name=cluster_id, create_if_not_exist=True ) command = ("env KUBECONFIG={} {} status {} --output yaml").format( - paths["kube_config"], self._helm_command, kdu_instance + paths["kube_config"], self._helm_command, quote(kdu_instance) ) output, rc = await self._local_async_exec( command=command, @@ -476,7 +488,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): env=env, ) - if return_text: + if yaml_format: return str(output) if rc != 0: @@ -490,6 +502,15 @@ class K8sHelmConnector(K8sHelmBaseConnector): except KeyError: pass + # parse the manifest to a list of dictionaries + if "manifest" in data: + manifest_str = data.get("manifest") + manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader) + + data["manifest"] = [] + for doc in manifest_docs: + data["manifest"].append(doc) + # parse field 'resources' try: resources = str(data.get("info").get("status").get("resources")) @@ -525,7 +546,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) status = await self._status_kdu( - cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False + cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False ) # extract info.status.resources-> str @@ -585,7 +606,6 @@ class K8sHelmConnector(K8sHelmBaseConnector): timeout, kubeconfig, ) -> str: - timeout_str = "" if timeout: timeout_str = "--timeout {}".format(timeout) @@ -597,12 +617,12 @@ class K8sHelmConnector(K8sHelmBaseConnector): # namespace namespace_str = "" if namespace: - namespace_str = "--namespace {}".format(namespace) + namespace_str = "--namespace {}".format(quote(namespace)) # version version_str = "" if version: - version_str = version_str = "--version {}".format(version) + version_str = "--version {}".format(version) command = ( "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml " @@ -612,14 +632,65 @@ class K8sHelmConnector(K8sHelmBaseConnector): atomic=atomic_str, params=params_str, timeout=timeout_str, - name=kdu_instance, + name=quote(kdu_instance), ns=namespace_str, - model=kdu_model, + model=quote(kdu_model), ver=version_str, ) ) return command + def _get_upgrade_scale_command( + self, + kdu_model: str, + kdu_instance: str, + namespace: str, + scale: int, + version: str, + atomic: bool, + replica_str: str, + timeout: float, + resource_name: str, + kubeconfig: str, + ) -> str: + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release + """ + + # scale + if resource_name: + scale_dict = {"{}.{}".format(resource_name, replica_str): scale} + else: + scale_dict = {replica_str: scale} + + scale_str = self._params_to_set_option(scale_dict) + + return self._get_upgrade_command( + kdu_model=kdu_model, + kdu_instance=kdu_instance, + namespace=namespace, + params_str=scale_str, + version=version, + atomic=atomic, + timeout=timeout, + kubeconfig=kubeconfig, + ) + def _get_upgrade_command( self, kdu_model, @@ -630,7 +701,24 @@ class K8sHelmConnector(K8sHelmBaseConnector): atomic, timeout, kubeconfig, + force: bool = False, ) -> str: + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. + Returns: + str: command to upgrade a Helm Chart release + """ timeout_str = "" if timeout: @@ -641,21 +729,34 @@ class K8sHelmConnector(K8sHelmBaseConnector): if atomic: atomic_str = "--atomic" + # force + force_str = "" + if force: + force_str = "--force " + # version version_str = "" if version: - version_str = "--version {}".format(version) + version_str = "--version {}".format(quote(version)) + + # namespace + namespace_str = "" + if namespace: + namespace_str = "--namespace {}".format(quote(namespace)) command = ( - "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}" + "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}" + "--reuse-values {name} {model} {ver}" ).format( kubeconfig=kubeconfig, helm=self._helm_command, + namespace=namespace_str, atomic=atomic_str, + force=force_str, params=params_str, timeout=timeout_str, - name=kdu_instance, - model=kdu_model, + name=quote(kdu_instance), + model=quote(kdu_model), ver=version_str, ) return command @@ -664,12 +765,12 @@ class K8sHelmConnector(K8sHelmBaseConnector): self, kdu_instance, namespace, revision, kubeconfig ) -> str: return "env KUBECONFIG={} {} rollback {} {} --wait".format( - kubeconfig, self._helm_command, kdu_instance, revision + kubeconfig, self._helm_command, quote(kdu_instance), revision ) def _get_uninstall_command( self, kdu_instance: str, namespace: str, kubeconfig: str ) -> str: return "env KUBECONFIG={} {} delete --purge {}".format( - kubeconfig, self._helm_command, kdu_instance + kubeconfig, self._helm_command, quote(kdu_instance) )