X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=afb7bc23814befc2e5483308d3eb89595aae46a0;hp=273b206ddb13eb8027773db228e8f45b1884bc02;hb=0fcb6feed111b9eb210c03c49287ce114355b994;hpb=867418c142ece1ef0e4c9e083bc747c1f3d13a3c diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 273b206..afb7bc2 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,6 +21,7 @@ ## import abc import asyncio +from typing import Union import random import time import shlex @@ -89,14 +90,21 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None - @staticmethod - def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): + # Lock to avoid concurrent execution of helm commands + self.cmd_lock = asyncio.Lock() + + def _get_namespace(self, cluster_uuid: str) -> str: """ - Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only - cluster_id for backward compatibility + Obtains the namespace used by the cluster with the uuid passed by argument + + param: cluster_uuid: cluster's uuid """ - namespace, _, cluster_id = cluster_uuid.rpartition(":") - return namespace, cluster_id + + # first, obtain the cluster corresponding to the uuid passed by argument + k8scluster = self.db.get_one( + "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False + ) + return k8scluster.get("namespace") async def init_env( self, @@ -120,11 +128,9 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) - namespace = namespace_ or namespace + cluster_id = reuse_cluster_uuid else: cluster_id = str(uuid4()) - cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -146,46 +152,93 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_uuid, n2vc_installed_sw + return cluster_id, n2vc_installed_sw async def repo_add( - self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = "chart", + cert: str = None, + user: str = None, + password: str = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url + cluster_uuid, repo_type, name, url ) ) # init_env paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) + + # helm repo add name url + command = ("env KUBECONFIG={} {} repo add {} {}").format( + paths["kube_config"], self._helm_command, name, url + ) + + if cert: + temp_cert_file = os.path.join( + self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt" + ) + os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) + with open(temp_cert_file, "w") as the_cert: + the_cert.write(cert) + command += " --ca-file {}".format(temp_cert_file) + + if user: + command += " --username={}".format(user) + + if password: + command += " --password={}".format(password) + + self.log.debug("adding repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) # helm repo update - command = "env KUBECONFIG={} {} repo update".format( - paths["kube_config"], self._helm_command + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name ) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - # helm repo add name url - command = "env KUBECONFIG={} {} repo add {} {}".format( - paths["kube_config"], self._helm_command, name, url + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format( + cluster_uuid, repo_type, name + ) ) - self.log.debug("adding repo: {}".format(command)) + + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) + self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env + command=command, raise_exception_on_error=False, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -194,16 +247,15 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list repositories for cluster {}".format(cluster_id)) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo list --output yaml".format( paths["kube_config"], self._helm_command @@ -215,7 +267,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) if _rc == 0: if output and len(output) > 0: @@ -228,17 +280,17 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) + self.log.debug( + "remove {} repositories for cluster {}".format(name, cluster_uuid) + ) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo remove {}".format( paths["kube_config"], self._helm_command, name @@ -248,7 +300,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def reset( self, @@ -267,15 +319,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + namespace = self._get_namespace(cluster_uuid=cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_id, uninstall_sw + cluster_uuid, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # uninstall releases if needed. if uninstall_sw: @@ -304,24 +356,27 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" - ).format(cluster_id) + ).format(cluster_uuid) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id, namespace) + await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_id)) - self.fs.file_delete(cluster_id, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_uuid)) + self.fs.file_delete(cluster_uuid, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_id + direct = self.fs.path + "/" + cluster_uuid shutil.rmtree(direct, ignore_errors=True) return True + def _is_helm_chart_a_file(self, chart_name: str): + return chart_name.count("/") > 1 + async def _install_impl( self, cluster_id: str, @@ -349,6 +404,10 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) + _, repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_install_command( kdu_model, kdu_instance, @@ -378,7 +437,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -407,8 +465,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -426,11 +482,10 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -439,20 +494,24 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_id, params=params + cluster_id=cluster_uuid, params=params ) # version kdu_model, version = self._split_version(kdu_model) + _, repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_uuid, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, @@ -477,12 +536,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -505,13 +563,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -520,7 +576,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -563,12 +619,10 @@ class K8sHelmBaseConnector(K8sConnector): True if successful, False otherwise """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - - debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_id) + debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid) if resource_name: debug_mgs = "scaling resource {} in model {} (cluster {})".format( - resource_name, kdu_model, cluster_id + resource_name, kdu_model, cluster_uuid ) self.log.debug(debug_mgs) @@ -581,17 +635,13 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # version kdu_model, version = self._split_version(kdu_model) repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) _, replica_str = await self._get_replica_count_url( kdu_model, repo_url, resource_name @@ -622,12 +672,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=False, ) ) @@ -645,13 +694,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="scale", - run_once=True, - check_every=0, ) if rc != 0: @@ -660,7 +707,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return True @@ -678,16 +725,15 @@ class K8sHelmBaseConnector(K8sConnector): cluster_uuid: The UUID of the cluster resource_name: Resource name kdu_instance: KDU instance name - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart kwargs: Additional parameters Returns: Resource instance count """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "getting scale count for {} in cluster {}".format(kdu_model, cluster_id) + "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid) ) # look for instance to obtain namespace @@ -697,23 +743,23 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) replicas = await self._get_replica_count_instance( - kdu_instance, instance_info["namespace"], paths["kube_config"] + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + kubeconfig=paths["kube_config"], + resource_name=resource_name, ) # Get default value if scale count is not found from provided values if not replicas: - repo_url = await self._find_repo(kdu_model, cluster_uuid) - if not repo_url: - raise K8sException( - "Repository not found for kdu_model {}".format(kdu_model) - ) - + repo_url = await self._find_repo( + kdu_model=kdu_model, cluster_uuid=cluster_uuid + ) replicas, _ = await self._get_replica_count_url( - kdu_model, repo_url, resource_name + kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name ) if not replicas: @@ -726,16 +772,14 @@ class K8sHelmBaseConnector(K8sConnector): async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_id + kdu_instance, revision, cluster_uuid ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -744,11 +788,11 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_rollback_command( kdu_instance, instance_info["namespace"], revision, paths["kube_config"] @@ -765,12 +809,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -784,13 +827,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -799,7 +840,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -824,13 +865,14 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -839,11 +881,11 @@ class K8sHelmBaseConnector(K8sConnector): return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) command = self._get_uninstall_command( kdu_instance, instance_info["namespace"], paths["kube_config"] @@ -853,7 +895,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return self._output_to_table(output) @@ -865,17 +907,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list releases for cluster {}".format(cluster_id)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # execute internal command - result = await self._instances_list(cluster_id) + result = await self._instances_list(cluster_uuid) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return result @@ -887,6 +928,28 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("Instance {} not found".format(kdu_instance)) return None + async def upgrade_charm( + self, + ee_id: str = None, + path: str = None, + charm_id: str = None, + charm_type: str = None, + timeout: float = None, + ) -> str: + """This method upgrade charms in VNFs + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + The output of the update operation if status equals to "completed" + """ + raise K8sException("KDUs deployed with Helm do not support charm upgrade") + async def exec_primitive( self, cluster_uuid: str = None, @@ -933,7 +996,6 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance @@ -942,24 +1004,24 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get list of services names for kdu service_names = await self._get_services( - cluster_id, kdu_instance, namespace, paths["kube_config"] + cluster_uuid, kdu_instance, namespace, paths["kube_config"] ) service_list = [] for service in service_names: - service = await self._get_service(cluster_id, service, namespace) + service = await self._get_service(cluster_uuid, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service_list @@ -973,19 +1035,19 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) - service = await self._get_service(cluster_id, service_name, namespace) + service = await self._get_service(cluster_uuid, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -995,6 +1057,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -1015,13 +1079,11 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_id) + instances = await self._instances_list(cluster_id=cluster_uuid) for instance in instances: if instance.get("name") == kdu_instance: break @@ -1029,20 +1091,20 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id + kdu_instance, cluster_uuid ) ) status = await self._status_kdu( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return status @@ -1060,6 +1122,15 @@ class K8sHelmBaseConnector(K8sConnector): ) async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: + """Method to obtain the Helm Chart package's values + + Args: + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the values of the Helm Chart package + """ self.log.debug( "inspect kdu_model values {} from (optional) repo: {}".format( @@ -1113,9 +1184,19 @@ class K8sHelmBaseConnector(K8sConnector): # add repo self.log.debug("add repo {}".format(db_repo["name"])) - await self.repo_add( - cluster_uuid, db_repo["name"], db_repo["url"] - ) + if "ca_cert" in db_repo: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + cert=db_repo["ca_cert"], + ) + else: + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: raise K8sException( @@ -1195,9 +1276,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @@ -1270,8 +1351,17 @@ class K8sHelmBaseConnector(K8sConnector): def _get_inspect_command( self, show_command: str, kdu_model: str, repo_str: str, version: str ): - """ - Obtain command to be executed to obtain information about the kdu + """Generates the command to obtain the information about an Helm Chart package + (´helm show ...´ command) + + Args: + show_command: the second part of the command (`helm show `) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + version: constraint with specific version of the Chart to use + + Returns: + str: the generated Helm Chart command """ @abc.abstractmethod @@ -1419,17 +1509,18 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=environ, - ) + async with self.cmd_lock: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, + ) - # wait for command terminate - stdout, stderr = await process.communicate() + # wait for command terminate + stdout, stderr = await process.communicate() - return_code = process.returncode + return_code = process.returncode output = "" if stdout: @@ -1456,6 +1547,9 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the process if it is still running + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1493,16 +1587,19 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - read, write = os.pipe() - await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() + async with self.cmd_lock: + read, write = os.pipe() + process_1 = await asyncio.create_subprocess_exec( + *command1, stdout=write, env=environ + ) + os.close(write) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) + os.close(read) + stdout, stderr = await process_2.communicate() - return_code = process_2.returncode + return_code = process_2.returncode output = "" if stdout: @@ -1528,6 +1625,10 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the processes if they are still running + for process in (process_1, process_2): + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1599,16 +1700,23 @@ class K8sHelmBaseConnector(K8sConnector): async def _exec_inspect_command( self, inspect_command: str, kdu_model: str, repo_url: str = None ): - """Obtains information about a kdu, no cluster (no env).""" + """Obtains information about an Helm Chart package (´helm show´ command) + + Args: + inspect_command: the Helm sub command (`helm show ...`) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the requested info about the Helm Chart package + """ repo_str = "" if repo_url: repo_str = " --repo {}".format(repo_url) - idx = kdu_model.find("/") - if idx >= 0: - idx += 1 - kdu_model = kdu_model[idx:] + # Obtain the Chart's name and store it in the var kdu_model + kdu_model, _ = self._split_repo(kdu_model=kdu_model) kdu_model, version = self._split_version(kdu_model) if version: @@ -1617,23 +1725,26 @@ class K8sHelmBaseConnector(K8sConnector): version_str = "" full_command = self._get_inspect_command( - inspect_command, kdu_model, repo_str, version_str + show_command=inspect_command, + kdu_model=kdu_model, + repo_str=repo_str, + version=version_str, ) - output, _rc = await self._local_async_exec(command=full_command) + output, _ = await self._local_async_exec(command=full_command) return output async def _get_replica_count_url( self, kdu_model: str, - repo_url: str, + repo_url: str = None, resource_name: str = None, ): """Get the replica count value in the Helm Chart Values. Args: - kdu_model: The name or path of a bundle + kdu_model: The name or path of an Helm Chart repo_url: Helm Chart repository url resource_name: Resource name @@ -1642,7 +1753,8 @@ class K8sHelmBaseConnector(K8sConnector): """ kdu_values = yaml.load( - await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader + await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url), + Loader=yaml.SafeLoader, ) if not kdu_values: @@ -1751,42 +1863,51 @@ class K8sHelmBaseConnector(K8sConnector): operation: str, kdu_instance: str, namespace: str = None, - check_every: float = 10, db_dict: dict = None, - run_once: bool = False, - ): - while True: - try: - await asyncio.sleep(check_every) - detailed_status = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - namespace=namespace, - return_text=False, - ) - status = detailed_status.get("info").get("description") - self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) - # write status to db - result = await self.write_app_status_to_db( - db_dict=db_dict, - status=str(status), - detailed_status=str(detailed_status), - operation=operation, - ) - if not result: - self.log.info("Error writing in database. Task exiting...") - return - except asyncio.CancelledError: - self.log.debug("Task cancelled") - return - except Exception as e: - self.log.debug( - "_store_status exception: {}".format(str(e)), exc_info=True - ) - pass - finally: - if run_once: - return + ) -> None: + """ + Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. + + :param cluster_id (str): the cluster where the KDU instance is deployed + :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") + :param kdu_instance (str): The KDU instance in relation to which the status is obtained + :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None + :param db_dict (dict): A dictionary with the database necessary information. It shall contain the + values for the keys: + - "collection": The Mongo DB collection to write to + - "filter": The query filter to use in the update process + - "path": The dot separated keys which targets the object to be updated + Defaults to None. + """ + + try: + detailed_status = await self._status_kdu( + cluster_id=cluster_id, + kdu_instance=kdu_instance, + yaml_format=False, + namespace=namespace, + ) + + status = detailed_status.get("info").get("description") + self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") + + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation, + ) + + if not result: + self.log.info("Error writing in database. Task exiting...") + + except asyncio.CancelledError as e: + self.log.warning( + f"Exception in method {self._store_status.__name__} (task cancelled): {e}" + ) + except Exception as e: + self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") # params for use in -f file # returns values file option and filename (in order to delete it at the end) @@ -1806,7 +1927,7 @@ class K8sHelmBaseConnector(K8sConnector): for key in params: value = params.get(key) if "!!yaml" in str(value): - value = yaml.load(value[7:]) + value = yaml.safe_load(value[7:]) params2[key] = value values_file = get_random_number() + ".yaml" @@ -1872,20 +1993,55 @@ class K8sHelmBaseConnector(K8sConnector): def _split_version(self, kdu_model: str) -> (str, str): version = None - if ":" in kdu_model: + if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model: parts = kdu_model.split(sep=":") if len(parts) == 2: version = str(parts[1]) kdu_model = parts[0] return kdu_model, version - async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: - repo_url = None + def _split_repo(self, kdu_model: str) -> (str, str): + """Obtain the Helm Chart's repository and Chart's names from the KDU model + + Args: + kdu_model (str): Associated KDU model + + Returns: + (str, str): Tuple with the Chart name in index 0, and the repo name + in index 2; if there was a problem finding them, return None + for both + """ + + chart_name = None + repo_name = None + idx = kdu_model.find("/") if idx >= 0: + chart_name = kdu_model[idx + 1 :] repo_name = kdu_model[:idx] + + return chart_name, repo_name + + async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: + """Obtain the Helm repository for an Helm Chart + + Args: + kdu_model (str): the KDU model associated with the Helm Chart instantiation + cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation + + Returns: + str: the repository URL; if Helm Chart is a local one, the function returns None + """ + + _, repo_name = self._split_repo(kdu_model=kdu_model) + + repo_url = None + if repo_name: # Find repository link local_repo_list = await self.repo_list(cluster_uuid) for repo in local_repo_list: - repo_url = repo["url"] if repo["name"] == repo_name else None + if repo["name"] == repo_name: + repo_url = repo["url"] + break # it is not necessary to continue the loop if the repo link was found... + return repo_url