X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=5079dbe6f917fe53894cda0d4f402c4a8d9a81c2;hp=20fa337ddd8d681adc11eab405e0e0005edca745;hb=a8980cc3f6508f2659dc4ba4fcbeed65ba3c8e95;hpb=7bd5c6affb2805caba1b832b56d0ad5712396306 diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 20fa337..5079dbe 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,6 +21,7 @@ ## import abc import asyncio +from typing import Union import random import time import shlex @@ -89,14 +90,18 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None - @staticmethod - def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): + def _get_namespace(self, cluster_uuid: str) -> str: """ - Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only - cluster_id for backward compatibility + Obtains the namespace used by the cluster with the uuid passed by argument + + param: cluster_uuid: cluster's uuid """ - namespace, _, cluster_id = cluster_uuid.rpartition(":") - return namespace, cluster_id + + # first, obtain the cluster corresponding to the uuid passed by argument + k8scluster = self.db.get_one( + "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False + ) + return k8scluster.get("namespace") async def init_env( self, @@ -120,11 +125,9 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) - namespace = namespace_ or namespace + cluster_id = reuse_cluster_uuid else: cluster_id = str(uuid4()) - cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -146,25 +149,31 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_uuid, n2vc_installed_sw + return cluster_id, n2vc_installed_sw async def repo_add( - self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = "chart", + cert: str = None, + user: str = None, + password: str = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url + cluster_uuid, repo_type, name, url ) ) # init_env paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # helm repo update command = "env KUBECONFIG={} {} repo update".format( @@ -176,16 +185,32 @@ class K8sHelmBaseConnector(K8sConnector): ) # helm repo add name url - command = "env KUBECONFIG={} {} repo add {} {}".format( + command = ("env KUBECONFIG={} {} repo add {} {}").format( paths["kube_config"], self._helm_command, name, url ) + + if cert: + temp_cert_file = os.path.join( + self.fs.path, "{}/helmcerts/".format(cluster_id), "temp.crt" + ) + os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) + with open(temp_cert_file, "w") as the_cert: + the_cert.write(cert) + command += " --ca-file {}".format(temp_cert_file) + + if user: + command += " --username={}".format(user) + + if password: + command += " --password={}".format(password) + self.log.debug("adding repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -194,16 +219,15 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list repositories for cluster {}".format(cluster_id)) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo list --output yaml".format( paths["kube_config"], self._helm_command @@ -215,7 +239,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) if _rc == 0: if output and len(output) > 0: @@ -228,17 +252,17 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) + self.log.debug( + "remove {} repositories for cluster {}".format(name, cluster_uuid) + ) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo remove {}".format( paths["kube_config"], self._helm_command, name @@ -248,7 +272,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def reset( self, @@ -267,15 +291,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + namespace = self._get_namespace(cluster_uuid=cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_id, uninstall_sw + cluster_uuid, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # uninstall releases if needed. if uninstall_sw: @@ -304,20 +328,20 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" - ).format(cluster_id) + ).format(cluster_uuid) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id, namespace) + await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_id)) - self.fs.file_delete(cluster_id, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_uuid)) + self.fs.file_delete(cluster_uuid, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_id + direct = self.fs.path + "/" + cluster_uuid shutil.rmtree(direct, ignore_errors=True) return True @@ -347,12 +371,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # version - version = None - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = str(parts[1]) - kdu_model = parts[0] + kdu_model, version = self._split_version(kdu_model) command = self._get_install_command( kdu_model, @@ -431,11 +450,10 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -444,24 +462,19 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_id, params=params + cluster_id=cluster_uuid, params=params ) # version - version = None - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = str(parts[1]) - kdu_model = parts[0] + kdu_model, version = self._split_version(kdu_model) command = self._get_upgrade_command( kdu_model, @@ -487,7 +500,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -515,7 +528,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -530,7 +543,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -549,31 +562,198 @@ class K8sHelmBaseConnector(K8sConnector): scale: int, resource_name: str, total_timeout: float = 1800, + cluster_uuid: str = None, + kdu_model: str = None, + atomic: bool = True, + db_dict: dict = None, **kwargs, ): - raise NotImplementedError("Method not implemented") + """Scale a resource in a Helm Chart. + + Args: + kdu_instance: KDU instance name + scale: Scale to which to set the resource + resource_name: Resource name + total_timeout: The time, in seconds, to wait + cluster_uuid: The UUID of the cluster + kdu_model: The chart reference + atomic: if set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + db_dict: Dictionary for any additional data + kwargs: Additional parameters + + Returns: + True if successful, False otherwise + """ + + debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid) + if resource_name: + debug_mgs = "scaling resource {} in model {} (cluster {})".format( + resource_name, kdu_model, cluster_uuid + ) + + self.log.debug(debug_mgs) + + # look for instance to obtain namespace + # get_instance_info function calls the sync command + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # version + kdu_model, version = self._split_version(kdu_model) + + repo_url = await self._find_repo(kdu_model, cluster_uuid) + if not repo_url: + raise K8sException( + "Repository not found for kdu_model {}".format(kdu_model) + ) + + _, replica_str = await self._get_replica_count_url( + kdu_model, repo_url, resource_name + ) + + command = self._get_upgrade_scale_command( + kdu_model, + kdu_instance, + instance_info["namespace"], + scale, + version, + atomic, + replica_str, + total_timeout, + resource_name, + paths["kube_config"], + ) + + self.log.debug("scaling: {}".format(command)) + + if atomic: + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_id=cluster_uuid, + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + db_dict=db_dict, + operation="scale", + run_once=False, + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + output, rc = exec_task.result() + + else: + output, rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + # write final status + await self._store_status( + cluster_id=cluster_uuid, + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + db_dict=db_dict, + operation="scale", + run_once=True, + check_every=0, + ) + + if rc != 0: + msg = "Error executing command: {}\nOutput: {}".format(command, output) + self.log.error(msg) + raise K8sException(msg) + + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + return True async def get_scale_count( self, resource_name: str, kdu_instance: str, + cluster_uuid: str, + kdu_model: str, **kwargs, - ): - raise NotImplementedError("Method not implemented") + ) -> int: + """Get a resource scale count. + + Args: + cluster_uuid: The UUID of the cluster + resource_name: Resource name + kdu_instance: KDU instance name + kdu_model: The name or path of a bundle + kwargs: Additional parameters + + Returns: + Resource instance count + """ + + self.log.debug( + "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid) + ) + + # look for instance to obtain namespace + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + replicas = await self._get_replica_count_instance( + kdu_instance, instance_info["namespace"], paths["kube_config"] + ) + + # Get default value if scale count is not found from provided values + if not replicas: + repo_url = await self._find_repo(kdu_model, cluster_uuid) + if not repo_url: + raise K8sException( + "Repository not found for kdu_model {}".format(kdu_model) + ) + + replicas, _ = await self._get_replica_count_url( + kdu_model, repo_url, resource_name + ) + + if not replicas: + msg = "Replica count not found. Cannot be scaled" + self.log.error(msg) + raise K8sException(msg) + + return int(replicas) async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_id + kdu_instance, revision, cluster_uuid ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -582,11 +762,11 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_rollback_command( kdu_instance, instance_info["namespace"], revision, paths["kube_config"] @@ -603,7 +783,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -622,7 +802,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -637,7 +817,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -662,13 +842,14 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -677,11 +858,11 @@ class K8sHelmBaseConnector(K8sConnector): return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_uninstall_command( kdu_instance, instance_info["namespace"], paths["kube_config"] @@ -691,7 +872,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return self._output_to_table(output) @@ -703,17 +884,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list releases for cluster {}".format(cluster_id)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # execute internal command - result = await self._instances_list(cluster_id) + result = await self._instances_list(cluster_uuid) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return result @@ -771,7 +951,6 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance @@ -780,24 +959,24 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get list of services names for kdu service_names = await self._get_services( - cluster_id, kdu_instance, namespace, paths["kube_config"] + cluster_uuid, kdu_instance, namespace, paths["kube_config"] ) service_list = [] for service in service_names: - service = await self._get_service(cluster_id, service, namespace) + service = await self._get_service(cluster_uuid, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service_list @@ -811,19 +990,19 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) - service = await self._get_service(cluster_id, service_name, namespace) + service = await self._get_service(cluster_uuid, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -833,6 +1012,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -853,13 +1034,11 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_id) + instances = await self._instances_list(cluster_id=cluster_uuid) for instance in instances: if instance.get("name") == kdu_instance: break @@ -867,23 +1046,36 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id + kdu_instance, cluster_uuid ) ) status = await self._status_kdu( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return status + async def get_values_kdu( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: + + self.log.debug("get kdu_instance values {}".format(kdu_instance)) + + return await self._exec_get_command( + get_command="values", + kdu_instance=kdu_instance, + namespace=namespace, + kubeconfig=kubeconfig, + ) + async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: self.log.debug( @@ -892,7 +1084,7 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="values", kdu_model=kdu_model, repo_url=repo_url ) @@ -902,7 +1094,7 @@ class K8sHelmBaseConnector(K8sConnector): "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url ) @@ -938,9 +1130,19 @@ class K8sHelmBaseConnector(K8sConnector): # add repo self.log.debug("add repo {}".format(db_repo["name"])) - await self.repo_add( - cluster_uuid, db_repo["name"], db_repo["url"] - ) + if "ca_cert" in db_repo: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + cert=db_repo["ca_cert"], + ) + else: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: raise K8sException( @@ -1020,9 +1222,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @@ -1043,6 +1245,22 @@ class K8sHelmBaseConnector(K8sConnector): Obtain command to be executed to delete the indicated instance """ + @abc.abstractmethod + def _get_upgrade_scale_command( + self, + kdu_model, + kdu_instance, + namespace, + count, + version, + atomic, + replicas, + timeout, + resource_name, + kubeconfig, + ) -> str: + """Obtain command to be executed to upgrade the indicated instance.""" + @abc.abstractmethod def _get_upgrade_command( self, @@ -1083,6 +1301,12 @@ class K8sHelmBaseConnector(K8sConnector): Obtain command to be executed to obtain information about the kdu """ + @abc.abstractmethod + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + """Obtain command to be executed to get information about the kdu instance.""" + @abc.abstractmethod async def _uninstall_sw(self, cluster_id: str, namespace: str): """ @@ -1386,12 +1610,23 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def _exec_inspect_comand( + async def _exec_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + """Obtains information about the kdu instance.""" + + full_command = self._get_get_command( + get_command, kdu_instance, namespace, kubeconfig + ) + + output, _rc = await self._local_async_exec(command=full_command) + + return output + + async def _exec_inspect_command( self, inspect_command: str, kdu_model: str, repo_url: str = None ): - """ - Obtains information about a kdu, no cluster (no env) - """ + """Obtains information about a kdu, no cluster (no env).""" repo_str = "" if repo_url: @@ -1402,22 +1637,141 @@ class K8sHelmBaseConnector(K8sConnector): idx += 1 kdu_model = kdu_model[idx:] - version = "" - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = "--version {}".format(str(parts[1])) - kdu_model = parts[0] + kdu_model, version = self._split_version(kdu_model) + if version: + version_str = "--version {}".format(version) + else: + version_str = "" full_command = self._get_inspect_command( - inspect_command, kdu_model, repo_str, version - ) - output, _rc = await self._local_async_exec( - command=full_command, encode_utf8=True + inspect_command, kdu_model, repo_str, version_str ) + output, _rc = await self._local_async_exec(command=full_command) + return output + async def _get_replica_count_url( + self, + kdu_model: str, + repo_url: str, + resource_name: str = None, + ): + """Get the replica count value in the Helm Chart Values. + + Args: + kdu_model: The name or path of a bundle + repo_url: Helm Chart repository url + resource_name: Resource name + + Returns: + True if replicas, False replicaCount + """ + + kdu_values = yaml.load( + await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader + ) + + if not kdu_values: + raise K8sException( + "kdu_values not found for kdu_model {}".format(kdu_model) + ) + + if resource_name: + kdu_values = kdu_values.get(resource_name, None) + + if not kdu_values: + msg = "resource {} not found in the values in model {}".format( + resource_name, kdu_model + ) + self.log.error(msg) + raise K8sException(msg) + + duplicate_check = False + + replica_str = "" + replicas = None + + if kdu_values.get("replicaCount", None): + replicas = kdu_values["replicaCount"] + replica_str = "replicaCount" + elif kdu_values.get("replicas", None): + duplicate_check = True + replicas = kdu_values["replicas"] + replica_str = "replicas" + else: + if resource_name: + msg = ( + "replicaCount or replicas not found in the resource" + "{} values in model {}. Cannot be scaled".format( + resource_name, kdu_model + ) + ) + else: + msg = ( + "replicaCount or replicas not found in the values" + "in model {}. Cannot be scaled".format(kdu_model) + ) + self.log.error(msg) + raise K8sException(msg) + + # Control if replicas and replicaCount exists at the same time + msg = "replicaCount and replicas are exists at the same time" + if duplicate_check: + if "replicaCount" in kdu_values: + self.log.error(msg) + raise K8sException(msg) + else: + if "replicas" in kdu_values: + self.log.error(msg) + raise K8sException(msg) + + return replicas, replica_str + + async def _get_replica_count_instance( + self, + kdu_instance: str, + namespace: str, + kubeconfig: str, + resource_name: str = None, + ): + """Get the replica count value in the instance. + + Args: + kdu_instance: The name of the KDU instance + namespace: KDU instance namespace + kubeconfig: + resource_name: Resource name + + Returns: + True if replicas, False replicaCount + """ + + kdu_values = yaml.load( + await self.get_values_kdu(kdu_instance, namespace, kubeconfig), + Loader=yaml.SafeLoader, + ) + + replicas = None + + if kdu_values: + resource_values = ( + kdu_values.get(resource_name, None) if resource_name else None + ) + replicas = ( + ( + resource_values.get("replicaCount", None) + or resource_values.get("replicas", None) + ) + if resource_values + else ( + kdu_values.get("replicaCount", None) + or kdu_values.get("replicas", None) + ) + ) + + return replicas + async def _store_status( self, cluster_id: str, @@ -1434,8 +1788,8 @@ class K8sHelmBaseConnector(K8sConnector): detailed_status = await self._status_kdu( cluster_id=cluster_id, kdu_instance=kdu_instance, + yaml_format=False, namespace=namespace, - return_text=False, ) status = detailed_status.get("info").get("description") self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) @@ -1542,3 +1896,23 @@ class K8sHelmBaseConnector(K8sConnector): name = name + get_random_number() return name.lower() + + def _split_version(self, kdu_model: str) -> (str, str): + version = None + if ":" in kdu_model: + parts = kdu_model.split(sep=":") + if len(parts) == 2: + version = str(parts[1]) + kdu_model = parts[0] + return kdu_model, version + + async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: + repo_url = None + idx = kdu_model.find("/") + if idx >= 0: + repo_name = kdu_model[:idx] + # Find repository link + local_repo_list = await self.repo_list(cluster_uuid) + for repo in local_repo_list: + repo_url = repo["url"] if repo["name"] == repo_name else None + return repo_url