X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=89f619989a57eb6223eb58559508efcee090c35b;hp=842bbe3cb7136b09769946de7cd02845c29caf3d;hb=fa02f8a90b7fe1e1b7a80feedef4132bef1ca3e4;hpb=1c83f2e4d061ad37ba898e114cb42e70fdee5145 diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 842bbe3..89f6199 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -26,11 +26,11 @@ import time import shlex import shutil import stat -import subprocess import os import yaml from uuid import uuid4 +from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector @@ -42,6 +42,7 @@ class K8sHelmBaseConnector(K8sConnector): ################################### P U B L I C #################################### #################################################################################### """ + service_account = "osm" def __init__( @@ -68,6 +69,7 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Initializing K8S Helm connector") + self.config = EnvironConfig() # random numbers for release name generation random.seed(time.time()) @@ -82,17 +84,26 @@ class K8sHelmBaseConnector(K8sConnector): self._helm_command = helm_command self._check_file_exists(filename=helm_command, exception_if_not_exists=True) + # obtain stable repo url from config or apply default + self._stable_repo_url = self.config.get("stablerepourl") + if self._stable_repo_url == "None": + self._stable_repo_url = None + @staticmethod def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): """ Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only cluster_id for backward compatibility """ - namespace, _, cluster_id = cluster_uuid.rpartition(':') + namespace, _, cluster_id = cluster_uuid.rpartition(":") return namespace, cluster_id async def init_env( - self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None + self, + k8s_creds: str, + namespace: str = "kube-system", + reuse_cluster_uuid=None, + **kwargs, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Charts @@ -102,6 +113,7 @@ class K8sHelmBaseConnector(K8sConnector): :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse + :param kwargs: Additional parameters (None yet) :return: uuid of the K8s cluster and True if connector has installed some software in the cluster (on error, an exception will be raised) @@ -114,7 +126,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id = str(uuid4()) cluster_uuid = "{}:{}".format(namespace, cluster_id) - self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)) + self.log.debug( + "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) + ) paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True @@ -135,11 +149,14 @@ class K8sHelmBaseConnector(K8sConnector): return cluster_uuid, n2vc_installed_sw async def repo_add( - self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" + self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url)) + self.log.debug( + "Cluster {}, adding {} repository {}. URL: {}".format( + cluster_id, repo_type, name, url + ) + ) # sync local dir self.fs.sync(from_path=cluster_id) @@ -150,18 +167,18 @@ class K8sHelmBaseConnector(K8sConnector): ) # helm repo update - command = "{} repo update".format( - self._helm_command - ) + command = "{} repo update".format(self._helm_command) self.log.debug("updating repo: {}".format(command)) - await self._local_async_exec(command=command, raise_exception_on_error=False, env=env) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) # helm repo add name url - command = "{} repo add {} {}".format( - self._helm_command, name, url - ) + command = "{} repo add {} {}".format(self._helm_command, name, url) self.log.debug("adding repo: {}".format(command)) - await self._local_async_exec(command=command, raise_exception_on_error=True, env=env) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) # sync fs self.fs.reverse_sync(from_path=cluster_id) @@ -184,9 +201,7 @@ class K8sHelmBaseConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} repo list --output yaml".format( - self._helm_command - ) + command = "{} repo list --output yaml".format(self._helm_command) # Set exception to false because if there are no repos just want an empty list output, _rc = await self._local_async_exec( @@ -219,21 +234,37 @@ class K8sHelmBaseConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} repo remove {}".format( - self._helm_command, name + command = "{} repo remove {}".format(self._helm_command, name) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env ) - await self._local_async_exec(command=command, raise_exception_on_error=True, env=env) # sync fs self.fs.reverse_sync(from_path=cluster_id) async def reset( - self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False, + **kwargs, ) -> bool: + """Reset a cluster + + Resets the Kubernetes cluster by removing the helm deployment that represents it. + :param cluster_uuid: The UUID of the cluster to reset + :param force: Boolean to force the reset + :param uninstall_sw: Boolean to force the reset + :param kwargs: Additional parameters (None yet) + :return: Returns True if successful or raises an exception. + """ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}" - .format(cluster_id, uninstall_sw)) + self.log.debug( + "Resetting K8s environment. cluster uuid: {} uninstall={}".format( + cluster_id, uninstall_sw + ) + ) # sync local dir self.fs.sync(from_path=cluster_id) @@ -258,14 +289,18 @@ class K8sHelmBaseConnector(K8sConnector): # that in some cases of previously installed helm releases it # raised an error self.log.warn( - "Error uninstalling release {}: {}".format(kdu_instance, e) + "Error uninstalling release {}: {}".format( + kdu_instance, e + ) ) else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" ).format(cluster_id) self.log.warn(msg) - uninstall_sw = False # Allow to remove k8s cluster without removing Tiller + uninstall_sw = ( + False # Allow to remove k8s cluster without removing Tiller + ) if uninstall_sw: await self._uninstall_sw(cluster_id, namespace) @@ -279,28 +314,20 @@ class K8sHelmBaseConnector(K8sConnector): return True - async def install( - self, - cluster_uuid: str, - kdu_model: str, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None, - kdu_name: str = None, - namespace: str = None, + async def _install_impl( + self, + cluster_id: str, + kdu_model: str, + paths: dict, + env: dict, + kdu_instance: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) - - # sync local dir - self.fs.sync(from_path=cluster_id) - - # init env, paths - paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True - ) - # params to str params_str, file_to_delete = self._params_to_file_option( cluster_id=cluster_id, params=params @@ -314,25 +341,9 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] - # generate a name for the release. Then, check if already exists - kdu_instance = None - while kdu_instance is None: - kdu_instance = self._generate_release_name(kdu_model) - try: - result = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - namespace=namespace, - show_error_log=False, - ) - if result is not None: - # instance already exists: generate a new one - kdu_instance = None - except K8sException: - pass - - command = self._get_install_command(kdu_model, kdu_instance, namespace, - params_str, version, atomic, timeout) + command = self._get_install_command( + kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + ) self.log.debug("installing: {}".format(command)) @@ -390,12 +401,6 @@ class K8sHelmBaseConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) - # sync fs - self.fs.reverse_sync(from_path=cluster_id) - - self.log.debug("Returning kdu_instance {}".format(kdu_instance)) - return kdu_instance - async def upgrade( self, cluster_uuid: str, @@ -435,8 +440,15 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] - command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"], - params_str, version, atomic, timeout) + command = self._get_upgrade_command( + kdu_model, + kdu_instance, + instance_info["namespace"], + params_str, + version, + atomic, + timeout, + ) self.log.debug("upgrading: {}".format(command)) @@ -507,6 +519,24 @@ class K8sHelmBaseConnector(K8sConnector): else: return 0 + async def scale( + self, + kdu_instance: str, + scale: int, + resource_name: str, + total_timeout: float = 1800, + **kwargs, + ): + raise NotImplementedError("Method not implemented") + + async def get_scale_count( + self, + resource_name: str, + kdu_instance: str, + **kwargs, + ): + raise NotImplementedError("Method not implemented") + async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): @@ -531,8 +561,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command = self._get_rollback_command(kdu_instance, instance_info["namespace"], - revision) + command = self._get_rollback_command( + kdu_instance, instance_info["namespace"], revision + ) self.log.debug("rolling_back: {}".format(command)) @@ -592,7 +623,7 @@ class K8sHelmBaseConnector(K8sConnector): else: return 0 - async def uninstall(self, cluster_uuid: str, kdu_instance: str): + async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs): """ Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call (this call should happen after all _terminate-config-primitive_ of the VNF @@ -600,14 +631,13 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id :param kdu_instance: unique name for the KDU instance to be deleted + :param kwargs: Additional parameters (None yet) :return: True if successful """ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format( - kdu_instance, cluster_id - ) + "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) ) # sync local dir @@ -671,6 +701,7 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + **kwargs, ) -> str: """Exec primitive (Juju action) @@ -680,6 +711,7 @@ class K8sHelmBaseConnector(K8sConnector): :param timeout: Timeout for action execution :param params: Dictionary of all the parameters needed for the action :db_dict: Dictionary for any additional data + :param kwargs: Additional parameters (None yet) :return: Returns the output of the action """ @@ -688,10 +720,9 @@ class K8sHelmBaseConnector(K8sConnector): "different from rollback, upgrade and status" ) - async def get_services(self, - cluster_uuid: str, - kdu_instance: str, - namespace: str) -> list: + async def get_services( + self, cluster_uuid: str, kdu_instance: str, namespace: str + ) -> list: """ Returns a list of services defined for the specified kdu instance. @@ -731,14 +762,14 @@ class K8sHelmBaseConnector(K8sConnector): return service_list - async def get_service(self, - cluster_uuid: str, - service_name: str, - namespace: str) -> object: + async def get_service( + self, cluster_uuid: str, service_name: str, namespace: str + ) -> object: self.log.debug( "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( - service_name, namespace, cluster_uuid) + service_name, namespace, cluster_uuid + ) ) _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) @@ -753,8 +784,30 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + """ + This call would retrieve tha current state of a given KDU instance. It would be + would allow to retrieve the _composition_ (i.e. K8s objects) and _specific + values_ of the configuration parameters applied to a given instance. This call + would be based on the `status` call. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :param kwargs: Additional parameters (None yet) + :return: If successful, it will return the following vector of arguments: + - K8s `namespace` in the cluster where the KDU lives + - `state` of the KDU instance. It can be: + - UNKNOWN + - DEPLOYED + - DELETED + - SUPERSEDED + - FAILED or + - DELETING + - List of `resources` (objects) that this release consists of, sorted by kind, + and the status of those resources + - Last `deployment_time`. + """ self.log.debug( "status_kdu: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance @@ -773,8 +826,11 @@ class K8sHelmBaseConnector(K8sConnector): break else: # instance does not exist - raise K8sException("Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id)) + raise K8sException( + "Instance name: {} not found in cluster: {}".format( + kdu_instance, cluster_id + ) + ) status = await self._status_kdu( cluster_id=cluster_id, @@ -833,14 +889,19 @@ class K8sHelmBaseConnector(K8sConnector): repo_id = db_repo.get("_id") if curr_repo_url != db_repo["url"]: if curr_repo_url: - self.log.debug("repo {} url changed, delete and and again".format( - db_repo["url"])) + self.log.debug( + "repo {} url changed, delete and and again".format( + db_repo["url"] + ) + ) await self.repo_remove(cluster_uuid, db_repo["name"]) deleted_repo_list.append(repo_id) # add repo self.log.debug("add repo {}".format(db_repo["name"])) - await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"]) + await self.repo_add( + cluster_uuid, db_repo["name"], db_repo["url"] + ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: raise K8sException( @@ -872,18 +933,6 @@ class K8sHelmBaseConnector(K8sConnector): self.log.error("Error synchronizing repos: {}".format(e)) raise Exception("Error synchronizing repos: {}".format(e)) - def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: - repo_ids = [] - cluster_filter = {"_admin.helm-chart.id": cluster_uuid} - cluster = self.db.get_one("k8sclusters", cluster_filter) - if cluster: - repo_ids = cluster.get("_admin").get("helm_chart_repos") or [] - return repo_ids - else: - raise K8sException( - "k8cluster with helm-id : {} not found".format(cluster_uuid) - ) - def _get_db_repos_dict(self, repo_ids: list): db_repos_dict = {} for repo_id in repo_ids: @@ -927,22 +976,30 @@ class K8sHelmBaseConnector(K8sConnector): """ @abc.abstractmethod - async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None, - show_error_log: bool = False, return_text: bool = False): + async def _status_kdu( + self, + cluster_id: str, + kdu_instance: str, + namespace: str = None, + show_error_log: bool = False, + return_text: bool = False, + ): """ Implements the helm version dependent method to obtain status of a helm instance """ @abc.abstractmethod - def _get_install_command(self, kdu_model, kdu_instance, namespace, - params_str, version, atomic, timeout) -> str: + def _get_install_command( + self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + ) -> str: """ Obtain command to be executed to delete the indicated instance """ @abc.abstractmethod - def _get_upgrade_command(self, kdu_model, kdu_instance, namespace, - params_str, version, atomic, timeout) -> str: + def _get_upgrade_command( + self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + ) -> str: """ Obtain command to be executed to upgrade the indicated instance """ @@ -960,8 +1017,9 @@ class K8sHelmBaseConnector(K8sConnector): """ @abc.abstractmethod - def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str, - version: str): + def _get_inspect_command( + self, show_command: str, kdu_model: str, repo_str: str, version: str + ): """ Obtain command to be executed to obtain information about the kdu """ @@ -975,6 +1033,12 @@ class K8sHelmBaseConnector(K8sConnector): For Helm v3 it does nothing and does not need to be callled """ + @abc.abstractmethod + def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: + """ + Obtains the cluster repos identifiers + """ + """ #################################################################################### ################################### P R I V A T E ################################## @@ -1071,38 +1135,25 @@ class K8sHelmBaseConnector(K8sConnector): of dictionaries """ new_list = [] - for dictionary in input_list: - new_dict = dict((k.lower(), v) for k, v in dictionary.items()) - new_list.append(new_dict) + if input_list: + for dictionary in input_list: + new_dict = dict((k.lower(), v) for k, v in dictionary.items()) + new_list.append(new_dict) return new_list - def _local_exec(self, command: str) -> (str, int): - command = self._remove_multiple_spaces(command) - self.log.debug("Executing sync local command: {}".format(command)) - # raise exception if fails - output = "" - try: - output = subprocess.check_output( - command, shell=True, universal_newlines=True - ) - return_code = 0 - self.log.debug(output) - except Exception: - return_code = 1 - - return output, return_code - async def _local_async_exec( self, command: str, raise_exception_on_error: bool = False, show_error_log: bool = True, encode_utf8: bool = False, - env: dict = None + env: dict = None, ) -> (str, int): command = K8sHelmBaseConnector._remove_multiple_spaces(command) - self.log.debug("Executing async local command: {}, env: {}".format(command, env)) + self.log.debug( + "Executing async local command: {}, env: {}".format(command, env) + ) # split command command = shlex.split(command) @@ -1113,8 +1164,10 @@ class K8sHelmBaseConnector(K8sConnector): try: process = await asyncio.create_subprocess_exec( - *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - env=environ + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, ) # wait for command terminate @@ -1158,18 +1211,22 @@ class K8sHelmBaseConnector(K8sConnector): else: return "", -1 - async def _local_async_exec_pipe(self, - command1: str, - command2: str, - raise_exception_on_error: bool = True, - show_error_log: bool = True, - encode_utf8: bool = False, - env: dict = None): + async def _local_async_exec_pipe( + self, + command1: str, + command2: str, + raise_exception_on_error: bool = True, + show_error_log: bool = True, + encode_utf8: bool = False, + env: dict = None, + ): command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) command = "{} | {}".format(command1, command2) - self.log.debug("Executing async local command: {}, env: {}".format(command, env)) + self.log.debug( + "Executing async local command: {}, env: {}".format(command, env) + ) # split command command1 = shlex.split(command1) @@ -1183,9 +1240,9 @@ class K8sHelmBaseConnector(K8sConnector): read, write = os.pipe() await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) os.close(write) - process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read, - stdout=asyncio.subprocess.PIPE, - env=environ) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) os.close(read) stdout, stderr = await process_2.communicate() @@ -1261,7 +1318,7 @@ class K8sHelmBaseConnector(K8sConnector): "name": service_name, "type": self._get_deep(data, ("spec", "type")), "ports": self._get_deep(data, ("spec", "ports")), - "cluster_ip": self._get_deep(data, ("spec", "clusterIP")) + "cluster_ip": self._get_deep(data, ("spec", "clusterIP")), } if service["type"] == "LoadBalancer": ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) @@ -1293,7 +1350,9 @@ class K8sHelmBaseConnector(K8sConnector): version = "--version {}".format(str(parts[1])) kdu_model = parts[0] - full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version) + full_command = self._get_inspect_command( + inspect_command, kdu_model, repo_str, version + ) output, _rc = await self._local_async_exec( command=full_command, encode_utf8=True ) @@ -1314,11 +1373,13 @@ class K8sHelmBaseConnector(K8sConnector): try: await asyncio.sleep(check_every) detailed_status = await self._status_kdu( - cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace, - return_text=False + cluster_id=cluster_id, + kdu_instance=kdu_instance, + namespace=namespace, + return_text=False, ) status = detailed_status.get("info").get("description") - self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status)) + self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) # write status to db result = await self.write_app_status_to_db( db_dict=db_dict, @@ -1333,7 +1394,9 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("Task cancelled") return except Exception as e: - self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True) + self.log.debug( + "_store_status exception: {}".format(str(e)), exc_info=True + ) pass finally: if run_once: @@ -1344,9 +1407,7 @@ class K8sHelmBaseConnector(K8sConnector): def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): if params and len(params) > 0: - self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True - ) + self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) def get_random_number(): r = random.randrange(start=1, stop=99999999) @@ -1388,15 +1449,16 @@ class K8sHelmBaseConnector(K8sConnector): return params_str @staticmethod - def _generate_release_name(chart_name: str): + def generate_kdu_instance_name(**kwargs): + chart_name = kwargs["kdu_model"] # check embeded chart (file or dir) if chart_name.startswith("/"): # extract file or directory name - chart_name = chart_name[chart_name.rfind("/") + 1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] # check URL elif "://" in chart_name: # extract last portion of URL - chart_name = chart_name[chart_name.rfind("/") + 1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] name = "" for c in chart_name: