X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=8c526f5d2c0aa9c4c3d4e96aaa6a444808197bf2;hp=6bbc0fa79b0b69eda28060a981441aafcfb440ae;hb=a8980cc3f6508f2659dc4ba4fcbeed65ba3c8e95;hpb=4395cfa6c7d0d80980c00d9f078440e0333fd826 diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 6bbc0fa..8c526f5 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -20,6 +20,7 @@ # contact with: nfvlabs@tid.es ## import asyncio +from typing import Union import os import yaml @@ -131,19 +132,18 @@ class K8sHelmConnector(K8sHelmBaseConnector): :param kwargs: Additional parameters (None yet) :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) await self._install_impl( - cluster_id, + cluster_uuid, kdu_model, paths, env, @@ -157,7 +157,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return True @@ -168,7 +168,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="", kdu_model=kdu_model, repo_url=repo_url ) @@ -233,14 +233,16 @@ class K8sHelmConnector(K8sHelmBaseConnector): return paths, env - async def _get_services(self, cluster_id, kdu_instance, namespace): + async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): # init config, env paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance) + command1 = "env KUBECONFIG={} {} get manifest {} ".format( + kubeconfig, self._helm_command, kdu_instance + ) command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace) output, _rc = await self._local_async_exec_pipe( command1, command2, env=env, raise_exception_on_error=True @@ -340,15 +342,13 @@ class K8sHelmConnector(K8sHelmBaseConnector): else: self.log.info("Helm client already initialized") - # remove old stable repo and add new one - cluster_uuid = "{}:{}".format(namespace, cluster_id) - repo_list = await self.repo_list(cluster_uuid) + repo_list = await self.repo_list(cluster_id) for repo in repo_list: if repo["name"] == "stable" and repo["url"] != self._stable_repo_url: self.log.debug("Add new stable repo url: {}") - await self.repo_remove(cluster_uuid, "stable") + await self.repo_remove(cluster_id, "stable") if self._stable_repo_url: - await self.repo_add(cluster_uuid, "stable", self._stable_repo_url) + await self.repo_add(cluster_id, "stable", self._stable_repo_url) break return n2vc_installed_sw @@ -405,8 +405,13 @@ class K8sHelmConnector(K8sHelmBaseConnector): output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format( - self.kubectl_command, paths["kube_config"], self.service_account + command = ( + "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format( + self.kubectl_command, + paths["kube_config"], + namespace, + self.service_account, + ) ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env @@ -447,14 +452,22 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) return inspect_command + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format( + kubeconfig, self._helm_command, get_command, kdu_instance + ) + return get_command + async def _status_kdu( self, cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: self.log.debug( "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace) @@ -464,7 +477,9 @@ class K8sHelmConnector(K8sHelmBaseConnector): paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} status {} --output yaml".format(self._helm_command, kdu_instance) + command = ("env KUBECONFIG={} {} status {} --output yaml").format( + paths["kube_config"], self._helm_command, kdu_instance + ) output, rc = await self._local_async_exec( command=command, raise_exception_on_error=True, @@ -472,7 +487,7 @@ class K8sHelmConnector(K8sHelmBaseConnector): env=env, ) - if return_text: + if yaml_format: return str(output) if rc != 0: @@ -486,6 +501,15 @@ class K8sHelmConnector(K8sHelmBaseConnector): except KeyError: pass + # parse the manifest to a list of dictionaries + if "manifest" in data: + manifest_str = data.get("manifest") + manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader) + + data["manifest"] = [] + for doc in manifest_docs: + data["manifest"].append(doc) + # parse field 'resources' try: resources = str(data.get("info").get("status").get("resources")) @@ -515,9 +539,13 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool: + # init config, env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) status = await self._status_kdu( - cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False + cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False ) # extract info.status.resources-> str @@ -567,7 +595,15 @@ class K8sHelmConnector(K8sHelmBaseConnector): return ready def _get_install_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: timeout_str = "" @@ -586,11 +622,12 @@ class K8sHelmConnector(K8sHelmBaseConnector): # version version_str = "" if version: - version_str = version_str = "--version {}".format(version) + version_str = "--version {}".format(version) command = ( - "{helm} install {atomic} --output yaml " + "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml " "{params} {timeout} --name={name} {ns} {model} {ver}".format( + kubeconfig=kubeconfig, helm=self._helm_command, atomic=atomic_str, params=params_str, @@ -603,8 +640,66 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) return command + def _get_upgrade_scale_command( + self, + kdu_model: str, + kdu_instance: str, + namespace: str, + scale: int, + version: str, + atomic: bool, + replica_str: str, + timeout: float, + resource_name: str, + kubeconfig: str, + ) -> str: + + timeout_str = "" + if timeout: + timeout_str = "--timeout {}s".format(timeout) + + # atomic + atomic_str = "" + if atomic: + atomic_str = "--atomic" + + # version + version_str = "" + if version: + version_str = "--version {}".format(version) + + # scale + if resource_name: + scale_dict = {"{}.{}".format(resource_name, replica_str): scale} + else: + scale_dict = {replica_str: scale} + + scale_str = self._params_to_set_option(scale_dict) + + command = ( + "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}" + ).format( + helm=self._helm_command, + name=kdu_instance, + atomic=atomic_str, + scale=scale_str, + timeout=timeout_str, + model=kdu_model, + ver=version_str, + kubeconfig=kubeconfig, + ) + return command + def _get_upgrade_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: timeout_str = "" @@ -621,7 +716,10 @@ class K8sHelmConnector(K8sHelmBaseConnector): if version: version_str = "--version {}".format(version) - command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format( + command = ( + "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}" + ).format( + kubeconfig=kubeconfig, helm=self._helm_command, atomic=atomic_str, params=params_str, @@ -632,10 +730,16 @@ class K8sHelmConnector(K8sHelmBaseConnector): ) return command - def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: - return "{} rollback {} {} --wait".format( - self._helm_command, kdu_instance, revision + def _get_rollback_command( + self, kdu_instance, namespace, revision, kubeconfig + ) -> str: + return "env KUBECONFIG={} {} rollback {} {} --wait".format( + kubeconfig, self._helm_command, kdu_instance, revision ) - def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: - return "{} delete --purge {}".format(self._helm_command, kdu_instance) + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: + return "env KUBECONFIG={} {} delete --purge {}".format( + kubeconfig, self._helm_command, kdu_instance + )