X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=149b064ba5ed57fb5fbccac5ae7705eba074d07f;hp=8c52a7b1b021508243d34b49b400eeee8be1bd20;hb=60a3a96717d7c36ba7a65573da59a6bc039f5e28;hpb=13cbdb782ea303bbc38884a732c32c0e1920fc2b diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 8c52a7b..149b064 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -90,18 +90,14 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None - def _get_namespace(self, cluster_uuid: str) -> str: + @staticmethod + def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): """ - Obtains the namespace used by the cluster with the uuid passed by argument - - param: cluster_uuid: cluster's uuid + Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only + cluster_id for backward compatibility """ - - # first, obtain the cluster corresponding to the uuid passed by argument - k8scluster = self.db.get_one( - "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False - ) - return k8scluster.get("namespace") + namespace, _, cluster_id = cluster_uuid.rpartition(":") + return namespace, cluster_id async def init_env( self, @@ -125,9 +121,11 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - cluster_id = reuse_cluster_uuid + namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) + namespace = namespace_ or namespace else: cluster_id = str(uuid4()) + cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -149,24 +147,25 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_id, n2vc_installed_sw + return cluster_uuid, n2vc_installed_sw async def repo_add( self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. 
URL: {}".format( - cluster_uuid, repo_type, name, url + cluster_id, repo_type, name, url ) ) # init_env paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # helm repo add name url command = "env KUBECONFIG={} {} repo add {} {}".format( @@ -178,16 +177,39 @@ class K8sHelmBaseConnector(K8sConnector): ) # helm repo update - command = "env KUBECONFIG={} {} repo update".format( - paths["kube_config"], self._helm_command + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name + ) + self.log.debug("updating repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env ) + + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + + async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name) + ) + + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_id) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -196,15 +218,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... 
] """ - self.log.debug("list repositories for cluster {}".format(cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("list repositories for cluster {}".format(cluster_id)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) command = "env KUBECONFIG={} {} repo list --output yaml".format( paths["kube_config"], self._helm_command @@ -216,7 +239,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) if _rc == 0: if output and len(output) > 0: @@ -229,17 +252,16 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - self.log.debug( - "remove {} repositories for cluster {}".format(name, cluster_uuid) - ) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) command = "env KUBECONFIG={} {} repo remove {}".format( paths["kube_config"], self._helm_command, name @@ -249,7 +271,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) async def reset( self, @@ -268,15 +290,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace = self._get_namespace(cluster_uuid=cluster_uuid) + namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_uuid, uninstall_sw + cluster_id, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # uninstall releases if needed. if uninstall_sw: @@ -305,20 +327,20 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. 
Leaving K8s helm environment" - ).format(cluster_uuid) + ).format(cluster_id) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) + await self._uninstall_sw(cluster_id, namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_uuid)) - self.fs.file_delete(cluster_uuid, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_id)) + self.fs.file_delete(cluster_id, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_uuid + direct = self.fs.path + "/" + cluster_id shutil.rmtree(direct, ignore_errors=True) return True @@ -355,6 +377,10 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_install_command( kdu_model, kdu_instance, @@ -384,7 +410,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -397,7 +422,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -413,8 +437,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -432,10 +454,11 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, ): - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -444,15 +467,15 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_uuid, params=params + cluster_id=cluster_id, params=params ) # version @@ -463,6 +486,10 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, @@ -477,7 +504,6 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("upgrading: {}".format(command)) if atomic: - # exec helm in a task exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec( @@ -487,12 +513,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -504,7 +529,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( 
command=command, raise_exception_on_error=False, env=env ) @@ -515,13 +539,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -530,7 +552,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) # return new revision number instance = await self.get_instance_info( @@ -564,14 +586,15 @@ class K8sHelmBaseConnector(K8sConnector): async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_uuid + kdu_instance, revision, cluster_id ) ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -580,11 +603,11 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) command = self._get_rollback_command( kdu_instance, instance_info["namespace"], revision, paths["kube_config"] @@ -601,12 +624,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -620,13 +642,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -635,7 +655,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) # return new revision number instance = await self.get_instance_info( @@ -660,14 +680,13 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format( - kdu_instance, cluster_uuid - ) + "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -676,11 +695,11 @@ class K8sHelmBaseConnector(K8sConnector): return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) command = self._get_uninstall_command( kdu_instance, instance_info["namespace"], paths["kube_config"] 
@@ -690,7 +709,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) return self._output_to_table(output) @@ -702,16 +721,17 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - self.log.debug("list releases for cluster {}".format(cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("list releases for cluster {}".format(cluster_id)) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # execute internal command - result = await self._instances_list(cluster_uuid) + result = await self._instances_list(cluster_id) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) return result @@ -769,6 +789,7 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance @@ -777,44 +798,45 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # get list of services names for kdu service_names = await self._get_services( - cluster_uuid, kdu_instance, namespace, paths["kube_config"] + cluster_id, kdu_instance, namespace, paths["kube_config"] ) service_list = [] for service in service_names: - service = await self._get_service(cluster_uuid, service, namespace) + service = await self._get_service(cluster_id, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) return service_list async def get_service( self, cluster_uuid: str, service_name: str, namespace: str ) -> object: - self.log.debug( "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( service_name, namespace, cluster_uuid ) ) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) - service = await self._get_service(cluster_uuid, service_name, namespace) + service = await self._get_service(cluster_id, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_uuid) + self.fs.reverse_sync(from_path=cluster_id) return service @@ -852,11 +874,13 @@ class K8sHelmBaseConnector(K8sConnector): ) ) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + # sync local dir - self.fs.sync(from_path=cluster_uuid) + self.fs.sync(from_path=cluster_id) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_uuid) + instances = await self._instances_list(cluster_id=cluster_id) for instance in instances: if instance.get("name") == kdu_instance: break @@ -864,12 +888,12 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_uuid + kdu_instance, cluster_id ) ) status = await self._status_kdu( - cluster_id=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance["namespace"], yaml_format=yaml_format, @@ -877,12 +901,11 @@ class K8sHelmBaseConnector(K8sConnector): 
         )

         # sync fs
-        self.fs.reverse_sync(from_path=cluster_uuid)
+        self.fs.reverse_sync(from_path=cluster_id)

         return status

     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
         self.log.debug(
             "inspect kdu_model values {} from (optional) repo: {}".format(
                 kdu_model, repo_url
             )
         )
@@ -894,7 +917,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )

     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
         self.log.debug(
             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
         )
@@ -904,7 +926,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )

     async def synchronize_repos(self, cluster_uuid: str):
-
         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
         try:
             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
@@ -1205,7 +1226,6 @@ class K8sHelmBaseConnector(K8sConnector):
         encode_utf8: bool = False,
         env: dict = None,
     ) -> (str, int):
-
         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
         self.log.debug(
             "Executing async local command: {}, env: {}".format(command, env)
@@ -1276,7 +1296,6 @@ class K8sHelmBaseConnector(K8sConnector):
         encode_utf8: bool = False,
         env: dict = None,
     ):
-
         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
         command = "{} | {}".format(command1, command2)
@@ -1421,47 +1440,55 @@ class K8sHelmBaseConnector(K8sConnector):
         operation: str,
         kdu_instance: str,
         namespace: str = None,
-        check_every: float = 10,
         db_dict: dict = None,
-        run_once: bool = False,
-    ):
-        while True:
-            try:
-                await asyncio.sleep(check_every)
-                detailed_status = await self._status_kdu(
-                    cluster_id=cluster_id,
-                    kdu_instance=kdu_instance,
-                    yaml_format=False,
-                    namespace=namespace,
-                )
-                status = detailed_status.get("info").get("description")
-                self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
-                # write status to db
-                result = await self.write_app_status_to_db(
-                    db_dict=db_dict,
-                    status=str(status),
-                    detailed_status=str(detailed_status),
-                    operation=operation,
-                )
-                if not result:
-                    self.log.info("Error writing in database. Task exiting...")
-                    return
-            except asyncio.CancelledError:
-                self.log.debug("Task cancelled")
-                return
-            except Exception as e:
-                self.log.debug(
-                    "_store_status exception: {}".format(str(e)), exc_info=True
-                )
-                pass
-            finally:
-                if run_once:
-                    return
+    ) -> None:
+        """
+        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
+
+        :param cluster_id (str): the cluster where the KDU instance is deployed
+        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
+        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
+        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
+        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
+            values for the keys:
+                - "collection": The Mongo DB collection to write to
+                - "filter": The query filter to use in the update process
+                - "path": The dot separated keys which targets the object to be updated
+            Defaults to None.
+        """
+
+        try:
+            detailed_status = await self._status_kdu(
+                cluster_id=cluster_id,
+                kdu_instance=kdu_instance,
+                yaml_format=False,
+                namespace=namespace,
+            )
+
+            status = detailed_status.get("info").get("description")
+            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
+
+            # write status to db
+            result = await self.write_app_status_to_db(
+                db_dict=db_dict,
+                status=str(status),
+                detailed_status=str(detailed_status),
+                operation=operation,
+            )
+
+            if not result:
+                self.log.info("Error writing in database. Task exiting...")
+
+        except asyncio.CancelledError as e:
+            self.log.warning(
+                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
+            )
+        except Exception as e:
+            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")

     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
     def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
         if params and len(params) > 0:
             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

@@ -1539,3 +1566,10 @@ class K8sHelmBaseConnector(K8sConnector):

         name = name + get_random_number()
         return name.lower()
+
+    def _split_repo(self, kdu_model: str) -> str:
+        repo_name = None
+        idx = kdu_model.find("/")
+        if idx >= 0:
+            repo_name = kdu_model[:idx]
+        return repo_name
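Editor's note — illustrative sketch, not part of the patch. The two small parsing helpers this change introduces are easy to exercise standalone: _get_namespace_cluster_id uses rpartition(":") so a cluster_uuid stored as "namespace:cluster_id" is split apart, while a bare cluster_id (old database entries) yields an empty namespace, which is what keeps backward compatibility; _split_repo returns the repository prefix of a "repo/chart" model, or None when no repository is given. The snippet below copies that logic with local stand-in names for K8sHelmBaseConnector._get_namespace_cluster_id and K8sHelmBaseConnector._split_repo, and the example inputs are made up for illustration.

# Stand-alone sketch of the parsing logic added by this patch (assumed equivalent).
def get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
    # "namespace:cluster_id", or bare "cluster_id" for backward compatibility
    namespace, _, cluster_id = cluster_uuid.rpartition(":")
    return namespace, cluster_id

def split_repo(kdu_model: str) -> str:
    # repository prefix of "repo/chart", or None when the model carries no repo
    repo_name = None
    idx = kdu_model.find("/")
    if idx >= 0:
        repo_name = kdu_model[:idx]
    return repo_name

assert get_namespace_cluster_id("osm:1b2f") == ("osm", "1b2f")
assert get_namespace_cluster_id("1b2f") == ("", "1b2f")  # no namespace prefix stored
assert split_repo("bitnami/mongodb") == "bitnami"
assert split_repo("mongodb") is None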