X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=65cbdac525b7813f09e701f7107aa7bd42015eea;hp=2e7442372aeb62c1d723c8940d7b5fddbbffce02;hb=e308c71201537413caf28f8814051470f0418504;hpb=a5728bfa2a58437329105d900822274a73cd9356 diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 2e74423..65cbdac 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -195,6 +195,9 @@ class K8sHelmConnector(K8sConnector): else: self.log.info("Helm client already initialized") + # sync fs with local data + self.fs.reverse_sync(from_path=cluster_id) + self.log.info("Cluster {} initialized".format(cluster_id)) return cluster_uuid, n2vc_installed_sw @@ -211,6 +214,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # helm repo update command = "{} --kubeconfig={} --home={} repo update".format( self._helm_command, config_filename, helm_dir @@ -225,6 +231,9 @@ class K8sHelmConnector(K8sConnector): self.log.debug("adding repo: {}".format(command)) await self._local_async_exec(command=command, raise_exception_on_error=True) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def repo_list(self, cluster_uuid: str) -> list: """ Get the list of registered repositories @@ -240,6 +249,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} repo list --output yaml".format( self._helm_command, config_filename, helm_dir ) @@ -252,6 +264,9 @@ class K8sHelmConnector(K8sConnector): else: return [] + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def repo_remove(self, cluster_uuid: str, name: str): """ Remove a repository from OSM @@ -269,51 +284,59 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} repo remove {}".format( self._helm_command, config_filename, helm_dir, name ) await self._local_async_exec(command=command, raise_exception_on_error=True) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def reset( self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug( - "Resetting K8s environment. cluster uuid: {}".format(cluster_id) - ) + self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}" + .format(cluster_id, uninstall_sw)) # get kube and helm directories _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( cluster_name=cluster_id, create_if_not_exist=False ) - # uninstall releases if needed - releases = await self.instances_list(cluster_uuid=cluster_uuid) - if len(releases) > 0: - if force: - for r in releases: - try: - kdu_instance = r.get("Name") - chart = r.get("Chart") - self.log.debug( - "Uninstalling {} -> {}".format(chart, kdu_instance) - ) - await self.uninstall( - cluster_uuid=cluster_uuid, kdu_instance=kdu_instance - ) - except Exception as e: - self.log.error( - "Error uninstalling release {}: {}".format(kdu_instance, e) - ) - else: - msg = ( - "Cluster has releases and not force. Cannot reset K8s " - "environment. Cluster uuid: {}" - ).format(cluster_id) - self.log.error(msg) - raise K8sException(msg) + # sync local dir + self.fs.sync(from_path=cluster_id) + + # uninstall releases if needed. + if uninstall_sw: + releases = await self.instances_list(cluster_uuid=cluster_uuid) + if len(releases) > 0: + if force: + for r in releases: + try: + kdu_instance = r.get("Name") + chart = r.get("Chart") + self.log.debug( + "Uninstalling {} -> {}".format(chart, kdu_instance) + ) + await self.uninstall( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) + except Exception as e: + self.log.error( + "Error uninstalling release {}: {}".format(kdu_instance, e) + ) + else: + msg = ( + "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" + ).format(cluster_id) + self.log.warn(msg) + uninstall_sw = False # Allow to remove k8s cluster without removing Tiller if uninstall_sw: @@ -370,8 +393,10 @@ class K8sHelmConnector(K8sConnector): self.log.debug("namespace not found") # delete cluster directory + self.log.debug("Removing directory {}".format(cluster_id)) + self.fs.file_delete(cluster_id, ignore_non_exist=True) + # Remove also local directorio if still exist direct = self.fs.path + "/" + cluster_id - self.log.debug("Removing directory {}".format(direct)) shutil.rmtree(direct, ignore_errors=True) return True @@ -396,6 +421,10 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.log.debug("sync cluster_id: {}".format(_cluster_dir)) + self.fs.sync(from_path=cluster_id) + # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( @@ -509,6 +538,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return kdu_instance @@ -528,6 +560,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} list --output yaml".format( self._helm_command, config_filename, helm_dir ) @@ -560,6 +595,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( @@ -650,6 +688,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + # return new revision number instance = await self.get_instance_info( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance @@ -677,6 +718,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format( self._helm_command, config_filename, helm_dir, kdu_instance, revision ) @@ -721,6 +765,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + # return new revision number instance = await self.get_instance_info( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance @@ -755,6 +802,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} delete --purge {}".format( self._helm_command, config_filename, helm_dir, kdu_instance ) @@ -763,6 +813,9 @@ class K8sHelmConnector(K8sConnector): command=command, raise_exception_on_error=True ) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + return self._output_to_table(output) async def exec_primitive( @@ -833,6 +886,81 @@ class K8sHelmConnector(K8sConnector): return_text=True, ) + async def get_services(self, + cluster_uuid: str, + kdu_instance: str, + namespace: str) -> list: + + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug( + "get_services: cluster_uuid: {}, kdu_instance: {}".format( + cluster_uuid, kdu_instance + ) + ) + + # sync local dir + self.fs.sync(from_path=cluster_id) + + status = await self._status_kdu( + cluster_id, kdu_instance, return_text=False + ) + + service_names = self._parse_helm_status_service_info(status) + service_list = [] + for service in service_names: + service = await self.get_service(cluster_uuid, service, namespace) + service_list.append(service) + + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + + return service_list + + async def get_service(self, + cluster_uuid: str, + service_name: str, + namespace: str) -> object: + + self.log.debug( + "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( + service_name, namespace, cluster_uuid) + ) + + # get paths + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_id, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_id) + + command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( + self.kubectl_command, config_filename, namespace, service_name + ) + + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) + + data = yaml.load(output, Loader=yaml.SafeLoader) + + service = { + "name": service_name, + "type": self._get_deep(data, ("spec", "type")), + "ports": self._get_deep(data, ("spec", "ports")), + "cluster_ip": self._get_deep(data, ("spec", "clusterIP")) + } + if service["type"] == "LoadBalancer": + ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) + ip_list = [elem["ip"] for elem in ip_map_list] + service["external_ip"] = ip_list + + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + + return service + async def synchronize_repos(self, cluster_uuid: str): _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) @@ -1097,7 +1225,7 @@ class K8sHelmConnector(K8sConnector): self.log.debug("Task cancelled") return except Exception as e: - self.log.debug("_store_status exception: {}".format(str(e))) + self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True) pass finally: if run_once: @@ -1152,6 +1280,37 @@ class K8sHelmConnector(K8sConnector): return ready + def _parse_helm_status_service_info(self, status): + + # extract info.status.resources-> str + # format: + # ==> v1/Deployment + # NAME READY UP-TO-DATE AVAILABLE AGE + # halting-horse-mongodb 0/1 1 0 0s + # halting-petit-mongodb 1/1 1 0 0s + # blank line + resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources")) + + service_list = [] + first_line_skipped = service_found = False + for line in resources: + if not service_found: + if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service": + service_found = True + continue + else: + if len(line) >= 2 and line[0] == "==>": + service_found = first_line_skipped = False + continue + if not line: + continue + if not first_line_skipped: + first_line_skipped = True + continue + service_list.append(line[0]) + + return service_list + @staticmethod def _get_deep(dictionary: dict, members: tuple): target = dictionary