X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=51eb4758a3496a3c6ae5e0897a1fdb6568af2528;hb=44bd0680258ca579736f85e5f5796bd7fc8b29c5;hp=703bd737cc5ced06b2021ae00267ca45d1baca79;hpb=b41de17df6282334088ffbd887fbc01e496e1797;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 703bd73..51eb475 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,6 +21,7 @@ ## import abc import asyncio +from typing import Union import random import time import shlex @@ -89,6 +90,9 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None + # Lock to avoid concurrent execution of helm commands + self.cmd_lock = asyncio.Lock() + def _get_namespace(self, cluster_uuid: str) -> str: """ Obtains the namespace used by the cluster with the uuid passed by argument @@ -151,7 +155,14 @@ class K8sHelmBaseConnector(K8sConnector): return cluster_id, n2vc_installed_sw async def repo_add( - self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = "chart", + cert: str = None, + user: str = None, + password: str = None, ): self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( @@ -167,22 +178,63 @@ class K8sHelmBaseConnector(K8sConnector): # sync local dir self.fs.sync(from_path=cluster_uuid) + # helm repo add name url + command = ("env KUBECONFIG={} {} repo add {} {}").format( + paths["kube_config"], self._helm_command, name, url + ) + + if cert: + temp_cert_file = os.path.join( + self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt" + ) + os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) + with open(temp_cert_file, "w") as the_cert: + the_cert.write(cert) + command += " --ca-file {}".format(temp_cert_file) + + if user: + command += " --username={}".format(user) + + if password: + command += " --password={}".format(password) + + self.log.debug("adding repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + # helm repo update - command = "env KUBECONFIG={} {} repo update".format( - paths["kube_config"], self._helm_command + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name ) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - # helm repo add name url - command = "env KUBECONFIG={} {} repo add {} {}".format( - paths["kube_config"], self._helm_command, name, url + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format( + cluster_uuid, repo_type, name + ) ) - self.log.debug("adding repo: {}".format(command)) + + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) + self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env + command=command, raise_exception_on_error=False, env=env ) # sync fs @@ -322,6 +374,9 @@ class K8sHelmBaseConnector(K8sConnector): return True + def _is_helm_chart_a_file(self, chart_name: str): + return chart_name.count("/") > 1 + async def _install_impl( self, cluster_id: str, @@ -349,6 +404,10 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_install_command( kdu_model, kdu_instance, @@ -378,7 +437,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -407,8 +465,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -452,6 +508,10 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_uuid, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, @@ -481,7 +541,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -509,8 +568,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -585,10 +642,6 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model, version = self._split_version(kdu_model) repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) _, replica_str = await self._get_replica_count_url( kdu_model, repo_url, resource_name @@ -624,7 +677,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=False, ) ) @@ -647,8 +699,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=True, - check_every=0, ) if rc != 0: @@ -675,7 +725,7 @@ class K8sHelmBaseConnector(K8sConnector): cluster_uuid: The UUID of the cluster resource_name: Resource name kdu_instance: KDU instance name - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart kwargs: Additional parameters Returns: @@ -697,19 +747,19 @@ class K8sHelmBaseConnector(K8sConnector): ) replicas = await self._get_replica_count_instance( - kdu_instance, instance_info["namespace"], paths["kube_config"] + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + kubeconfig=paths["kube_config"], + resource_name=resource_name, ) # Get default value if scale count is not found from provided values if not replicas: - repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) - + repo_url = await self._find_repo( + kdu_model=kdu_model, cluster_uuid=cluster_uuid + ) replicas, _ = await self._get_replica_count_url( - kdu_model, repo_url, resource_name + kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name ) if not replicas: @@ -764,7 +814,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -783,8 +832,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -881,6 +928,28 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("Instance {} not found".format(kdu_instance)) return None + async def upgrade_charm( + self, + ee_id: str = None, + path: str = None, + charm_id: str = None, + charm_type: str = None, + timeout: float = None, + ) -> str: + """This method upgrade charms in VNFs + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + The output of the update operation if status equals to "completed" + """ + raise K8sException("KDUs deployed with Helm do not support charm upgrade") + async def exec_primitive( self, cluster_uuid: str = None, @@ -976,7 +1045,9 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -986,6 +1057,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -1026,8 +1099,8 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs @@ -1049,6 +1122,15 @@ class K8sHelmBaseConnector(K8sConnector): ) async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: + """Method to obtain the Helm Chart package's values + + Args: + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the values of the Helm Chart package + """ self.log.debug( "inspect kdu_model values {} from (optional) repo: {}".format( @@ -1102,9 +1184,19 @@ class K8sHelmBaseConnector(K8sConnector): # add repo self.log.debug("add repo {}".format(db_repo["name"])) - await self.repo_add( - cluster_uuid, db_repo["name"], db_repo["url"] - ) + if "ca_cert" in db_repo: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + cert=db_repo["ca_cert"], + ) + else: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: raise K8sException( @@ -1184,9 +1276,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @@ -1221,7 +1313,24 @@ class K8sHelmBaseConnector(K8sConnector): resource_name, kubeconfig, ) -> str: - """Obtain command to be executed to upgrade the indicated instance.""" + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release + """ @abc.abstractmethod def _get_upgrade_command( @@ -1235,8 +1344,21 @@ class K8sHelmBaseConnector(K8sConnector): timeout, kubeconfig, ) -> str: - """ - Obtain command to be executed to upgrade the indicated instance + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to upgrade a Helm Chart release """ @abc.abstractmethod @@ -1259,8 +1381,17 @@ class K8sHelmBaseConnector(K8sConnector): def _get_inspect_command( self, show_command: str, kdu_model: str, repo_str: str, version: str ): - """ - Obtain command to be executed to obtain information about the kdu + """Generates the command to obtain the information about an Helm Chart package + (´helm show ...´ command) + + Args: + show_command: the second part of the command (`helm show `) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + version: constraint with specific version of the Chart to use + + Returns: + str: the generated Helm Chart command """ @abc.abstractmethod @@ -1408,17 +1539,18 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=environ, - ) + async with self.cmd_lock: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, + ) - # wait for command terminate - stdout, stderr = await process.communicate() + # wait for command terminate + stdout, stderr = await process.communicate() - return_code = process.returncode + return_code = process.returncode output = "" if stdout: @@ -1482,16 +1614,19 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - read, write = os.pipe() - await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() + async with self.cmd_lock: + read, write = os.pipe() + await asyncio.create_subprocess_exec( + *command1, stdout=write, env=environ + ) + os.close(write) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) + os.close(read) + stdout, stderr = await process_2.communicate() - return_code = process_2.returncode + return_code = process_2.returncode output = "" if stdout: @@ -1588,7 +1723,16 @@ class K8sHelmBaseConnector(K8sConnector): async def _exec_inspect_command( self, inspect_command: str, kdu_model: str, repo_url: str = None ): - """Obtains information about a kdu, no cluster (no env).""" + """Obtains information about an Helm Chart package (´helm show´ command) + + Args: + inspect_command: the Helm sub command (`helm show ...`) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the requested info about the Helm Chart package + """ repo_str = "" if repo_url: @@ -1616,13 +1760,13 @@ class K8sHelmBaseConnector(K8sConnector): async def _get_replica_count_url( self, kdu_model: str, - repo_url: str, + repo_url: str = None, resource_name: str = None, ): """Get the replica count value in the Helm Chart Values. Args: - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart repo_url: Helm Chart repository url resource_name: Resource name @@ -1631,7 +1775,8 @@ class K8sHelmBaseConnector(K8sConnector): """ kdu_values = yaml.load( - await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader + await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url), + Loader=yaml.SafeLoader, ) if not kdu_values: @@ -1740,42 +1885,51 @@ class K8sHelmBaseConnector(K8sConnector): operation: str, kdu_instance: str, namespace: str = None, - check_every: float = 10, db_dict: dict = None, - run_once: bool = False, - ): - while True: - try: - await asyncio.sleep(check_every) - detailed_status = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - namespace=namespace, - return_text=False, - ) - status = detailed_status.get("info").get("description") - self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) - # write status to db - result = await self.write_app_status_to_db( - db_dict=db_dict, - status=str(status), - detailed_status=str(detailed_status), - operation=operation, - ) - if not result: - self.log.info("Error writing in database. Task exiting...") - return - except asyncio.CancelledError: - self.log.debug("Task cancelled") - return - except Exception as e: - self.log.debug( - "_store_status exception: {}".format(str(e)), exc_info=True - ) - pass - finally: - if run_once: - return + ) -> None: + """ + Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. + + :param cluster_id (str): the cluster where the KDU instance is deployed + :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") + :param kdu_instance (str): The KDU instance in relation to which the status is obtained + :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None + :param db_dict (dict): A dictionary with the database necessary information. It shall contain the + values for the keys: + - "collection": The Mongo DB collection to write to + - "filter": The query filter to use in the update process + - "path": The dot separated keys which targets the object to be updated + Defaults to None. + """ + + try: + detailed_status = await self._status_kdu( + cluster_id=cluster_id, + kdu_instance=kdu_instance, + yaml_format=False, + namespace=namespace, + ) + + status = detailed_status.get("info").get("description") + self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") + + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation, + ) + + if not result: + self.log.info("Error writing in database. Task exiting...") + + except asyncio.CancelledError as e: + self.log.warning( + f"Exception in method {self._store_status.__name__} (task cancelled): {e}" + ) + except Exception as e: + self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") # params for use in -f file # returns values file option and filename (in order to delete it at the end) @@ -1795,7 +1949,7 @@ class K8sHelmBaseConnector(K8sConnector): for key in params: value = params.get(key) if "!!yaml" in str(value): - value = yaml.load(value[7:]) + value = yaml.safe_load(value[7:]) params2[key] = value values_file = get_random_number() + ".yaml" @@ -1861,14 +2015,31 @@ class K8sHelmBaseConnector(K8sConnector): def _split_version(self, kdu_model: str) -> (str, str): version = None - if ":" in kdu_model: + if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model: parts = kdu_model.split(sep=":") if len(parts) == 2: version = str(parts[1]) kdu_model = parts[0] return kdu_model, version + def _split_repo(self, kdu_model: str) -> str: + repo_name = None + idx = kdu_model.find("/") + if idx >= 0: + repo_name = kdu_model[:idx] + return repo_name + async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: + """Obtain the Helm repository for an Helm Chart + + Args: + kdu_model (str): the KDU model associated with the Helm Chart instantiation + cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation + + Returns: + str: the repository URL; if Helm Chart is a local one, the function returns None + """ + repo_url = None idx = kdu_model.find("/") if idx >= 0: