X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=e494a76ce8807d729139203966aff2f6610c0e1f;hp=b3d7bda3c83f40812ccf03d2a196ea247099dc48;hb=HEAD;hpb=82b591ceed704c798ead2d9104085a08e75b511b diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index b3d7bda..5f004b3 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,18 +21,22 @@ ## import abc import asyncio +from typing import Union +from shlex import quote import random import time import shlex import shutil import stat -import subprocess import os import yaml from uuid import uuid4 +from urllib.parse import urlparse +from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector +from n2vc.kubectl import Kubectl class K8sHelmBaseConnector(K8sConnector): @@ -44,7 +48,6 @@ class K8sHelmBaseConnector(K8sConnector): """ service_account = "osm" - _STABLE_REPO_URL = "https://charts.helm.sh/stable" def __init__( self, @@ -54,7 +57,6 @@ class K8sHelmBaseConnector(K8sConnector): helm_command: str = "/usr/bin/helm", log: object = None, on_update_db=None, - vca_config: dict = None, ): """ @@ -71,6 +73,7 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Initializing K8S Helm connector") + self.config = EnvironConfig() # random numbers for release name generation random.seed(time.time()) @@ -86,19 +89,25 @@ class K8sHelmBaseConnector(K8sConnector): self._check_file_exists(filename=helm_command, exception_if_not_exists=True) # obtain stable repo url from config or apply default - if not vca_config or not vca_config.get("stablerepourl"): - self._stable_repo_url = self._STABLE_REPO_URL - else: - self._stable_repo_url = vca_config.get("stablerepourl") + self._stable_repo_url = self.config.get("stablerepourl") + if self._stable_repo_url == "None": + self._stable_repo_url = None - @staticmethod - def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): + # Lock to avoid concurrent execution of helm commands + self.cmd_lock = asyncio.Lock() + + def _get_namespace(self, cluster_uuid: str) -> str: """ - Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only - cluster_id for backward compatibility + Obtains the namespace used by the cluster with the uuid passed by argument + + param: cluster_uuid: cluster's uuid """ - namespace, _, cluster_id = cluster_uuid.rpartition(":") - return namespace, cluster_id + + # first, obtain the cluster corresponding to the uuid passed by argument + k8scluster = self.db.get_one( + "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False + ) + return k8scluster.get("namespace") async def init_env( self, @@ -106,7 +115,7 @@ class K8sHelmBaseConnector(K8sConnector): namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs, - ) -> (str, bool): + ) -> tuple[str, bool]: """ It prepares a given K8s cluster environment to run Charts @@ -122,11 +131,9 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) - namespace = namespace_ or namespace + cluster_id = reuse_cluster_uuid else: cluster_id = str(uuid4()) - cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -148,42 +155,108 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_uuid, n2vc_installed_sw + return cluster_id, n2vc_installed_sw async def repo_add( - self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = "chart", + cert: str = None, + user: str = None, + password: str = None, + oci: bool = False, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url + cluster_uuid, repo_type, name, url ) ) + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) + + if oci: + if user and password: + host_port = urlparse(url).netloc if url.startswith("oci://") else url + # helm registry login url + command = "env KUBECONFIG={} {} registry login {}".format( + paths["kube_config"], self._helm_command, quote(host_port) + ) + else: + self.log.debug( + "OCI registry login is not needed for repo: {}".format(name) + ) + return + else: + # helm repo add name url + command = "env KUBECONFIG={} {} repo add {} {}".format( + paths["kube_config"], self._helm_command, quote(name), quote(url) + ) + + if cert: + temp_cert_file = os.path.join( + self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt" + ) + os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) + with open(temp_cert_file, "w") as the_cert: + the_cert.write(cert) + command += " --ca-file {}".format(quote(temp_cert_file)) + + if user: + command += " --username={}".format(quote(user)) + + if password: + command += " --password={}".format(quote(password)) + + self.log.debug("adding repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + + if not oci: + # helm repo update + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, quote(name) + ) + self.log.debug("updating repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format( + cluster_uuid, repo_type, name + ) + ) # init_env paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + # helm repo update - command = "{} repo update".format(self._helm_command) + command = "{} repo update {}".format(self._helm_command, quote(name)) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - # helm repo add name url - command = "{} repo add {} {}".format(self._helm_command, name, url) - self.log.debug("adding repo: {}".format(command)) - await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env - ) - # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -192,18 +265,19 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list repositories for cluster {}".format(cluster_id)) - - # sync local dir - self.fs.sync(from_path=cluster_id) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = "{} repo list --output yaml".format(self._helm_command) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = "env KUBECONFIG={} {} repo list --output yaml".format( + paths["kube_config"], self._helm_command + ) # Set exception to false because if there are no repos just want an empty list output, _rc = await self._local_async_exec( @@ -211,7 +285,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) if _rc == 0: if output and len(output) > 0: @@ -224,25 +298,27 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) - - # sync local dir - self.fs.sync(from_path=cluster_id) + self.log.debug( + "remove {} repositories for cluster {}".format(name, cluster_uuid) + ) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = "{} repo remove {}".format(self._helm_command, name) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = "env KUBECONFIG={} {} repo remove {}".format( + paths["kube_config"], self._helm_command, quote(name) + ) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def reset( self, @@ -261,15 +337,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + namespace = self._get_namespace(cluster_uuid=cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_id, uninstall_sw + cluster_uuid, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # uninstall releases if needed. if uninstall_sw: @@ -298,24 +374,32 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" - ).format(cluster_id) + ).format(cluster_uuid) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id, namespace) + await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_id)) - self.fs.file_delete(cluster_id, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_uuid)) + self.fs.file_delete(cluster_uuid, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_id + direct = self.fs.path + "/" + cluster_uuid shutil.rmtree(direct, ignore_errors=True) return True + def _is_helm_chart_a_file(self, chart_name: str): + return chart_name.count("/") > 1 + + @staticmethod + def _is_helm_chart_a_url(chart_name: str): + result = urlparse(chart_name) + return all([result.scheme, result.netloc]) + async def _install_impl( self, cluster_id: str, @@ -330,21 +414,27 @@ class K8sHelmBaseConnector(K8sConnector): kdu_name: str = None, namespace: str = None, ): + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + # params to str params_str, file_to_delete = self._params_to_file_option( cluster_id=cluster_id, params=params ) - # version - version = None - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = str(parts[1]) - kdu_model = parts[0] + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id) command = self._get_install_command( - kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + paths["kube_config"], ) self.log.debug("installing: {}".format(command)) @@ -365,7 +455,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -378,7 +467,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -394,8 +482,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -412,50 +498,53 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + namespace: str = None, + force: bool = False, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace - instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) - if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # set namespace + if not namespace: + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + namespace = instance_info["namespace"] # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_id, params=params + cluster_id=cluster_uuid, params=params ) - # version - version = None - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = str(parts[1]) - kdu_model = parts[0] + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) command = self._get_upgrade_command( kdu_model, kdu_instance, - instance_info["namespace"], + namespace, params_str, version, atomic, timeout, + paths["kube_config"], + force, ) self.log.debug("upgrading: {}".format(command)) if atomic: - # exec helm in a task exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec( @@ -465,12 +554,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -482,7 +570,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -493,13 +580,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -508,7 +593,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -527,31 +612,203 @@ class K8sHelmBaseConnector(K8sConnector): scale: int, resource_name: str, total_timeout: float = 1800, + cluster_uuid: str = None, + kdu_model: str = None, + atomic: bool = True, + db_dict: dict = None, **kwargs, ): - raise NotImplementedError("Method not implemented") + """Scale a resource in a Helm Chart. + + Args: + kdu_instance: KDU instance name + scale: Scale to which to set the resource + resource_name: Resource name + total_timeout: The time, in seconds, to wait + cluster_uuid: The UUID of the cluster + kdu_model: The chart reference + atomic: if set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + db_dict: Dictionary for any additional data + kwargs: Additional parameters + + Returns: + True if successful, False otherwise + """ + + debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid) + if resource_name: + debug_mgs = "scaling resource {} in model {} (cluster {})".format( + resource_name, kdu_model, cluster_uuid + ) + + self.log.debug(debug_mgs) + + # look for instance to obtain namespace + # get_instance_info function calls the sync command + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # version + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) + + repo_url = await self._find_repo(kdu_model, cluster_uuid) + + _, replica_str = await self._get_replica_count_url( + kdu_model, repo_url, resource_name + ) + + command = self._get_upgrade_scale_command( + kdu_model, + kdu_instance, + instance_info["namespace"], + scale, + version, + atomic, + replica_str, + total_timeout, + resource_name, + paths["kube_config"], + ) + + self.log.debug("scaling: {}".format(command)) + + if atomic: + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_id=cluster_uuid, + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + db_dict=db_dict, + operation="scale", + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + output, rc = exec_task.result() + + else: + output, rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + + # write final status + await self._store_status( + cluster_id=cluster_uuid, + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + db_dict=db_dict, + operation="scale", + ) + + if rc != 0: + msg = "Error executing command: {}\nOutput: {}".format(command, output) + self.log.error(msg) + raise K8sException(msg) + + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + return True async def get_scale_count( self, resource_name: str, kdu_instance: str, + cluster_uuid: str, + kdu_model: str, **kwargs, - ): - raise NotImplementedError("Method not implemented") + ) -> int: + """Get a resource scale count. + + Args: + cluster_uuid: The UUID of the cluster + resource_name: Resource name + kdu_instance: KDU instance name + kdu_model: The name or path of an Helm Chart + kwargs: Additional parameters + + Returns: + Resource instance count + """ + + self.log.debug( + "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid) + ) + + # look for instance to obtain namespace + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # init env, paths + paths, _ = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + replicas = await self._get_replica_count_instance( + kdu_instance=kdu_instance, + namespace=instance_info["namespace"], + kubeconfig=paths["kube_config"], + resource_name=resource_name, + ) + + self.log.debug( + f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}" + ) + + # Get default value if scale count is not found from provided values + # Important note: this piece of code shall only be executed in the first scaling operation, + # since it is expected that the _get_replica_count_instance is able to obtain the number of + # replicas when a scale operation was already conducted previously for this KDU/resource! + if replicas is None: + repo_url = await self._find_repo( + kdu_model=kdu_model, cluster_uuid=cluster_uuid + ) + replicas, _ = await self._get_replica_count_url( + kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name + ) + + self.log.debug( + f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource " + f"{resource_name} obtained: {replicas}" + ) + + if replicas is None: + msg = "Replica count not found. Cannot be scaled" + self.log.error(msg) + raise K8sException(msg) + + return int(replicas) async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_id + kdu_instance, revision, cluster_uuid ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -560,11 +817,14 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + command = self._get_rollback_command( - kdu_instance, instance_info["namespace"], revision + kdu_instance, instance_info["namespace"], revision, paths["kube_config"] ) self.log.debug("rolling_back: {}".format(command)) @@ -578,12 +838,11 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -597,13 +856,11 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -612,7 +869,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -637,31 +894,37 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) - + self.log.warning(("kdu_instance {} not found".format(kdu_instance))) + return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = self._get_uninstall_command(kdu_instance, instance_info["namespace"]) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = self._get_uninstall_command( + kdu_instance, instance_info["namespace"], paths["kube_config"] + ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return self._output_to_table(output) @@ -673,17 +936,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list releases for cluster {}".format(cluster_id)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # execute internal command - result = await self._instances_list(cluster_id) + result = await self._instances_list(cluster_uuid) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return result @@ -695,6 +957,28 @@ class K8sHelmBaseConnector(K8sConnector): self.log.debug("Instance {} not found".format(kdu_instance)) return None + async def upgrade_charm( + self, + ee_id: str = None, + path: str = None, + charm_id: str = None, + charm_type: str = None, + timeout: float = None, + ) -> str: + """This method upgrade charms in VNFs + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + The output of the update operation if status equals to "completed" + """ + raise K8sException("KDUs deployed with Helm do not support charm upgrade") + async def exec_primitive( self, cluster_uuid: str = None, @@ -741,52 +1025,57 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance ) ) + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get list of services names for kdu - service_names = await self._get_services(cluster_id, kdu_instance, namespace) + service_names = await self._get_services( + cluster_uuid, kdu_instance, namespace, paths["kube_config"] + ) service_list = [] for service in service_names: - service = await self._get_service(cluster_id, service, namespace) + service = await self._get_service(cluster_uuid, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service_list async def get_service( self, cluster_uuid: str, service_name: str, namespace: str ) -> object: - self.log.debug( "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( service_name, namespace, cluster_uuid ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) - service = await self._get_service(cluster_id, service_name, namespace) + service = await self._get_service(cluster_uuid, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -796,6 +1085,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -816,13 +1107,11 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_id) + instances = await self._instances_list(cluster_id=cluster_uuid) for instance in instances: if instance.get("name") == kdu_instance: break @@ -830,24 +1119,45 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id + kdu_instance, cluster_uuid ) ) status = await self._status_kdu( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return status + async def get_values_kdu( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: + self.log.debug("get kdu_instance values {}".format(kdu_instance)) + + return await self._exec_get_command( + get_command="values", + kdu_instance=kdu_instance, + namespace=namespace, + kubeconfig=kubeconfig, + ) + async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: + """Method to obtain the Helm Chart package's values + + Args: + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the values of the Helm Chart package + """ self.log.debug( "inspect kdu_model values {} from (optional) repo: {}".format( @@ -855,22 +1165,20 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="values", kdu_model=kdu_model, repo_url=repo_url ) async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug( "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) ) - return await self._exec_inspect_comand( + return await self._exec_inspect_command( inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url ) async def synchronize_repos(self, cluster_uuid: str): - self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid)) try: db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid) @@ -902,7 +1210,13 @@ class K8sHelmBaseConnector(K8sConnector): # add repo self.log.debug("add repo {}".format(db_repo["name"])) await self.repo_add( - cluster_uuid, db_repo["name"], db_repo["url"] + cluster_uuid, + db_repo["name"], + db_repo["url"], + cert=db_repo.get("ca_cert"), + user=db_repo.get("user"), + password=db_repo.get("password"), + oci=db_repo.get("oci", False), ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: @@ -972,7 +1286,7 @@ class K8sHelmBaseConnector(K8sConnector): """ @abc.abstractmethod - async def _get_services(self, cluster_id, kdu_instance, namespace): + async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): """ Implements the helm version dependent method to obtain services from a helm instance """ @@ -983,37 +1297,104 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @abc.abstractmethod def _get_install_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: """ Obtain command to be executed to delete the indicated instance """ @abc.abstractmethod - def _get_upgrade_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + def _get_upgrade_scale_command( + self, + kdu_model, + kdu_instance, + namespace, + count, + version, + atomic, + replicas, + timeout, + resource_name, + kubeconfig, ) -> str: + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release """ - Obtain command to be executed to upgrade the indicated instance + + @abc.abstractmethod + def _get_upgrade_command( + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, + force, + ) -> str: + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. + Returns: + str: command to upgrade a Helm Chart release """ @abc.abstractmethod - def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: + def _get_rollback_command( + self, kdu_instance, namespace, revision, kubeconfig + ) -> str: """ Obtain command to be executed to rollback the indicated instance """ @abc.abstractmethod - def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: """ Obtain command to be executed to delete the indicated instance """ @@ -1022,9 +1403,24 @@ class K8sHelmBaseConnector(K8sConnector): def _get_inspect_command( self, show_command: str, kdu_model: str, repo_str: str, version: str ): + """Generates the command to obtain the information about an Helm Chart package + (´helm show ...´ command) + + Args: + show_command: the second part of the command (`helm show `) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + version: constraint with specific version of the Chart to use + + Returns: + str: the generated Helm Chart command """ - Obtain command to be executed to obtain information about the kdu - """ + + @abc.abstractmethod + def _get_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + """Obtain command to be executed to get information about the kdu instance.""" @abc.abstractmethod async def _uninstall_sw(self, cluster_id: str, namespace: str): @@ -1137,27 +1533,12 @@ class K8sHelmBaseConnector(K8sConnector): of dictionaries """ new_list = [] - for dictionary in input_list: - new_dict = dict((k.lower(), v) for k, v in dictionary.items()) - new_list.append(new_dict) + if input_list: + for dictionary in input_list: + new_dict = dict((k.lower(), v) for k, v in dictionary.items()) + new_list.append(new_dict) return new_list - def _local_exec(self, command: str) -> (str, int): - command = self._remove_multiple_spaces(command) - self.log.debug("Executing sync local command: {}".format(command)) - # raise exception if fails - output = "" - try: - output = subprocess.check_output( - command, shell=True, universal_newlines=True - ) - return_code = 0 - self.log.debug(output) - except Exception: - return_code = 1 - - return output, return_code - async def _local_async_exec( self, command: str, @@ -1165,8 +1546,7 @@ class K8sHelmBaseConnector(K8sConnector): show_error_log: bool = True, encode_utf8: bool = False, env: dict = None, - ) -> (str, int): - + ) -> tuple[str, int]: command = K8sHelmBaseConnector._remove_multiple_spaces(command) self.log.debug( "Executing async local command: {}, env: {}".format(command, env) @@ -1180,17 +1560,18 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=environ, - ) + async with self.cmd_lock: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, + ) - # wait for command terminate - stdout, stderr = await process.communicate() + # wait for command terminate + stdout, stderr = await process.communicate() - return_code = process.returncode + return_code = process.returncode output = "" if stdout: @@ -1217,6 +1598,9 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the process if it is still running + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1237,7 +1621,6 @@ class K8sHelmBaseConnector(K8sConnector): encode_utf8: bool = False, env: dict = None, ): - command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) command = "{} | {}".format(command1, command2) @@ -1254,16 +1637,19 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - read, write = os.pipe() - await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() + async with self.cmd_lock: + read, write = os.pipe() + process_1 = await asyncio.create_subprocess_exec( + *command1, stdout=write, env=environ + ) + os.close(write) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) + os.close(read) + stdout, stderr = await process_2.communicate() - return_code = process_2.returncode + return_code = process_2.returncode output = "" if stdout: @@ -1289,6 +1675,10 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the processes if they are still running + for process in (process_1, process_2): + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1322,7 +1712,10 @@ class K8sHelmBaseConnector(K8sConnector): ) command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( - self.kubectl_command, paths["kube_config"], namespace, service_name + self.kubectl_command, + paths["kube_config"], + quote(namespace), + quote(service_name), ) output, _rc = await self._local_async_exec( @@ -1344,90 +1737,243 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def _exec_inspect_comand( + async def _exec_get_command( + self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str + ): + """Obtains information about the kdu instance.""" + + full_command = self._get_get_command( + get_command, kdu_instance, namespace, kubeconfig + ) + + output, _rc = await self._local_async_exec(command=full_command) + + return output + + async def _exec_inspect_command( self, inspect_command: str, kdu_model: str, repo_url: str = None ): - """ - Obtains information about a kdu, no cluster (no env) + """Obtains information about an Helm Chart package (´helm show´ command) + + Args: + inspect_command: the Helm sub command (`helm show ...`) + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + + Returns: + str: the requested info about the Helm Chart package """ repo_str = "" if repo_url: - repo_str = " --repo {}".format(repo_url) + repo_str = " --repo {}".format(quote(repo_url)) - idx = kdu_model.find("/") - if idx >= 0: - idx += 1 - kdu_model = kdu_model[idx:] + # Obtain the Chart's name and store it in the var kdu_model + kdu_model, _ = self._split_repo(kdu_model=kdu_model) - version = "" - if ":" in kdu_model: - parts = kdu_model.split(sep=":") - if len(parts) == 2: - version = "--version {}".format(str(parts[1])) - kdu_model = parts[0] + kdu_model, version = self._split_version(kdu_model) + if version: + version_str = "--version {}".format(quote(version)) + else: + version_str = "" full_command = self._get_inspect_command( - inspect_command, kdu_model, repo_str, version - ) - output, _rc = await self._local_async_exec( - command=full_command, encode_utf8=True + show_command=inspect_command, + kdu_model=quote(kdu_model), + repo_str=repo_str, + version=version_str, ) + output, _ = await self._local_async_exec(command=full_command) + return output + async def _get_replica_count_url( + self, + kdu_model: str, + repo_url: str = None, + resource_name: str = None, + ) -> tuple[int, str]: + """Get the replica count value in the Helm Chart Values. + + Args: + kdu_model: The name or path of an Helm Chart + repo_url: Helm Chart repository url + resource_name: Resource name + + Returns: + A tuple with: + - The number of replicas of the specific instance; if not found, returns None; and + - The string corresponding to the replica count key in the Helm values + """ + + kdu_values = yaml.load( + await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url), + Loader=yaml.SafeLoader, + ) + + self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}") + + if not kdu_values: + raise K8sException( + "kdu_values not found for kdu_model {}".format(kdu_model) + ) + + if resource_name: + kdu_values = kdu_values.get(resource_name, None) + + if not kdu_values: + msg = "resource {} not found in the values in model {}".format( + resource_name, kdu_model + ) + self.log.error(msg) + raise K8sException(msg) + + duplicate_check = False + + replica_str = "" + replicas = None + + if kdu_values.get("replicaCount") is not None: + replicas = kdu_values["replicaCount"] + replica_str = "replicaCount" + elif kdu_values.get("replicas") is not None: + duplicate_check = True + replicas = kdu_values["replicas"] + replica_str = "replicas" + else: + if resource_name: + msg = ( + "replicaCount or replicas not found in the resource" + "{} values in model {}. Cannot be scaled".format( + resource_name, kdu_model + ) + ) + else: + msg = ( + "replicaCount or replicas not found in the values" + "in model {}. Cannot be scaled".format(kdu_model) + ) + self.log.error(msg) + raise K8sException(msg) + + # Control if replicas and replicaCount exists at the same time + msg = "replicaCount and replicas are exists at the same time" + if duplicate_check: + if "replicaCount" in kdu_values: + self.log.error(msg) + raise K8sException(msg) + else: + if "replicas" in kdu_values: + self.log.error(msg) + raise K8sException(msg) + + return replicas, replica_str + + async def _get_replica_count_instance( + self, + kdu_instance: str, + namespace: str, + kubeconfig: str, + resource_name: str = None, + ) -> int: + """Get the replica count value in the instance. + + Args: + kdu_instance: The name of the KDU instance + namespace: KDU instance namespace + kubeconfig: + resource_name: Resource name + + Returns: + The number of replicas of the specific instance; if not found, returns None + """ + + kdu_values = yaml.load( + await self.get_values_kdu(kdu_instance, namespace, kubeconfig), + Loader=yaml.SafeLoader, + ) + + self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}") + + replicas = None + + if kdu_values: + resource_values = ( + kdu_values.get(resource_name, None) if resource_name else None + ) + + for replica_str in ("replicaCount", "replicas"): + if resource_values: + replicas = resource_values.get(replica_str) + else: + replicas = kdu_values.get(replica_str) + + if replicas is not None: + break + + return replicas + async def _store_status( self, cluster_id: str, operation: str, kdu_instance: str, namespace: str = None, - check_every: float = 10, db_dict: dict = None, - run_once: bool = False, - ): - while True: - try: - await asyncio.sleep(check_every) - detailed_status = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - namespace=namespace, - return_text=False, - ) - status = detailed_status.get("info").get("description") - self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) - # write status to db - result = await self.write_app_status_to_db( - db_dict=db_dict, - status=str(status), - detailed_status=str(detailed_status), - operation=operation, - ) - if not result: - self.log.info("Error writing in database. Task exiting...") - return - except asyncio.CancelledError: - self.log.debug("Task cancelled") - return - except Exception as e: - self.log.debug( - "_store_status exception: {}".format(str(e)), exc_info=True - ) - pass - finally: - if run_once: - return + ) -> None: + """ + Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. + + :param cluster_id (str): the cluster where the KDU instance is deployed + :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") + :param kdu_instance (str): The KDU instance in relation to which the status is obtained + :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None + :param db_dict (dict): A dictionary with the database necessary information. It shall contain the + values for the keys: + - "collection": The Mongo DB collection to write to + - "filter": The query filter to use in the update process + - "path": The dot separated keys which targets the object to be updated + Defaults to None. + """ + + try: + detailed_status = await self._status_kdu( + cluster_id=cluster_id, + kdu_instance=kdu_instance, + yaml_format=False, + namespace=namespace, + ) + + status = detailed_status.get("info").get("description") + self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") + + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation, + ) + + if not result: + self.log.info("Error writing in database. Task exiting...") + + except asyncio.CancelledError as e: + self.log.warning( + f"Exception in method {self._store_status.__name__} (task cancelled): {e}" + ) + except Exception as e: + self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") # params for use in -f file # returns values file option and filename (in order to delete it at the end) - def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): - + def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]: if params and len(params) > 0: self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) while len(s) < 10: s = "0" + s @@ -1437,7 +1983,7 @@ class K8sHelmBaseConnector(K8sConnector): for key in params: value = params.get(key) if "!!yaml" in str(value): - value = yaml.load(value[7:]) + value = yaml.safe_load(value[7:]) params2[key] = value values_file = get_random_number() + ".yaml" @@ -1451,19 +1997,14 @@ class K8sHelmBaseConnector(K8sConnector): # params for use in --set option @staticmethod def _params_to_set_option(params: dict) -> str: - params_str = "" - if params and len(params) > 0: - start = True - for key in params: - value = params.get(key, None) - if value is not None: - if start: - params_str += "--set " - start = False - else: - params_str += "," - params_str += "{}={}".format(key, value) - return params_str + pairs = [ + f"{quote(str(key))}={quote(str(value))}" + for key, value in params.items() + if value is not None + ] + if not pairs: + return "" + return "--set " + ",".join(pairs) @staticmethod def generate_kdu_instance_name(**kwargs): @@ -1493,10 +2034,234 @@ class K8sHelmBaseConnector(K8sConnector): name += "-" def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) s = s.rjust(10, "0") return s name = name + get_random_number() return name.lower() + + def _split_version(self, kdu_model: str) -> tuple[str, str]: + version = None + if ( + not ( + self._is_helm_chart_a_file(kdu_model) + or self._is_helm_chart_a_url(kdu_model) + ) + and ":" in kdu_model + ): + parts = kdu_model.split(sep=":") + if len(parts) == 2: + version = str(parts[1]) + kdu_model = parts[0] + return kdu_model, version + + def _split_repo(self, kdu_model: str) -> tuple[str, str]: + """Obtain the Helm Chart's repository and Chart's names from the KDU model + + Args: + kdu_model (str): Associated KDU model + + Returns: + (str, str): Tuple with the Chart name in index 0, and the repo name + in index 2; if there was a problem finding them, return None + for both + """ + + chart_name = None + repo_name = None + + idx = kdu_model.find("/") + if not self._is_helm_chart_a_url(kdu_model) and idx >= 0: + chart_name = kdu_model[idx + 1 :] + repo_name = kdu_model[:idx] + + return chart_name, repo_name + + async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: + """Obtain the Helm repository for an Helm Chart + + Args: + kdu_model (str): the KDU model associated with the Helm Chart instantiation + cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation + + Returns: + str: the repository URL; if Helm Chart is a local one, the function returns None + """ + + _, repo_name = self._split_repo(kdu_model=kdu_model) + + repo_url = None + if repo_name: + # Find repository link + local_repo_list = await self.repo_list(cluster_uuid) + for repo in local_repo_list: + if repo["name"] == repo_name: + repo_url = repo["url"] + break # it is not necessary to continue the loop if the repo link was found... + + return repo_url + + def _repo_to_oci_url(self, repo): + db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False) + if db_repo and "oci" in db_repo: + return db_repo.get("url") + + async def _prepare_helm_chart(self, kdu_model, cluster_id): + # e.g.: "stable/openldap", "1.0" + kdu_model, version = self._split_version(kdu_model) + # e.g.: "openldap, stable" + chart_name, repo = self._split_repo(kdu_model) + if repo and chart_name: # repo/chart case + oci_url = self._repo_to_oci_url(repo) + if oci_url: # oci does not require helm repo update + kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema + else: + await self.repo_update(cluster_id, repo) + return kdu_model, version + + async def create_certificate( + self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage + ): + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_certificate( + namespace=namespace, + name=name, + dns_prefix=dns_prefix, + secret_name=secret_name, + usages=[usage], + issuer_name="ca-issuer", + ) + + async def delete_certificate(self, cluster_uuid, namespace, certificate_name): + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.delete_certificate(namespace, certificate_name) + + async def create_namespace( + self, + namespace, + cluster_uuid, + labels, + ): + """ + Create a namespace in a specific cluster + + :param namespace: Namespace to be created + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param labels: Dictionary with labels for the new namespace + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_namespace( + name=namespace, + labels=labels, + ) + + async def delete_namespace( + self, + namespace, + cluster_uuid, + ): + """ + Delete a namespace in a specific cluster + + :param namespace: namespace to be deleted + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.delete_namespace( + name=namespace, + ) + + async def copy_secret_data( + self, + src_secret: str, + dst_secret: str, + cluster_uuid: str, + data_key: str, + src_namespace: str = "osm", + dst_namespace: str = "osm", + ): + """ + Copy a single key and value from an existing secret to a new one + + :param src_secret: name of the existing secret + :param dst_secret: name of the new secret + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param data_key: key of the existing secret to be copied + :param src_namespace: Namespace of the existing secret + :param dst_namespace: Namespace of the new secret + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + secret_data = await kubectl.get_secret_content( + name=src_secret, + namespace=src_namespace, + ) + # Only the corresponding data_key value needs to be copy + data = {data_key: secret_data.get(data_key)} + await kubectl.create_secret( + name=dst_secret, + data=data, + namespace=dst_namespace, + secret_type="Opaque", + ) + + async def setup_default_rbac( + self, + name, + namespace, + cluster_uuid, + api_groups, + resources, + verbs, + service_account, + ): + """ + Create a basic RBAC for a new namespace. + + :param name: name of both Role and Role Binding + :param namespace: K8s namespace + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param api_groups: Api groups to be allowed in Policy Rule + :param resources: Resources to be allowed in Policy Rule + :param verbs: Verbs to be allowed in Policy Rule + :param service_account: Service Account name used to bind the Role + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_role( + name=name, + labels={}, + namespace=namespace, + api_groups=api_groups, + resources=resources, + verbs=verbs, + ) + await kubectl.create_role_binding( + name=name, + labels={}, + namespace=namespace, + role_name=name, + sa_name=service_account, + )