X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=8c526f5d2c0aa9c4c3d4e96aaa6a444808197bf2;hp=76ca0b92e3e32d34ae2b6210e5ac3d641ae14c43;hb=a8980cc3f6508f2659dc4ba4fcbeed65ba3c8e95;hpb=095392b4b80e0b63c66282f283c1139ec0536750

diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index 76ca0b9..8c526f5 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
@@ -20,6 +20,7 @@
 # contact with: nfvlabs@tid.es
 ##
 import asyncio
+from typing import Union
 import os
 import yaml
 
@@ -70,7 +71,12 @@ class K8sHelmConnector(K8sHelmBaseConnector):
 
         # initialize helm client-only
         self.log.debug("Initializing helm client-only...")
-        command = "{} init --client-only".format(self._helm_command)
+        command = "{} init --client-only {} ".format(
+            self._helm_command,
+            "--stable-repo-url {}".format(self._stable_repo_url)
+            if self._stable_repo_url
+            else "--skip-repos",
+        )
         try:
             asyncio.ensure_future(
                 self._local_async_exec(command=command, raise_exception_on_error=False)
@@ -86,43 +92,75 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         self.log.info("K8S Helm2 connector initialized")
 
     async def install(
-            self,
-            cluster_uuid: str,
-            kdu_model: str,
-            atomic: bool = True,
-            timeout: float = 300,
-            params: dict = None,
-            db_dict: dict = None,
-            kdu_name: str = None,
-            namespace: str = None,
+        self,
+        cluster_uuid: str,
+        kdu_model: str,
+        kdu_instance: str,
+        atomic: bool = True,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+        kdu_name: str = None,
+        namespace: str = None,
+        **kwargs,
     ):
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
+        """
+        Deploys of a new KDU instance. It would implicitly rely on the `install` call
+        to deploy the Chart/Bundle properly parametrized (in practice, this call would
+        happen before any _initial-config-primitive_of the VNF is called).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_model: chart/ reference (string), which can be either
+            of these options:
+            - a name of chart available via the repos known by OSM
+            - a path to a packaged chart
+            - a path to an unpacked chart directory or a URL
+        :param kdu_instance: Kdu instance name
+        :param atomic: If set, installation process purges chart/bundle on fail, also
+            will wait until all the K8s objects are active
+        :param timeout: Time in seconds to wait for the install of the chart/bundle
+            (defaults to Helm default timeout: 300s)
+        :param params: dictionary of key-value pairs for instantiation parameters
+            (overriding default values)
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},
+                        path: <str>},
+                            e.g. {collection: "nsrs", filter:
+                            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+        :param kdu_name: Name of the KDU instance to be installed
+        :param namespace: K8s namespace to use for the KDU instance
+        :param kwargs: Additional parameters (None yet)
+        :return: True if successful
+        """
+        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # init env, paths
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
-        kdu_instance = await self._install_impl(cluster_id,
-                                                kdu_model,
-                                                paths,
-                                                env,
-                                                atomic=atomic,
-                                                timeout=timeout,
-                                                params=params,
-                                                db_dict=db_dict,
-                                                kdu_name=kdu_name,
-                                                namespace=namespace)
+        await self._install_impl(
+            cluster_uuid,
+            kdu_model,
+            paths,
+            env,
+            kdu_instance,
+            atomic=atomic,
+            timeout=timeout,
+            params=params,
+            db_dict=db_dict,
+            kdu_name=kdu_name,
+            namespace=namespace,
+        )
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
-        return kdu_instance
+        return True
 
     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
 
@@ -130,7 +168,7 @@ class K8sHelmConnector(K8sHelmBaseConnector):
             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -195,14 +233,16 @@ class K8sHelmConnector(K8sHelmBaseConnector):
 
         return paths, env
 
-    async def _get_services(self, cluster_id, kdu_instance, namespace):
+    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
 
         # init config, env
        paths, env = self._init_paths_env(
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
-        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
+        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
+            kubeconfig, self._helm_command, kdu_instance
+        )
         command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
         output, _rc = await self._local_async_exec_pipe(
             command1, command2, env=env, raise_exception_on_error=True
@@ -211,8 +251,9 @@ class K8sHelmConnector(K8sHelmBaseConnector):
 
         return services
 
-    async def _cluster_init(self, cluster_id: str, namespace: str,
-                            paths: dict, env: dict):
+    async def _cluster_init(
+        self, cluster_id: str, namespace: str, paths: dict, env: dict
+    ):
         """
         Implements the helm version dependent cluster initialization:
         For helm2 it initialized tiller environment if needed
@@ -261,13 +302,16 @@ class K8sHelmConnector(K8sHelmBaseConnector):
 
             command = (
                 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
-                "init"
+                " {} init"
             ).format(
                 self._helm_command,
                 paths["kube_config"],
                 namespace,
                 paths["helm_dir"],
                 self.service_account,
+                "--stable-repo-url {}".format(self._stable_repo_url)
+                if self._stable_repo_url
+                else "--skip-repos",
             )
             _, _rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=True, env=env
@@ -282,12 +326,15 @@ class K8sHelmConnector(K8sHelmBaseConnector):
             self.log.info("Initializing helm in client: {}".format(cluster_id))
             command = (
                 "{} --kubeconfig={} --tiller-namespace={} "
-                "--home={} init --client-only"
+                "--home={} init --client-only {} "
             ).format(
                 self._helm_command,
                 paths["kube_config"],
                 namespace,
                 paths["helm_dir"],
+                "--stable-repo-url {}".format(self._stable_repo_url)
+                if self._stable_repo_url
+                else "--skip-repos",
             )
             output, _rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=True, env=env
@@ -295,6 +342,15 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         else:
             self.log.info("Helm client already initialized")
 
+        repo_list = await self.repo_list(cluster_id)
+        for repo in repo_list:
+            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
+                self.log.debug("Add new stable repo url: {}")
+                await self.repo_remove(cluster_id, "stable")
+                if self._stable_repo_url:
+                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
+                break
+
         return n2vc_installed_sw
 
     async def _uninstall_sw(self, cluster_id: str, namespace: str):
@@ -349,8 +405,13 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         output, _rc = await self._local_async_exec(
             command=command, raise_exception_on_error=False, env=env
         )
-        command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
-            self.kubectl_command, paths["kube_config"], self.service_account
+        command = (
+            "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
+                self.kubectl_command,
+                paths["kube_config"],
+                namespace,
+                self.service_account,
+            )
         )
         output, _rc = await self._local_async_exec(
             command=command, raise_exception_on_error=False, env=env
@@ -383,21 +444,30 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         else:
             return []
 
-    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
-                             version: str):
+    def _get_inspect_command(
+        self, show_command: str, kdu_model: str, repo_str: str, version: str
+    ):
         inspect_command = "{} inspect {} {}{} {}".format(
             self._helm_command, show_command, kdu_model, repo_str, version
         )
         return inspect_command
 
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
+            kubeconfig, self._helm_command, get_command, kdu_instance
+        )
+        return get_command
+
     async def _status_kdu(
         self,
         cluster_id: str,
         kdu_instance: str,
         namespace: str = None,
+        yaml_format: bool = False,
         show_error_log: bool = False,
-        return_text: bool = False,
-    ):
+    ) -> Union[str, dict]:
 
         self.log.debug(
             "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
@@ -407,7 +477,9 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         paths, env = self._init_paths_env(
             cluster_name=cluster_id, create_if_not_exist=True
         )
-        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
+        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
+            paths["kube_config"], self._helm_command, kdu_instance
+        )
         output, rc = await self._local_async_exec(
             command=command,
             raise_exception_on_error=True,
@@ -415,7 +487,7 @@ class K8sHelmConnector(K8sHelmBaseConnector):
             env=env,
         )
 
-        if return_text:
+        if yaml_format:
             return str(output)
 
         if rc != 0:
@@ -429,6 +501,15 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         except KeyError:
             pass
 
+        # parse the manifest to a list of dictionaries
+        if "manifest" in data:
+            manifest_str = data.get("manifest")
+            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
+
+            data["manifest"] = []
+            for doc in manifest_docs:
+                data["manifest"].append(doc)
+
         # parse field 'resources'
         try:
             resources = str(data.get("info").get("status").get("resources"))
@@ -458,9 +539,13 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         )
 
     async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
+        # init config, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
 
         status = await self._status_kdu(
-            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
+            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
         )
 
         # extract info.status.resources-> str
@@ -510,7 +595,15 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         return ready
 
     def _get_install_command(
-        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        params_str,
+        version,
+        atomic,
+        timeout,
+        kubeconfig,
     ) -> str:
 
         timeout_str = ""
@@ -529,11 +622,12 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         # version
         version_str = ""
         if version:
-            version_str = version_str = "--version {}".format(version)
+            version_str = "--version {}".format(version)
 
         command = (
-            "{helm} install {atomic} --output yaml "
+            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
             "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+                kubeconfig=kubeconfig,
                 helm=self._helm_command,
                 atomic=atomic_str,
                 params=params_str,
@@ -546,8 +640,66 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         )
         return command
 
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        scale: int,
+        version: str,
+        atomic: bool,
+        replica_str: str,
+        timeout: float,
+        resource_name: str,
+        kubeconfig: str,
+    ) -> str:
+
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}s".format(timeout)
+
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
+
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
+
+        # scale
+        if resource_name:
+            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
+        else:
+            scale_dict = {replica_str: scale}
+
+        scale_str = self._params_to_set_option(scale_dict)
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
+        ).format(
+            helm=self._helm_command,
+            name=kdu_instance,
+            atomic=atomic_str,
+            scale=scale_str,
+            timeout=timeout_str,
+            model=kdu_model,
+            ver=version_str,
+            kubeconfig=kubeconfig,
+        )
+        return command
+
     def _get_upgrade_command(
-        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        params_str,
+        version,
+        atomic,
+        timeout,
+        kubeconfig,
     ) -> str:
 
         timeout_str = ""
@@ -564,21 +716,30 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         if version:
             version_str = "--version {}".format(version)
 
-        command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
-            .format(helm=self._helm_command,
-                    atomic=atomic_str,
-                    params=params_str,
-                    timeout=timeout_str,
-                    name=kdu_instance,
-                    model=kdu_model,
-                    ver=version_str
-                    )
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
+        ).format(
+            kubeconfig=kubeconfig,
+            helm=self._helm_command,
+            atomic=atomic_str,
+            params=params_str,
+            timeout=timeout_str,
+            name=kdu_instance,
+            model=kdu_model,
+            ver=version_str,
+        )
         return command
 
-    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
-        return "{} rollback {} {} --wait".format(
-            self._helm_command, kdu_instance, revision
+    def _get_rollback_command(
+        self, kdu_instance, namespace, revision, kubeconfig
+    ) -> str:
+        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
+            kubeconfig, self._helm_command, kdu_instance, revision
         )
 
-    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
-        return "{} delete --purge {}".format(self._helm_command, kdu_instance)
+    def _get_uninstall_command(
+        self, kdu_instance: str, namespace: str, kubeconfig: str
+    ) -> str:
+        return "env KUBECONFIG={} {} delete --purge {}".format(
+            kubeconfig, self._helm_command, kdu_instance
+        )
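
Usage note (editorial, not part of the patch): with this change install() receives the kdu_instance name from its caller and returns True instead of the generated instance name. A minimal sketch of how a caller might drive the reworked entry point; the connector object and every identifier below are illustrative assumptions, not values taken from the commit.

# 'helm' is assumed to be an already-initialised K8sHelmConnector instance.
async def deploy_example_kdu(helm) -> bool:
    return await helm.install(
        cluster_uuid="cluster-uuid",       # placeholder for a cluster registered in OSM
        kdu_model="stable/openldap",       # placeholder chart reference
        kdu_instance="openldap-0001",      # the caller now generates and owns this name
        atomic=True,
        timeout=300,
        params={"replicaCount": 1},
        db_dict=None,
    )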
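
The recurring edit across the _get_*_command() helpers is prefixing each helm invocation with "env KUBECONFIG=<kubeconfig>", so every command is pinned to the target cluster's kubeconfig instead of relying on ambient environment state. A self-contained sketch of that pattern (a simplified stand-in, not the actual helper; the path below is a placeholder):

def build_status_command(kubeconfig: str, helm_binary: str, kdu_instance: str) -> str:
    # Same shape as the reworked _status_kdu()/_get_get_command() strings:
    # the "env" prefix scopes KUBECONFIG to this single helm call.
    return "env KUBECONFIG={} {} status {} --output yaml".format(
        kubeconfig, helm_binary, kdu_instance
    )

print(build_status_command("/app/storage/cluster-uuid/.kube/config", "helm", "openldap-0001"))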
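
The new block in _status_kdu() turns the raw "manifest" string into a list of dictionaries with yaml.load_all. A standalone illustration of that parsing step (the sample manifest is invented for the example):

import yaml

# Invented two-document manifest, standing in for "helm get manifest" output.
sample_manifest = """
apiVersion: v1
kind: Service
metadata:
  name: example-svc
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: example-deploy
"""

# Same call the patch uses: SafeLoader, one dict per YAML document.
docs = [doc for doc in yaml.load_all(sample_manifest, Loader=yaml.SafeLoader) if doc]
print([d["kind"] for d in docs])  # ['Service', 'Deployment']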