X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=703bd737cc5ced06b2021ae00267ca45d1baca79;hp=273b206ddb13eb8027773db228e8f45b1884bc02;hb=b41de17df6282334088ffbd887fbc01e496e1797;hpb=156677967e417604682fda05ed0099f07f1a3129 diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 273b206..703bd73 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -89,14 +89,18 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None - @staticmethod - def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): + def _get_namespace(self, cluster_uuid: str) -> str: """ - Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only - cluster_id for backward compatibility + Obtains the namespace used by the cluster with the uuid passed by argument + + param: cluster_uuid: cluster's uuid """ - namespace, _, cluster_id = cluster_uuid.rpartition(":") - return namespace, cluster_id + + # first, obtain the cluster corresponding to the uuid passed by argument + k8scluster = self.db.get_one( + "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False + ) + return k8scluster.get("namespace") async def init_env( self, @@ -120,11 +124,9 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) - namespace = namespace_ or namespace + cluster_id = reuse_cluster_uuid else: cluster_id = str(uuid4()) - cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -146,25 +148,24 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_uuid, n2vc_installed_sw + return cluster_id, n2vc_installed_sw async def repo_add( self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url + cluster_uuid, repo_type, name, url ) ) # init_env paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # helm repo update command = "env KUBECONFIG={} {} repo update".format( @@ -185,7 +186,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -194,16 +195,15 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list repositories for cluster {}".format(cluster_id)) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo list --output yaml".format( paths["kube_config"], self._helm_command @@ -215,7 +215,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) if _rc == 0: if output and len(output) > 0: @@ -228,17 +228,17 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) + self.log.debug( + "remove {} repositories for cluster {}".format(name, cluster_uuid) + ) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo remove {}".format( paths["kube_config"], self._helm_command, name @@ -248,7 +248,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def reset( self, @@ -267,15 +267,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + namespace = self._get_namespace(cluster_uuid=cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_id, uninstall_sw + cluster_uuid, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # uninstall releases if needed. if uninstall_sw: @@ -304,20 +304,20 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" - ).format(cluster_id) + ).format(cluster_uuid) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id, namespace) + await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_id)) - self.fs.file_delete(cluster_id, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_uuid)) + self.fs.file_delete(cluster_uuid, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_id + direct = self.fs.path + "/" + cluster_uuid shutil.rmtree(direct, ignore_errors=True) return True @@ -426,11 +426,10 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -439,15 +438,15 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_id, params=params + cluster_id=cluster_uuid, params=params ) # version @@ -477,7 +476,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -505,7 +504,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -520,7 +519,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -563,12 +562,10 @@ class K8sHelmBaseConnector(K8sConnector): True if successful, False otherwise """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - - debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_id) + debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid) if resource_name: debug_mgs = "scaling resource {} in model {} (cluster {})".format( - resource_name, kdu_model, cluster_id + resource_name, kdu_model, cluster_uuid ) self.log.debug(debug_mgs) @@ -581,7 +578,7 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # version @@ -622,7 +619,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -645,7 +642,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -660,7 +657,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return True @@ -685,9 +682,8 @@ class K8sHelmBaseConnector(K8sConnector): Resource instance count """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "getting scale count for {} in cluster {}".format(kdu_model, cluster_id) + "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid) ) # look for instance to obtain namespace @@ -697,7 +693,7 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) replicas = await self._get_replica_count_instance( @@ -726,16 +722,14 @@ class K8sHelmBaseConnector(K8sConnector): async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_id + kdu_instance, revision, cluster_uuid ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -744,11 +738,11 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_rollback_command( kdu_instance, instance_info["namespace"], revision, paths["kube_config"] @@ -765,7 +759,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -784,7 +778,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -799,7 +793,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -824,13 +818,14 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -839,11 +834,11 @@ class K8sHelmBaseConnector(K8sConnector): return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_uninstall_command( kdu_instance, instance_info["namespace"], paths["kube_config"] @@ -853,7 +848,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return self._output_to_table(output) @@ -865,17 +860,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list releases for cluster {}".format(cluster_id)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # execute internal command - result = await self._instances_list(cluster_id) + result = await self._instances_list(cluster_uuid) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return result @@ -933,7 +927,6 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance @@ -942,24 +935,24 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get list of services names for kdu service_names = await self._get_services( - cluster_id, kdu_instance, namespace, paths["kube_config"] + cluster_uuid, kdu_instance, namespace, paths["kube_config"] ) service_list = [] for service in service_names: - service = await self._get_service(cluster_id, service, namespace) + service = await self._get_service(cluster_uuid, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service_list @@ -973,15 +966,13 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) - service = await self._get_service(cluster_id, service_name, namespace) + service = await self._get_service(cluster_uuid, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service @@ -1015,13 +1006,11 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_id) + instances = await self._instances_list(cluster_id=cluster_uuid) for instance in instances: if instance.get("name") == kdu_instance: break @@ -1029,12 +1018,12 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id + kdu_instance, cluster_uuid ) ) status = await self._status_kdu( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], show_error_log=True, @@ -1042,7 +1031,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return status