X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=e3b51bab0176b6cddbb76c8801cbe72cec649b26;hp=fdfc443468ee3ccd820c9fb103c690371adebe32;hb=70c5f3ae0d6811adaddf189833a9299f2572930b;hpb=f52cb7cfeb4e24febe7c66af3d5bb275a50d7f99

diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index fdfc443..e3b51ba 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
@@ -40,6 +40,7 @@ class K8sHelmConnector(K8sConnector):
     ################################### P U B L I C ####################################
     ####################################################################################
     """
+    service_account = "osm"
 
     def __init__(
         self,
@@ -96,6 +97,15 @@ class K8sHelmConnector(K8sConnector):
 
         self.log.info("K8S Helm connector initialized")
 
+    @staticmethod
+    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+        """
+        Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
+        cluster_id for backward compatibility
+        """
+        namespace, _, cluster_id = cluster_uuid.rpartition(':')
+        return namespace, cluster_id
+
     async def init_env(
         self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
     ) -> (str, bool):
@@ -114,19 +124,21 @@ class K8sHelmConnector(K8sConnector):
             (on error, an exception will be raised)
         """
 
-        cluster_uuid = reuse_cluster_uuid
-        if not cluster_uuid:
-            cluster_uuid = str(uuid4())
+        if reuse_cluster_uuid:
+            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+            namespace = namespace_ or namespace
+        else:
+            cluster_id = str(uuid4())
+        cluster_uuid = "{}:{}".format(namespace, cluster_id)
 
-        self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
+        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
namespace: {}".format(cluster_id, namespace)) # create config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) - f = open(config_filename, "w") - f.write(k8s_creds) - f.close() + with open(config_filename, "w") as f: + f.write(k8s_creds) # check if tiller pod is up in cluster command = "{} --kubeconfig={} --namespace={} get deployments".format( @@ -136,7 +148,7 @@ class K8sHelmConnector(K8sConnector): command=command, raise_exception_on_error=True ) - output_table = K8sHelmConnector._output_to_table(output=output) + output_table = self._output_to_table(output=output) # find 'tiller' pod in all pods already_initialized = False @@ -152,22 +164,27 @@ class K8sHelmConnector(K8sConnector): n2vc_installed_sw = False if not already_initialized: self.log.info( - "Initializing helm in client and server: {}".format(cluster_uuid) - ) - command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format( - self._helm_command, config_filename, namespace, helm_dir - ) - output, _rc = await self._local_async_exec( - command=command, raise_exception_on_error=True + "Initializing helm in client and server: {}".format(cluster_id) ) + command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format( + self.kubectl_command, config_filename, self.service_account) + _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule " + "--clusterrole=cluster-admin --serviceaccount=kube-system:{}" + ).format(self.kubectl_command, config_filename, self.service_account) + _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} " + "init").format(self._helm_command, config_filename, namespace, helm_dir, + self.service_account) + _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True) n2vc_installed_sw = True else: # check client helm installation check_file = helm_dir + "/repository/repositories.yaml" - if not self._check_file_exists( - filename=check_file, exception_if_not_exists=False - ): - self.log.info("Initializing helm in client: {}".format(cluster_uuid)) + if not self._check_file_exists(filename=check_file, exception_if_not_exists=False): + self.log.info("Initializing helm in client: {}".format(cluster_id)) command = ( "{} --kubeconfig={} --tiller-namespace={} " "--home={} init --client-only" @@ -178,19 +195,20 @@ class K8sHelmConnector(K8sConnector): else: self.log.info("Helm client already initialized") - self.log.info("Cluster initialized {}".format(cluster_uuid)) + self.log.info("Cluster {} initialized".format(cluster_id)) return cluster_uuid, n2vc_installed_sw async def repo_add( self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): - - self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("Cluster {}, adding {} repository {}. 
URL: {}".format( + cluster_id, repo_type, name, url)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # helm repo update @@ -214,11 +232,12 @@ class K8sHelmConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - self.log.debug("list repositories for cluster {}".format(cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("list repositories for cluster {}".format(cluster_id)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) command = "{} --kubeconfig={} --home={} repo list --output yaml".format( @@ -237,16 +256,17 @@ class K8sHelmConnector(K8sConnector): """ Remove a repository from OSM - :param cluster_uuid: the cluster + :param cluster_uuid: the cluster or 'namespace:cluster' :param name: repo name in OSM :return: True if successful """ - self.log.debug("list repositories for cluster {}".format(cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("list repositories for cluster {}".format(cluster_id)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) command = "{} --kubeconfig={} --home={} repo remove {}".format( @@ -259,87 +279,72 @@ class K8sHelmConnector(K8sConnector): self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: - self.log.debug( - "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid) - ) + namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}" + .format(cluster_id, uninstall_sw)) # get kube and helm directories _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=False + cluster_name=cluster_id, create_if_not_exist=False ) - # uninstall releases if needed - releases = await self.instances_list(cluster_uuid=cluster_uuid) - if len(releases) > 0: - if force: - for r in releases: - try: - kdu_instance = r.get("Name") - chart = r.get("Chart") - self.log.debug( - "Uninstalling {} -> {}".format(chart, kdu_instance) - ) - await self.uninstall( - cluster_uuid=cluster_uuid, kdu_instance=kdu_instance - ) - except Exception as e: - self.log.error( - "Error uninstalling release {}: {}".format(kdu_instance, e) - ) - else: - msg = ( - "Cluster has releases and not force. Cannot reset K8s " - "environment. Cluster uuid: {}" - ).format(cluster_uuid) - self.log.error(msg) - raise K8sException(msg) - + # uninstall releases if needed. if uninstall_sw: + releases = await self.instances_list(cluster_uuid=cluster_uuid) + if len(releases) > 0: + if force: + for r in releases: + try: + kdu_instance = r.get("Name") + chart = r.get("Chart") + self.log.debug( + "Uninstalling {} -> {}".format(chart, kdu_instance) + ) + await self.uninstall( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) + except Exception as e: + self.log.error( + "Error uninstalling release {}: {}".format(kdu_instance, e) + ) + else: + msg = ( + "Cluster uuid: {} has releases and not force. 
+                    ).format(cluster_id)
+                    self.log.warn(msg)
+                    uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller
 
-            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
-
-            # find namespace for tiller pod
-            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
-                self.kubectl_command, config_filename
-            )
-            output, _rc = await self._local_async_exec(
-                command=command, raise_exception_on_error=False
-            )
-            output_table = K8sHelmConnector._output_to_table(output=output)
-            namespace = None
-            for r in output_table:
-                try:
-                    if "tiller-deploy" in r[1]:
-                        namespace = r[0]
-                        break
-                except Exception:
-                    pass
-            else:
-                msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
-                self.log.error(msg)
-
-            self.log.debug("namespace for tiller: {}".format(namespace))
+        if uninstall_sw:
 
-            force_str = "--force"
+            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
 
-            if namespace:
-                # delete tiller deployment
-                self.log.debug(
-                    "Deleting tiller deployment for cluster {}, namespace {}".format(
-                        cluster_uuid, namespace
-                    )
+            if not namespace:
+                # find namespace for tiller pod
+                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+                    self.kubectl_command, config_filename
                 )
-                command = (
-                    "{} --namespace {} --kubeconfig={} {} delete deployment "
-                    "tiller-deploy"
-                ).format(self.kubectl_command, namespace, config_filename, force_str)
-                await self._local_async_exec(
+                output, _rc = await self._local_async_exec(
                     command=command, raise_exception_on_error=False
                 )
+                output_table = K8sHelmConnector._output_to_table(output=output)
+                namespace = None
+                for r in output_table:
+                    try:
+                        if "tiller-deploy" in r[1]:
+                            namespace = r[0]
+                            break
+                    except Exception:
+                        pass
+                else:
+                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+                    self.log.error(msg)
+                self.log.debug("namespace for tiller: {}".format(namespace))
+
+            if namespace:
                 # uninstall tiller from cluster
                 self.log.debug(
-                    "Uninstalling tiller from cluster {}".format(cluster_uuid)
+                    "Uninstalling tiller from cluster {}".format(cluster_id)
                 )
                 command = "{} --kubeconfig={} --home={} reset".format(
                     self._helm_command, config_filename, helm_dir
@@ -348,11 +353,23 @@ class K8sHelmConnector(K8sConnector):
                 output, _rc = await self._local_async_exec(
                     command=command, raise_exception_on_error=True
                 )
+                # Delete clusterrolebinding and serviceaccount.
+                # Ignore if errors for backward compatibility
+                command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+ "io/osm-tiller-cluster-rule").format(self.kubectl_command, + config_filename) + output, _rc = await self._local_async_exec(command=command, + raise_exception_on_error=False) + command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\ + format(self.kubectl_command, config_filename, self.service_account) + output, _rc = await self._local_async_exec(command=command, + raise_exception_on_error=False) + else: self.log.debug("namespace not found") # delete cluster directory - direct = self.fs.path + "/" + cluster_uuid + direct = self.fs.path + "/" + cluster_id self.log.debug("Removing directory {}".format(direct)) shutil.rmtree(direct, ignore_errors=True) @@ -370,17 +387,18 @@ class K8sHelmConnector(K8sConnector): namespace: str = None, ): - self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( - cluster_uuid=cluster_uuid, params=params + cluster_id=cluster_id, params=params ) timeout_str = "" @@ -410,7 +428,7 @@ class K8sHelmConnector(K8sConnector): kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) try: result = await self._status_kdu( - cluster_uuid=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, show_error_log=False, ) @@ -449,7 +467,7 @@ class K8sHelmConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_uuid=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, db_dict=db_dict, operation="install", @@ -477,7 +495,7 @@ class K8sHelmConnector(K8sConnector): # write final status await self._store_status( - cluster_uuid=cluster_uuid, + cluster_id=cluster_id, kdu_instance=kdu_instance, db_dict=db_dict, operation="install", @@ -497,15 +515,16 @@ class K8sHelmConnector(K8sConnector): """ returns a list of deployed releases in a cluster - :param cluster_uuid: the cluster + :param cluster_uuid: the 'cluster' or 'namespace:cluster' :return: """ - self.log.debug("list releases for cluster {}".format(cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("list releases for cluster {}".format(cluster_id)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) command = "{} --kubeconfig={} --home={} list --output yaml".format( @@ -532,17 +551,18 @@ class K8sHelmConnector(K8sConnector): db_dict: dict = None, ): - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) + _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) # config filename _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( - cluster_name=cluster_uuid, create_if_not_exist=True + cluster_name=cluster_id, create_if_not_exist=True ) # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( - cluster_uuid=cluster_uuid, 
+            cluster_id=cluster_id, params=params
         )
 
         timeout_str = ""
@@ -589,7 +609,7 @@ class K8sHelmConnector(K8sConnector):
         # write status in another task
         status_task = asyncio.ensure_future(
             coro_or_future=self._store_status(
-                cluster_uuid=cluster_uuid,
+                cluster_id=cluster_id,
                 kdu_instance=kdu_instance,
                 db_dict=db_dict,
                 operation="upgrade",
@@ -616,7 +636,7 @@ class K8sHelmConnector(K8sConnector):
 
         # write final status
         await self._store_status(
-            cluster_uuid=cluster_uuid,
+            cluster_id=cluster_id,
             kdu_instance=kdu_instance,
             db_dict=db_dict,
             operation="upgrade",
@@ -644,15 +664,16 @@ class K8sHelmConnector(K8sConnector):
         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
     ):
 
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug(
             "rollback kdu_instance {} to revision {} from cluster {}".format(
-                kdu_instance, revision, cluster_uuid
+                kdu_instance, revision, cluster_id
             )
         )
 
         # config filename
         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
-            cluster_name=cluster_uuid, create_if_not_exist=True
+            cluster_name=cluster_id, create_if_not_exist=True
         )
 
         command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
@@ -668,7 +689,7 @@ class K8sHelmConnector(K8sConnector):
         # write status in another task
         status_task = asyncio.ensure_future(
             coro_or_future=self._store_status(
-                cluster_uuid=cluster_uuid,
+                cluster_id=cluster_id,
                 kdu_instance=kdu_instance,
                 db_dict=db_dict,
                 operation="rollback",
@@ -686,7 +707,7 @@ class K8sHelmConnector(K8sConnector):
 
         # write final status
         await self._store_status(
-            cluster_uuid=cluster_uuid,
+            cluster_id=cluster_id,
             kdu_instance=kdu_instance,
             db_dict=db_dict,
             operation="rollback",
@@ -716,20 +737,21 @@ class K8sHelmConnector(K8sConnector):
         (this call would happen after all _terminate-config-primitive_ of the VNF are invoked).
 
-        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
         :param kdu_instance: unique name for the KDU instance to be deleted
         :return: True if successful
         """
 
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug(
             "uninstall kdu_instance {} from cluster {}".format(
-                kdu_instance, cluster_uuid
+                kdu_instance, cluster_id
             )
         )
 
         # config filename
         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
-            cluster_name=cluster_uuid, create_if_not_exist=True
+            cluster_name=cluster_id, create_if_not_exist=True
        )
 
         command = "{} --kubeconfig={} --home={} delete --purge {}".format(
@@ -753,7 +775,7 @@ class K8sHelmConnector(K8sConnector):
     ) -> str:
         """Exec primitive (Juju action)
 
-        :param cluster_uuid str: The UUID of the cluster
+        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
         :param kdu_instance str: The unique name of the KDU instance
         :param primitive_name: Name of action that will be executed
         :param timeout: Timeout for action execution
@@ -802,15 +824,78 @@ class K8sHelmConnector(K8sConnector):
 
     async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
 
         # call internal function
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         return await self._status_kdu(
-            cluster_uuid=cluster_uuid,
+            cluster_id=cluster_id,
             kdu_instance=kdu_instance,
             show_error_log=True,
             return_text=True,
         )
 
+    async def get_services(self,
+                           cluster_uuid: str,
+                           kdu_instance: str,
+                           namespace: str) -> list:
+
+        self.log.debug(
+            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+                cluster_uuid, kdu_instance
+            )
+        )
+
+        status = await self._status_kdu(
+            cluster_uuid, kdu_instance, return_text=False
+        )
+
+        service_names = self._parse_helm_status_service_info(status)
+        service_list = []
+        for service in service_names:
+            service = await self.get_service(cluster_uuid, service, namespace)
+            service_list.append(service)
+
+        return service_list
+
+    async def get_service(self,
+                          cluster_uuid: str,
+                          service_name: str,
+                          namespace: str) -> object:
+
+        self.log.debug(
+            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+                service_name, namespace, cluster_uuid)
+        )
+
+        # get paths
+        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+
+        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+            self.kubectl_command, config_filename, namespace, service_name
+        )
+
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=True
+        )
+
+        data = yaml.load(output, Loader=yaml.SafeLoader)
+
+        service = {
+            "name": service_name,
+            "type": self._get_deep(data, ("spec", "type")),
+            "ports": self._get_deep(data, ("spec", "ports")),
+            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+        }
+        if service["type"] == "LoadBalancer":
+            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+            ip_list = [elem["ip"] for elem in ip_map_list]
+            service["external_ip"] = ip_list
+
+        return service
+
     async def synchronize_repos(self, cluster_uuid: str):
 
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug("syncronize repos for cluster helm-id: {}",)
         try:
             update_repos_timeout = (
@@ -950,7 +1035,7 @@ class K8sHelmConnector(K8sConnector):
 
     async def _status_kdu(
         self,
-        cluster_uuid: str,
+        cluster_id: str,
         kdu_instance: str,
         show_error_log: bool = False,
         return_text: bool = False,
@@ -960,7 +1045,7 @@ class K8sHelmConnector(K8sConnector):
 
         # config filename
         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
-            cluster_name=cluster_uuid, create_if_not_exist=True
+            cluster_name=cluster_id, create_if_not_exist=True
        )
 
         command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
@@ -1042,7 +1127,7 @@ class K8sHelmConnector(K8sConnector):
 
     async def _store_status(
         self,
-        cluster_uuid: str,
+        cluster_id: str,
         operation: str,
         kdu_instance: str,
         check_every: float = 10,
@@ -1052,12 +1137,12 @@ class K8sHelmConnector(K8sConnector):
         while True:
             try:
                 await asyncio.sleep(check_every)
-                detailed_status = await self.status_kdu(
-                    cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+                detailed_status = await self._status_kdu(
+                    cluster_id=cluster_id, kdu_instance=kdu_instance,
+                    return_text=False
                 )
                 status = detailed_status.get("info").get("Description")
-                self.log.debug("STATUS:\n{}".format(status))
-                self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
+                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
@@ -1072,16 +1157,16 @@ class K8sHelmConnector(K8sConnector):
                 self.log.debug("Task cancelled")
                 return
             except Exception as e:
-                self.log.debug("_store_status exception: {}".format(str(e)))
+                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                 pass
             finally:
                 if run_once:
                     return
 
-    async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
+    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
 
         status = await self._status_kdu(
-            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
+            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
         )
 
         # extract info.status.resources-> str
@@ -1127,6 +1212,37 @@ class K8sHelmConnector(K8sConnector):
 
         return ready
 
+    def _parse_helm_status_service_info(self, status):
+
+        # extract info.status.resources-> str
+        # format:
+        #       ==> v1/Deployment
+        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
+        #       halting-horse-mongodb   0/1     1            0           0s
+        #       halting-petit-mongodb   1/1     1            0           0s
+        #       blank line
+        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
+
+        service_list = []
+        first_line_skipped = service_found = False
+        for line in resources:
+            if not service_found:
+                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
+                    service_found = True
+                    continue
+            else:
+                if len(line) >= 2 and line[0] == "==>":
+                    service_found = first_line_skipped = False
+                    continue
+                if not line:
+                    continue
+                if not first_line_skipped:
+                    first_line_skipped = True
+                    continue
+                service_list.append(line[0])
+
+        return service_list
+
     @staticmethod
     def _get_deep(dictionary: dict, members: tuple):
         target = dictionary
@@ -1158,10 +1274,10 @@ class K8sHelmConnector(K8sConnector):
 
     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
-    def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
 
         if params and len(params) > 0:
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
 
             def get_random_number():
                 r = random.randrange(start=1, stop=99999999)