X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=a6cb11a4b4ae848d8b467b312dc373dfe835532f;hb=085fa8d4658a9b621354d5a08853086e2696abdc;hp=57010d132d149e7579a2d846f316d384ec003b26;hpb=1188b5d69127c2c60aa8df8f98a0a7925bb473b8;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 57010d1..a6cb11a 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -90,6 +90,9 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None + # Lock to avoid concurrent execution of helm commands + self.cmd_lock = asyncio.Lock() + def _get_namespace(self, cluster_uuid: str) -> str: """ Obtains the namespace used by the cluster with the uuid passed by argument @@ -175,15 +178,6 @@ class K8sHelmBaseConnector(K8sConnector): # sync local dir self.fs.sync(from_path=cluster_uuid) - # helm repo update - command = "env KUBECONFIG={} {} repo update".format( - paths["kube_config"], self._helm_command - ) - self.log.debug("updating repo: {}".format(command)) - await self._local_async_exec( - command=command, raise_exception_on_error=False, env=env - ) - # helm repo add name url command = ("env KUBECONFIG={} {} repo add {} {}").format( paths["kube_config"], self._helm_command, name, url @@ -209,6 +203,40 @@ class K8sHelmBaseConnector(K8sConnector): command=command, raise_exception_on_error=True, env=env ) + # helm repo update + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name + ) + self.log.debug("updating repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format( + cluster_uuid, repo_type, name + ) + ) + + # init_env + 
paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) + self.log.debug("updating repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + # sync fs self.fs.reverse_sync(from_path=cluster_uuid) @@ -346,6 +374,9 @@ class K8sHelmBaseConnector(K8sConnector): return True + def _is_helm_chart_a_file(self, chart_name: str): + return chart_name.count("/") > 1 + async def _install_impl( self, cluster_id: str, @@ -373,6 +404,10 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) + _, repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_install_command( kdu_model, kdu_instance, @@ -402,7 +437,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -431,8 +465,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -449,6 +481,8 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + namespace: str = None, + force: bool = False, ): self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) @@ -456,9 +490,13 @@ class K8sHelmBaseConnector(K8sConnector): self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace - instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) - if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # set namespace + if not namespace: + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not 
found".format(kdu_instance)) + namespace = instance_info["namespace"] # init env, paths paths, env = self._init_paths_env( @@ -476,15 +514,20 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) + _, repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_uuid, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, - instance_info["namespace"], + namespace, params_str, version, atomic, timeout, paths["kube_config"], + force, ) self.log.debug("upgrading: {}".format(command)) @@ -502,10 +545,9 @@ class K8sHelmBaseConnector(K8sConnector): coro_or_future=self._store_status( cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -530,11 +572,9 @@ class K8sHelmBaseConnector(K8sConnector): await self._store_status( cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -609,10 +649,6 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model, version = self._split_version(kdu_model) repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) _, replica_str = await self._get_replica_count_url( kdu_model, repo_url, resource_name @@ -648,7 +684,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=False, ) ) @@ -671,8 +706,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=True, - check_every=0, ) if rc != 0: @@ -699,7 +732,7 @@ class K8sHelmBaseConnector(K8sConnector): cluster_uuid: The UUID of the cluster resource_name: Resource name kdu_instance: KDU instance 
name - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart kwargs: Additional parameters Returns: @@ -716,30 +749,42 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException("kdu_instance {} not found".format(kdu_instance)) # init env, paths - paths, env = self._init_paths_env( + paths, _ = self._init_paths_env( cluster_name=cluster_uuid, create_if_not_exist=True ) replicas = await self._get_replica_count_instance( - kdu_instance, instance_info["namespace"], paths["kube_config"] + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + kubeconfig=paths["kube_config"], + resource_name=resource_name, ) - # Get default value if scale count is not found from provided values - if not replicas: - repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) + self.log.debug( + f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}" + ) + # Get default value if scale count is not found from provided values + # Important note: this piece of code shall only be executed in the first scaling operation, + # since it is expected that the _get_replica_count_instance is able to obtain the number of + # replicas when a scale operation was already conducted previously for this KDU/resource! + if replicas is None: + repo_url = await self._find_repo( + kdu_model=kdu_model, cluster_uuid=cluster_uuid + ) replicas, _ = await self._get_replica_count_url( - kdu_model, repo_url, resource_name + kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name ) - if not replicas: - msg = "Replica count not found. 
Cannot be scaled" - self.log.error(msg) - raise K8sException(msg) + self.log.debug( + f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource " + f"{resource_name} obtained: {replicas}" + ) + + if replicas is None: + msg = "Replica count not found. Cannot be scaled" + self.log.error(msg) + raise K8sException(msg) return int(replicas) @@ -788,7 +833,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -807,8 +851,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -905,6 +947,28 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("Instance {} not found".format(kdu_instance)) return None + async def upgrade_charm( + self, + ee_id: str = None, + path: str = None, + charm_id: str = None, + charm_type: str = None, + timeout: float = None, + ) -> str: + """This method upgrade charms in VNFs + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + The output of the update operation if status equals to "completed" + """ + raise K8sException("KDUs deployed with Helm do not support charm upgrade") + async def exec_primitive( self, cluster_uuid: str = None, @@ -1077,6 +1141,15 @@ class K8sHelmBaseConnector(K8sConnector): ) async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: + """Method to obtain the Helm Chart package's values + + Args: + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the values of the Helm Chart package + """ self.log.debug( "inspect kdu_model values {} from (optional) repo: {}".format( @@ -1259,7 +1332,24 @@ class 
K8sHelmBaseConnector(K8sConnector): resource_name, kubeconfig, ) -> str: - """Obtain command to be executed to upgrade the indicated instance.""" + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release + """ @abc.abstractmethod def _get_upgrade_command( @@ -1272,9 +1362,23 @@ class K8sHelmBaseConnector(K8sConnector): atomic, timeout, kubeconfig, + force, ) -> str: - """ - Obtain command to be executed to upgrade the indicated instance + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. 
+ The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. + Returns: + str: command to upgrade a Helm Chart release """ @abc.abstractmethod @@ -1297,8 +1401,17 @@ class K8sHelmBaseConnector(K8sConnector): def _get_inspect_command( self, show_command: str, kdu_model: str, repo_str: str, version: str ): - """ - Obtain command to be executed to obtain information about the kdu + """Generates the command to obtain the information about an Helm Chart package + (´helm show ...´ command) + + Args: + show_command: the second part of the command (`helm show `) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + version: constraint with specific version of the Chart to use + + Returns: + str: the generated Helm Chart command """ @abc.abstractmethod @@ -1446,17 +1559,18 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=environ, - ) + async with self.cmd_lock: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, + ) - # wait for command terminate - stdout, stderr = await process.communicate() + # wait for command terminate + stdout, stderr = await process.communicate() - return_code = process.returncode + return_code = process.returncode output = "" if stdout: @@ -1483,6 +1597,9 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the process if it is still running + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1520,16 +1637,19 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) 
try: - read, write = os.pipe() - await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() + async with self.cmd_lock: + read, write = os.pipe() + process_1 = await asyncio.create_subprocess_exec( + *command1, stdout=write, env=environ + ) + os.close(write) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) + os.close(read) + stdout, stderr = await process_2.communicate() - return_code = process_2.returncode + return_code = process_2.returncode output = "" if stdout: @@ -1555,6 +1675,10 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the processes if they are still running + for process in (process_1, process_2): + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1626,16 +1750,23 @@ class K8sHelmBaseConnector(K8sConnector): async def _exec_inspect_command( self, inspect_command: str, kdu_model: str, repo_url: str = None ): - """Obtains information about a kdu, no cluster (no env).""" + """Obtains information about an Helm Chart package (´helm show´ command) + + Args: + inspect_command: the Helm sub command (`helm show ...`) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the requested info about the Helm Chart package + """ repo_str = "" if repo_url: repo_str = " --repo {}".format(repo_url) - idx = kdu_model.find("/") - if idx >= 0: - idx += 1 - kdu_model = kdu_model[idx:] + # Obtain the Chart's name and store it in the var kdu_model + kdu_model, _ = self._split_repo(kdu_model=kdu_model) kdu_model, version = self._split_version(kdu_model) if version: @@ -1644,34 +1775,42 @@ class 
K8sHelmBaseConnector(K8sConnector): version_str = "" full_command = self._get_inspect_command( - inspect_command, kdu_model, repo_str, version_str + show_command=inspect_command, + kdu_model=kdu_model, + repo_str=repo_str, + version=version_str, ) - output, _rc = await self._local_async_exec(command=full_command) + output, _ = await self._local_async_exec(command=full_command) return output async def _get_replica_count_url( self, kdu_model: str, - repo_url: str, + repo_url: str = None, resource_name: str = None, - ): + ) -> (int, str): """Get the replica count value in the Helm Chart Values. Args: - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart repo_url: Helm Chart repository url resource_name: Resource name Returns: - True if replicas, False replicaCount + A tuple with: + - The number of replicas of the specific instance; if not found, returns None; and + - The string corresponding to the replica count key in the Helm values """ kdu_values = yaml.load( - await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader + await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url), + Loader=yaml.SafeLoader, ) + self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}") + if not kdu_values: raise K8sException( "kdu_values not found for kdu_model {}".format(kdu_model) @@ -1692,10 +1831,10 @@ class K8sHelmBaseConnector(K8sConnector): replica_str = "" replicas = None - if kdu_values.get("replicaCount", None): + if kdu_values.get("replicaCount") is not None: replicas = kdu_values["replicaCount"] replica_str = "replicaCount" - elif kdu_values.get("replicas", None): + elif kdu_values.get("replicas") is not None: duplicate_check = True replicas = kdu_values["replicas"] replica_str = "replicas" @@ -1734,7 +1873,7 @@ class K8sHelmBaseConnector(K8sConnector): namespace: str, kubeconfig: str, resource_name: str = None, - ): + ) -> int: """Get the replica count value in the instance. 
Args: @@ -1744,7 +1883,7 @@ class K8sHelmBaseConnector(K8sConnector): resource_name: Resource name Returns: - True if replicas, False replicaCount + The number of replicas of the specific instance; if not found, returns None """ kdu_values = yaml.load( @@ -1752,23 +1891,23 @@ class K8sHelmBaseConnector(K8sConnector): Loader=yaml.SafeLoader, ) + self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}") + replicas = None if kdu_values: resource_values = ( kdu_values.get(resource_name, None) if resource_name else None ) - replicas = ( - ( - resource_values.get("replicaCount", None) - or resource_values.get("replicas", None) - ) - if resource_values - else ( - kdu_values.get("replicaCount", None) - or kdu_values.get("replicas", None) - ) - ) + + for replica_str in ("replicaCount", "replicas"): + if resource_values: + replicas = resource_values.get(replica_str) + else: + replicas = kdu_values.get(replica_str) + + if replicas is not None: + break return replicas @@ -1778,42 +1917,51 @@ class K8sHelmBaseConnector(K8sConnector): operation: str, kdu_instance: str, namespace: str = None, - check_every: float = 10, db_dict: dict = None, - run_once: bool = False, - ): - while True: - try: - await asyncio.sleep(check_every) - detailed_status = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - yaml_format=False, - namespace=namespace, - ) - status = detailed_status.get("info").get("description") - self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) - # write status to db - result = await self.write_app_status_to_db( - db_dict=db_dict, - status=str(status), - detailed_status=str(detailed_status), - operation=operation, - ) - if not result: - self.log.info("Error writing in database. 
Task exiting...") - return - except asyncio.CancelledError: - self.log.debug("Task cancelled") - return - except Exception as e: - self.log.debug( - "_store_status exception: {}".format(str(e)), exc_info=True - ) - pass - finally: - if run_once: - return + ) -> None: + """ + Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. + + :param cluster_id (str): the cluster where the KDU instance is deployed + :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") + :param kdu_instance (str): The KDU instance in relation to which the status is obtained + :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None + :param db_dict (dict): A dictionary with the database necessary information. It shall contain the + values for the keys: + - "collection": The Mongo DB collection to write to + - "filter": The query filter to use in the update process + - "path": The dot separated keys which targets the object to be updated + Defaults to None. + """ + + try: + detailed_status = await self._status_kdu( + cluster_id=cluster_id, + kdu_instance=kdu_instance, + yaml_format=False, + namespace=namespace, + ) + + status = detailed_status.get("info").get("description") + self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") + + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation, + ) + + if not result: + self.log.info("Error writing in database. 
Task exiting...") + + except asyncio.CancelledError as e: + self.log.warning( + f"Exception in method {self._store_status.__name__} (task cancelled): {e}" + ) + except Exception as e: + self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") # params for use in -f file # returns values file option and filename (in order to delete it at the end) @@ -1833,7 +1981,7 @@ class K8sHelmBaseConnector(K8sConnector): for key in params: value = params.get(key) if "!!yaml" in str(value): - value = yaml.load(value[7:]) + value = yaml.safe_load(value[7:]) params2[key] = value values_file = get_random_number() + ".yaml" @@ -1899,20 +2047,55 @@ class K8sHelmBaseConnector(K8sConnector): def _split_version(self, kdu_model: str) -> (str, str): version = None - if ":" in kdu_model: + if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model: parts = kdu_model.split(sep=":") if len(parts) == 2: version = str(parts[1]) kdu_model = parts[0] return kdu_model, version - async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: - repo_url = None + def _split_repo(self, kdu_model: str) -> (str, str): + """Obtain the Helm Chart's repository and Chart's names from the KDU model + + Args: + kdu_model (str): Associated KDU model + + Returns: + (str, str): Tuple with the Chart name in index 0, and the repo name + in index 1; if there was a problem finding them, return None + for both + """ + + chart_name = None + repo_name = None + idx = kdu_model.find("/") if idx >= 0: + chart_name = kdu_model[idx + 1 :] repo_name = kdu_model[:idx] + + return chart_name, repo_name + + async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: + """Obtain the Helm repository for a Helm Chart + + Args: + kdu_model (str): the KDU model associated with the Helm Chart instantiation + cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation + + Returns: + str: the repository URL; if Helm Chart is a local one, the function returns None + """ +
+ _, repo_name = self._split_repo(kdu_model=kdu_model) + + repo_url = None + if repo_name: # Find repository link local_repo_list = await self.repo_list(cluster_uuid) for repo in local_repo_list: - repo_url = repo["url"] if repo["name"] == repo_name else None + if repo["name"] == repo_name: + repo_url = repo["url"] + break # it is not necessary to continue the loop if the repo link was found... + return repo_url