X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=a897e0e52ae9bba2d0f9f41cc586a775d4df220f;hb=afde3be17b3f596a3d7996b1ebaf50c027bf624e;hp=f0a049bb38a98a042716775f6ad4ad5180b7d3f9;hpb=b46f88d2ce319c7661dc6064c8c76d020e314fb6;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index f0a049b..a897e0e 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -34,6 +34,7 @@ from uuid import uuid4 from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector +from n2vc.kubectl import Kubectl class K8sHelmBaseConnector(K8sConnector): @@ -90,6 +91,9 @@ class K8sHelmBaseConnector(K8sConnector): if self._stable_repo_url == "None": self._stable_repo_url = None + # Lock to avoid concurrent execution of helm commands + self.cmd_lock = asyncio.Lock() + def _get_namespace(self, cluster_uuid: str) -> str: """ Obtains the namespace used by the cluster with the uuid passed by argument @@ -401,9 +405,9 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) - repo = self._split_repo(kdu_model) + _, repo = self._split_repo(kdu_model) if repo: - self.repo_update(cluster_id, repo) + await self.repo_update(cluster_id, repo) command = self._get_install_command( kdu_model, @@ -446,7 +450,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -478,6 +481,8 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + namespace: str = None, + force: bool = False, ): self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) @@ -485,9 +490,13 @@ class K8sHelmBaseConnector(K8sConnector): self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace - instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) - if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) + + # set namespace + if not namespace: + instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) + if not instance_info: + raise K8sException("kdu_instance {} not found".format(kdu_instance)) + namespace = instance_info["namespace"] # init env, paths paths, env = self._init_paths_env( @@ -505,25 +514,25 @@ class K8sHelmBaseConnector(K8sConnector): # version kdu_model, version = self._split_version(kdu_model) - repo = self._split_repo(kdu_model) + _, repo = self._split_repo(kdu_model) if repo: - self.repo_update(cluster_uuid, repo) + await self.repo_update(cluster_uuid, repo) command = self._get_upgrade_command( kdu_model, kdu_instance, - instance_info["namespace"], + namespace, params_str, version, atomic, timeout, paths["kube_config"], + force, ) self.log.debug("upgrading: {}".format(command)) if atomic: - # exec helm in a task exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec( @@ -535,7 +544,7 @@ class K8sHelmBaseConnector(K8sConnector): coro_or_future=self._store_status( cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", ) @@ -549,7 +558,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -562,7 +570,7 @@ class K8sHelmBaseConnector(K8sConnector): await self._store_status( cluster_id=cluster_uuid, kdu_instance=kdu_instance, - namespace=instance_info["namespace"], + namespace=namespace, db_dict=db_dict, operation="upgrade", ) @@ -739,7 +747,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException("kdu_instance {} not found".format(kdu_instance)) # init env, paths - paths, env = self._init_paths_env( + paths, _ = self._init_paths_env( cluster_name=cluster_uuid, create_if_not_exist=True ) @@ -750,8 +758,15 @@ class K8sHelmBaseConnector(K8sConnector): resource_name=resource_name, ) + self.log.debug( + f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}" + ) + # Get default value if scale count is not found from provided values - if not replicas: + # Important note: this piece of code shall only be executed in the first scaling operation, + # since it is expected that the _get_replica_count_instance is able to obtain the number of + # replicas when a scale operation was already conducted previously for this KDU/resource! + if replicas is None: repo_url = await self._find_repo( kdu_model=kdu_model, cluster_uuid=cluster_uuid ) @@ -759,10 +774,15 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name ) - if not replicas: - msg = "Replica count not found. Cannot be scaled" - self.log.error(msg) - raise K8sException(msg) + self.log.debug( + f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource " + f"{resource_name} obtained: {replicas}" + ) + + if replicas is None: + msg = "Replica count not found. Cannot be scaled" + self.log.error(msg) + raise K8sException(msg) return int(replicas) @@ -1025,7 +1045,6 @@ class K8sHelmBaseConnector(K8sConnector): async def get_service( self, cluster_uuid: str, service_name: str, namespace: str ) -> object: - self.log.debug( "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( service_name, namespace, cluster_uuid @@ -1108,7 +1127,6 @@ class K8sHelmBaseConnector(K8sConnector): async def get_values_kdu( self, kdu_instance: str, namespace: str, kubeconfig: str ) -> str: - self.log.debug("get kdu_instance values {}".format(kdu_instance)) return await self._exec_get_command( @@ -1140,7 +1158,6 @@ class K8sHelmBaseConnector(K8sConnector): ) async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug( "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) ) @@ -1150,7 +1167,6 @@ class K8sHelmBaseConnector(K8sConnector): ) async def synchronize_repos(self, cluster_uuid: str): - self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid)) try: db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid) @@ -1310,7 +1326,24 @@ class K8sHelmBaseConnector(K8sConnector): resource_name, kubeconfig, ) -> str: - """Obtain command to be executed to upgrade the indicated instance.""" + """Generates the command to scale a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + scale (int): Scale count + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + replica_str (str): The key under resource_name key where the scale count is stored + timeout (float): The time, in seconds, to wait + resource_name (str): The KDU's resource to scale + kubeconfig (str): Kubeconfig file path + + Returns: + str: command to scale a Helm Chart release + """ @abc.abstractmethod def _get_upgrade_command( @@ -1323,9 +1356,23 @@ class K8sHelmBaseConnector(K8sConnector): atomic, timeout, kubeconfig, + force, ) -> str: - """ - Obtain command to be executed to upgrade the indicated instance + """Generates the command to upgrade a Helm Chart release + + Args: + kdu_model (str): Kdu model name, corresponding to the Helm local location or repository + kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question + namespace (str): Namespace where this KDU instance is deployed + params_str (str): Params used to upgrade the Helm Chart release + version (str): Constraint with specific version of the Chart to use + atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. + The --wait flag will be set automatically if --atomic is used + timeout (float): The time, in seconds, to wait + kubeconfig (str): Kubeconfig file path + force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. + Returns: + str: command to upgrade a Helm Chart release """ @abc.abstractmethod @@ -1492,7 +1539,6 @@ class K8sHelmBaseConnector(K8sConnector): encode_utf8: bool = False, env: dict = None, ) -> (str, int): - command = K8sHelmBaseConnector._remove_multiple_spaces(command) self.log.debug( "Executing async local command: {}, env: {}".format(command, env) @@ -1506,17 +1552,18 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=environ, - ) + async with self.cmd_lock: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environ, + ) - # wait for command terminate - stdout, stderr = await process.communicate() + # wait for command terminate + stdout, stderr = await process.communicate() - return_code = process.returncode + return_code = process.returncode output = "" if stdout: @@ -1543,6 +1590,9 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the process if it is still running + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1563,7 +1613,6 @@ class K8sHelmBaseConnector(K8sConnector): encode_utf8: bool = False, env: dict = None, ): - command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) command = "{} | {}".format(command1, command2) @@ -1580,16 +1629,19 @@ class K8sHelmBaseConnector(K8sConnector): environ.update(env) try: - read, write = os.pipe() - await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() + async with self.cmd_lock: + read, write = os.pipe() + process_1 = await asyncio.create_subprocess_exec( + *command1, stdout=write, env=environ + ) + os.close(write) + process_2 = await asyncio.create_subprocess_exec( + *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ + ) + os.close(read) + stdout, stderr = await process_2.communicate() - return_code = process_2.returncode + return_code = process_2.returncode output = "" if stdout: @@ -1615,6 +1667,10 @@ class K8sHelmBaseConnector(K8sConnector): return output, return_code except asyncio.CancelledError: + # first, kill the processes if they are still running + for process in (process_1, process_2): + if process.returncode is None: + process.kill() raise except K8sException: raise @@ -1701,10 +1757,8 @@ class K8sHelmBaseConnector(K8sConnector): if repo_url: repo_str = " --repo {}".format(repo_url) - idx = kdu_model.find("/") - if idx >= 0: - idx += 1 - kdu_model = kdu_model[idx:] + # Obtain the Chart's name and store it in the var kdu_model + kdu_model, _ = self._split_repo(kdu_model=kdu_model) kdu_model, version = self._split_version(kdu_model) if version: @@ -1713,10 +1767,13 @@ class K8sHelmBaseConnector(K8sConnector): version_str = "" full_command = self._get_inspect_command( - inspect_command, kdu_model, repo_str, version_str + show_command=inspect_command, + kdu_model=kdu_model, + repo_str=repo_str, + version=version_str, ) - output, _rc = await self._local_async_exec(command=full_command) + output, _ = await self._local_async_exec(command=full_command) return output @@ -1725,7 +1782,7 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model: str, repo_url: str = None, resource_name: str = None, - ): + ) -> (int, str): """Get the replica count value in the Helm Chart Values. Args: @@ -1734,7 +1791,9 @@ class K8sHelmBaseConnector(K8sConnector): resource_name: Resource name Returns: - True if replicas, False replicaCount + A tuple with: + - The number of replicas of the specific instance; if not found, returns None; and + - The string corresponding to the replica count key in the Helm values """ kdu_values = yaml.load( @@ -1742,6 +1801,8 @@ class K8sHelmBaseConnector(K8sConnector): Loader=yaml.SafeLoader, ) + self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}") + if not kdu_values: raise K8sException( "kdu_values not found for kdu_model {}".format(kdu_model) @@ -1762,10 +1823,10 @@ class K8sHelmBaseConnector(K8sConnector): replica_str = "" replicas = None - if kdu_values.get("replicaCount", None): + if kdu_values.get("replicaCount") is not None: replicas = kdu_values["replicaCount"] replica_str = "replicaCount" - elif kdu_values.get("replicas", None): + elif kdu_values.get("replicas") is not None: duplicate_check = True replicas = kdu_values["replicas"] replica_str = "replicas" @@ -1804,7 +1865,7 @@ class K8sHelmBaseConnector(K8sConnector): namespace: str, kubeconfig: str, resource_name: str = None, - ): + ) -> int: """Get the replica count value in the instance. Args: @@ -1814,7 +1875,7 @@ class K8sHelmBaseConnector(K8sConnector): resource_name: Resource name Returns: - True if replicas, False replicaCount + The number of replicas of the specific instance; if not found, returns None """ kdu_values = yaml.load( @@ -1822,23 +1883,23 @@ class K8sHelmBaseConnector(K8sConnector): Loader=yaml.SafeLoader, ) + self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}") + replicas = None if kdu_values: resource_values = ( kdu_values.get(resource_name, None) if resource_name else None ) - replicas = ( - ( - resource_values.get("replicaCount", None) - or resource_values.get("replicas", None) - ) - if resource_values - else ( - kdu_values.get("replicaCount", None) - or kdu_values.get("replicas", None) - ) - ) + + for replica_str in ("replicaCount", "replicas"): + if resource_values: + replicas = resource_values.get(replica_str) + else: + replicas = kdu_values.get(replica_str) + + if replicas is not None: + break return replicas @@ -1897,12 +1958,11 @@ class K8sHelmBaseConnector(K8sConnector): # params for use in -f file # returns values file option and filename (in order to delete it at the end) def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): - if params and len(params) > 0: self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) while len(s) < 10: s = "0" + s @@ -1968,7 +2028,7 @@ class K8sHelmBaseConnector(K8sConnector): name += "-" def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) s = s.rjust(10, "0") return s @@ -1985,12 +2045,27 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model = parts[0] return kdu_model, version - async def _split_repo(self, kdu_model: str) -> str: + def _split_repo(self, kdu_model: str) -> (str, str): + """Obtain the Helm Chart's repository and Chart's names from the KDU model + + Args: + kdu_model (str): Associated KDU model + + Returns: + (str, str): Tuple with the Chart name in index 0, and the repo name + in index 2; if there was a problem finding them, return None + for both + """ + + chart_name = None repo_name = None + idx = kdu_model.find("/") if idx >= 0: + chart_name = kdu_model[idx + 1 :] repo_name = kdu_model[:idx] - return repo_name + + return chart_name, repo_name async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: """Obtain the Helm repository for an Helm Chart @@ -2003,12 +2078,160 @@ class K8sHelmBaseConnector(K8sConnector): str: the repository URL; if Helm Chart is a local one, the function returns None """ + _, repo_name = self._split_repo(kdu_model=kdu_model) + repo_url = None - idx = kdu_model.find("/") - if idx >= 0: - repo_name = kdu_model[:idx] + if repo_name: # Find repository link local_repo_list = await self.repo_list(cluster_uuid) for repo in local_repo_list: - repo_url = repo["url"] if repo["name"] == repo_name else None + if repo["name"] == repo_name: + repo_url = repo["url"] + break # it is not necessary to continue the loop if the repo link was found... + return repo_url + + async def create_certificate( + self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage + ): + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_certificate( + namespace=namespace, + name=name, + dns_prefix=dns_prefix, + secret_name=secret_name, + usages=[usage], + issuer_name="ca-issuer", + ) + + async def delete_certificate(self, cluster_uuid, namespace, certificate_name): + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.delete_certificate(namespace, certificate_name) + + async def create_namespace( + self, + namespace, + cluster_uuid, + labels, + ): + """ + Create a namespace in a specific cluster + + :param namespace: Namespace to be created + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param labels: Dictionary with labels for the new namespace + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_namespace( + name=namespace, + labels=labels, + ) + + async def delete_namespace( + self, + namespace, + cluster_uuid, + ): + """ + Delete a namespace in a specific cluster + + :param namespace: namespace to be deleted + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.delete_namespace( + name=namespace, + ) + + async def copy_secret_data( + self, + src_secret: str, + dst_secret: str, + cluster_uuid: str, + data_key: str, + src_namespace: str = "osm", + dst_namespace: str = "osm", + ): + """ + Copy a single key and value from an existing secret to a new one + + :param src_secret: name of the existing secret + :param dst_secret: name of the new secret + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param data_key: key of the existing secret to be copied + :param src_namespace: Namespace of the existing secret + :param dst_namespace: Namespace of the new secret + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + secret_data = await kubectl.get_secret_content( + name=src_secret, + namespace=src_namespace, + ) + # Only the corresponding data_key value needs to be copy + data = {data_key: secret_data.get(data_key)} + await kubectl.create_secret( + name=dst_secret, + data=data, + namespace=dst_namespace, + secret_type="Opaque", + ) + + async def setup_default_rbac( + self, + name, + namespace, + cluster_uuid, + api_groups, + resources, + verbs, + service_account, + ): + """ + Create a basic RBAC for a new namespace. + + :param name: name of both Role and Role Binding + :param namespace: K8s namespace + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param api_groups: Api groups to be allowed in Policy Rule + :param resources: Resources to be allowed in Policy Rule + :param verbs: Verbs to be allowed in Policy Rule + :param service_account: Service Account name used to bind the Role + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_role( + name=name, + labels={}, + namespace=namespace, + api_groups=api_groups, + resources=resources, + verbs=verbs, + ) + await kubectl.create_role_binding( + name=name, + labels={}, + namespace=namespace, + role_name=name, + sa_name=service_account, + )