X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm3_conn.py;h=bb08f0792aaeddd542fa1e8e065d8efbeb2d7139;hp=06e57887b2e4e7c251ee15976945e63235bd6e90;hb=a8980cc3f6508f2659dc4ba4fcbeed65ba3c8e95;hpb=82b591ceed704c798ead2d9104085a08e75b511b diff --git a/n2vc/k8s_helm3_conn.py b/n2vc/k8s_helm3_conn.py index 06e5788..bb08f07 100644 --- a/n2vc/k8s_helm3_conn.py +++ b/n2vc/k8s_helm3_conn.py @@ -19,6 +19,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact with: nfvlabs@tid.es ## +from typing import Union import os import yaml @@ -42,7 +43,6 @@ class K8sHelm3Connector(K8sHelmBaseConnector): helm_command: str = "/usr/bin/helm3", log: object = None, on_update_db=None, - vca_config: dict = None, ): """ Initializes helm connector for helm v3 @@ -64,7 +64,6 @@ class K8sHelm3Connector(K8sHelmBaseConnector): kubectl_command=kubectl_command, helm_command=helm_command, on_update_db=on_update_db, - vca_config=vca_config, ) self.log.info("K8S Helm3 connector initialized") @@ -99,25 +98,33 @@ class K8sHelm3Connector(K8sHelmBaseConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) + + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # for helm3 if namespace does not exist must create it if namespace and namespace != "kube-system": - namespaces = await self._get_namespaces(cluster_id) - if namespace not in namespaces: - await self._create_namespace(cluster_id, namespace) + if not await self._namespace_exists(cluster_uuid, namespace): + try: + await self._create_namespace(cluster_uuid, namespace) + except Exception as e: + if not await self._namespace_exists(cluster_uuid, namespace): + err_msg = ( + "namespace {} does not exist in cluster_id {} " + "error message: ".format(namespace, e) + ) + self.log.error(err_msg) + raise K8sException(err_msg) await self._install_impl( - cluster_id, + cluster_uuid, kdu_model, paths, env, @@ -131,7 +138,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return True @@ -142,7 +149,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector): "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="all", kdu_model=kdu_model, repo_url=repo_url ) @@ -228,6 +235,15 @@ class K8sHelm3Connector(K8sHelmBaseConnector): return paths, env + async def _namespace_exists(self, cluster_id, namespace) -> bool: + self.log.debug( + "checking if namespace {} exists cluster_id {}".format( + namespace, cluster_id + ) + ) + namespaces = await self._get_namespaces(cluster_id) + return namespace in namespaces if namespaces else False + async def _get_namespaces(self, cluster_id: str): self.log.debug("get namespaces cluster_id {}".format(cluster_id)) @@ -269,15 +285,17 @@ class K8sHelm3Connector(K8sHelmBaseConnector): return _rc - async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str): + async def _get_services( + self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str + ): # init config, env paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command1 = "{} get manifest {} --namespace={}".format( - self._helm_command, kdu_instance, namespace + command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( + kubeconfig, self._helm_command, kdu_instance, namespace ) command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace) output, _rc = await self._local_async_exec_pipe( @@ -297,16 +315,10 @@ class K8sHelm3Connector(K8sHelmBaseConnector): if namespace not in namespaces: await self._create_namespace(cluster_id, namespace) - # If default repo is not included add - cluster_uuid = "{}:{}".format(namespace, cluster_id) - repo_list = await self.repo_list(cluster_uuid) - for repo in repo_list: - self.log.debug("repo") - if repo["name"] == "stable": - self.log.debug("Default repo already present") - break - else: - await self.repo_add(cluster_uuid, "stable", self._stable_repo_url) + repo_list = await self.repo_list(cluster_id) + stable_repo = [repo for repo in repo_list if repo["name"] == "stable"] + if not stable_repo and self._stable_repo_url: + await self.repo_add(cluster_id, "stable", self._stable_repo_url) # Returns False as no software needs to be uninstalled return False @@ -341,14 +353,24 @@ class K8sHelm3Connector(K8sHelmBaseConnector): ) return inspect_command + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + get_command = ( + "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format( + kubeconfig, self._helm_command, get_command, kdu_instance, namespace + ) + ) + return get_command + async def _status_kdu( self, cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: self.log.debug( "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) @@ -361,8 +383,8 @@ class K8sHelm3Connector(K8sHelmBaseConnector): paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} status {} --namespace={} --output yaml".format( - self._helm_command, kdu_instance, namespace + command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format( + paths["kube_config"], self._helm_command, kdu_instance, namespace ) output, rc = await self._local_async_exec( @@ -372,7 +394,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector): env=env, ) - if return_text: + if yaml_format: return str(output) if rc != 0: @@ -383,11 +405,18 @@ class K8sHelm3Connector(K8sHelmBaseConnector): # remove field 'notes' and manifest try: del data.get("info")["notes"] - del data["manifest"] except KeyError: pass - # unable to parse 'resources' as currently it is not included in helm3 + # parse the manifest to a list of dictionaries + if "manifest" in data: + manifest_str = data.get("manifest") + manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader) + + data["manifest"] = [] + for doc in manifest_docs: + data["manifest"].append(doc) + return data def _get_install_command( @@ -399,6 +428,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector): version: str, atomic: bool, timeout: float, + kubeconfig: str, ) -> str: timeout_str = "" @@ -420,8 +450,9 @@ class K8sHelm3Connector(K8sHelmBaseConnector): version_str = "--version {}".format(version) command = ( - "{helm} install {name} {atomic} --output yaml " + "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml " "{params} {timeout} {ns} {model} {ver}".format( + kubeconfig=kubeconfig, helm=self._helm_command, name=kdu_instance, atomic=atomic_str, @@ -434,6 +465,63 @@ class K8sHelm3Connector(K8sHelmBaseConnector): ) return command + def _get_upgrade_scale_command( + self, + kdu_model: str, + kdu_instance: str, + namespace: str, + scale: int, + version: str, + atomic: bool, + replica_str: str, + timeout: float, + resource_name: str, + kubeconfig: str, + ) -> str: + + timeout_str = "" + if timeout: + timeout_str = "--timeout {}s".format(timeout) + + # atomic + atomic_str = "" + if atomic: + atomic_str = "--atomic" + + # version + version_str = "" + if version: + version_str = "--version {}".format(version) + + # namespace + namespace_str = "" + if namespace: + namespace_str = "--namespace {}".format(namespace) + + # scale + if resource_name: + scale_dict = {"{}.{}".format(resource_name, replica_str): scale} + else: + scale_dict = {replica_str: scale} + + scale_str = self._params_to_set_option(scale_dict) + + command = ( + "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} " + "{timeout} {ver}" + ).format( + helm=self._helm_command, + name=kdu_instance, + namespace=namespace_str, + atomic=atomic_str, + scale=scale_str, + timeout=timeout_str, + model=kdu_model, + ver=version_str, + kubeconfig=kubeconfig, + ) + return command + def _get_upgrade_command( self, kdu_model: str, @@ -443,6 +531,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector): version: str, atomic: bool, timeout: float, + kubeconfig: str, ) -> str: timeout_str = "" @@ -465,31 +554,34 @@ class K8sHelm3Connector(K8sHelmBaseConnector): namespace_str = "--namespace {}".format(namespace) command = ( - "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} " - "{timeout} {ver}".format( - helm=self._helm_command, - name=kdu_instance, - namespace=namespace_str, - atomic=atomic_str, - params=params_str, - timeout=timeout_str, - model=kdu_model, - ver=version_str, - ) + "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} " + "--output yaml {params} {timeout} {ver}" + ).format( + kubeconfig=kubeconfig, + helm=self._helm_command, + name=kdu_instance, + namespace=namespace_str, + atomic=atomic_str, + params=params_str, + timeout=timeout_str, + model=kdu_model, + ver=version_str, ) return command def _get_rollback_command( - self, kdu_instance: str, namespace: str, revision: float + self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str ) -> str: - return "{} rollback {} {} --namespace={} --wait".format( - self._helm_command, kdu_instance, revision, namespace + return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format( + kubeconfig, self._helm_command, kdu_instance, revision, namespace ) - def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: - return "{} uninstall {} --namespace={}".format( - self._helm_command, kdu_instance, namespace + return "env KUBECONFIG={} {} uninstall {} --namespace={}".format( + kubeconfig, self._helm_command, kdu_instance, namespace ) def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: