X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=n2vc%2Fk8s_helm_base_conn.py;h=abf2d7e582391417df279403ad8a01d81cb1640f;hb=HEAD;hp=5588c3dc30f431be9ad00cc58f80ebc315d165e9;hpb=2b2dc52b95660e3b4a5564914aa1f490d88a2b9f;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 5588c3d..fb5aa09 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -22,6 +22,7 @@ import abc import asyncio from typing import Union +from shlex import quote import random import time import shlex @@ -30,6 +31,7 @@ import stat import os import yaml from uuid import uuid4 +from urllib.parse import urlparse from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException @@ -86,6 +88,24 @@ class K8sHelmBaseConnector(K8sConnector): self._helm_command = helm_command self._check_file_exists(filename=helm_command, exception_if_not_exists=True) + # exception if main post renderer executable is not present + self.main_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get( + "mainpostrendererpath" + ) + if self.main_post_renderer_path: + self._check_file_exists( + filename=self.main_post_renderer_path, exception_if_not_exists=True + ) + + # exception if podLabels post renderer executable is not present + self.podLabels_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get( + "podlabelspostrendererpath" + ) + if self.podLabels_post_renderer_path: + self._check_file_exists( + filename=self.podLabels_post_renderer_path, exception_if_not_exists=True + ) + # obtain stable repo url from config or apply default self._stable_repo_url = self.config.get("stablerepourl") if self._stable_repo_url == "None": @@ -113,7 +133,7 @@ class K8sHelmBaseConnector(K8sConnector): namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs, - ) -> (str, bool): + ) -> tuple[str, bool]: """ It prepares a given K8s cluster environment to run Charts @@ -164,6 +184,7 @@ class K8sHelmBaseConnector(K8sConnector): cert: str = None, user: str = None, password: str = None, + oci: bool = False, ): self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( @@ -179,10 +200,23 @@ class K8sHelmBaseConnector(K8sConnector): # sync local dir self.fs.sync(from_path=cluster_uuid) - # helm repo add name url - command = ("env KUBECONFIG={} {} repo add {} {}").format( - paths["kube_config"], self._helm_command, name, url - ) + if oci: + if user and password: + host_port = urlparse(url).netloc if url.startswith("oci://") else url + # helm registry login url + command = "env KUBECONFIG={} {} registry login {}".format( + paths["kube_config"], self._helm_command, quote(host_port) + ) + else: + self.log.debug( + "OCI registry login is not needed for repo: {}".format(name) + ) + return + else: + # helm repo add name url + command = "env KUBECONFIG={} {} repo add {} {}".format( + paths["kube_config"], self._helm_command, quote(name), quote(url) + ) if cert: temp_cert_file = os.path.join( @@ -191,27 +225,28 @@ class K8sHelmBaseConnector(K8sConnector): os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) with open(temp_cert_file, "w") as the_cert: the_cert.write(cert) - command += " --ca-file {}".format(temp_cert_file) + command += " --ca-file {}".format(quote(temp_cert_file)) if user: - command += " --username={}".format(user) + command += " --username={}".format(quote(user)) if password: - command += " --password={}".format(password) + command += " --password={}".format(quote(password)) self.log.debug("adding repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) - # helm repo update - command = "env KUBECONFIG={} {} repo update {}".format( - paths["kube_config"], self._helm_command, name - ) - self.log.debug("updating repo: {}".format(command)) - await self._local_async_exec( - command=command, raise_exception_on_error=False, env=env - ) + if not oci: + # helm repo update + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, quote(name) + ) + self.log.debug("updating repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) # sync fs self.fs.reverse_sync(from_path=cluster_uuid) @@ -232,7 +267,7 @@ class K8sHelmBaseConnector(K8sConnector): self.fs.sync(from_path=cluster_uuid) # helm repo update - command = "{} repo update {}".format(self._helm_command, name) + command = "{} repo update {}".format(self._helm_command, quote(name)) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env @@ -294,7 +329,7 @@ class K8sHelmBaseConnector(K8sConnector): self.fs.sync(from_path=cluster_uuid) command = "env KUBECONFIG={} {} repo remove {}".format( - paths["kube_config"], self._helm_command, name + paths["kube_config"], self._helm_command, quote(name) ) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env @@ -378,6 +413,11 @@ class K8sHelmBaseConnector(K8sConnector): def _is_helm_chart_a_file(self, chart_name: str): return chart_name.count("/") > 1 + @staticmethod + def _is_helm_chart_a_url(chart_name: str): + result = urlparse(chart_name) + return all([result.scheme, result.netloc]) + async def _install_impl( self, cluster_id: str, @@ -389,6 +429,7 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + labels: dict = None, kdu_name: str = None, namespace: str = None, ): @@ -402,17 +443,13 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id=cluster_id, params=params ) - # version - kdu_model, version = self._split_version(kdu_model) - - _, repo = self._split_repo(kdu_model) - if repo: - await self.repo_update(cluster_id, repo) + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id) command = self._get_install_command( kdu_model, kdu_instance, namespace, + labels, params_str, version, atomic, @@ -482,6 +519,9 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, namespace: str = None, + reset_values: bool = False, + reuse_values: bool = True, + reset_then_reuse_values: bool = False, force: bool = False, ): self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) @@ -511,22 +551,27 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id=cluster_uuid, params=params ) - # version - kdu_model, version = self._split_version(kdu_model) + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) - _, repo = self._split_repo(kdu_model) - if repo: - await self.repo_update(cluster_uuid, repo) + labels_dict = None + if db_dict and await self._contains_labels( + kdu_instance, namespace, paths["kube_config"], env + ): + labels_dict = await self._labels_dict(db_dict, kdu_instance) command = self._get_upgrade_command( kdu_model, kdu_instance, namespace, params_str, + labels_dict, version, atomic, timeout, paths["kube_config"], + reset_values, + reuse_values, + reset_then_reuse_values, force, ) @@ -644,7 +689,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # version - kdu_model, version = self._split_version(kdu_model) + kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) repo_url = await self._find_repo(kdu_model, cluster_uuid) @@ -652,11 +697,18 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model, repo_url, resource_name ) + labels_dict = None + if db_dict and await self._contains_labels( + kdu_instance, instance_info["namespace"], paths["kube_config"], env + ): + labels_dict = await self._labels_dict(db_dict, kdu_instance) + command = self._get_upgrade_scale_command( kdu_model, kdu_instance, instance_info["namespace"], scale, + labels_dict, version, atomic, replica_str, @@ -1197,19 +1249,15 @@ class K8sHelmBaseConnector(K8sConnector): # add repo self.log.debug("add repo {}".format(db_repo["name"])) - if "ca_cert" in db_repo: - await self.repo_add( - cluster_uuid, - db_repo["name"], - db_repo["url"], - cert=db_repo["ca_cert"], - ) - else: - await self.repo_add( - cluster_uuid, - db_repo["name"], - db_repo["url"], - ) + await self.repo_add( + cluster_uuid, + db_repo["name"], + db_repo["url"], + cert=db_repo.get("ca_cert"), + user=db_repo.get("user"), + password=db_repo.get("password"), + oci=db_repo.get("oci", False), + ) added_repo_dict[repo_id] = db_repo["name"] except Exception as e: raise K8sException( @@ -1302,6 +1350,7 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model, kdu_instance, namespace, + labels, params_str, version, atomic, @@ -1319,6 +1368,7 @@ class K8sHelmBaseConnector(K8sConnector): kdu_instance, namespace, count, + labels, version, atomic, replicas, @@ -1352,10 +1402,14 @@ class K8sHelmBaseConnector(K8sConnector): kdu_instance, namespace, params_str, + labels, version, atomic, timeout, kubeconfig, + reset_values, + reuse_values, + reset_then_reuse_values, force, ) -> str: """Generates the command to upgrade a Helm Chart release @@ -1370,6 +1424,9 @@ class K8sHelmBaseConnector(K8sConnector): The --wait flag will be set automatically if --atomic is used timeout (float): The time, in seconds, to wait kubeconfig (str): Kubeconfig file path + reset_values(bool): If set, helm resets values instead of reusing previous values. + reuse_values(bool): If set, helm reuses previous values. + reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. Returns: str: command to upgrade a Helm Chart release @@ -1538,7 +1595,7 @@ class K8sHelmBaseConnector(K8sConnector): show_error_log: bool = True, encode_utf8: bool = False, env: dict = None, - ) -> (str, int): + ) -> tuple[str, int]: command = K8sHelmBaseConnector._remove_multiple_spaces(command) self.log.debug( "Executing async local command: {}, env: {}".format(command, env) @@ -1704,7 +1761,10 @@ class K8sHelmBaseConnector(K8sConnector): ) command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( - self.kubectl_command, paths["kube_config"], namespace, service_name + self.kubectl_command, + paths["kube_config"], + quote(namespace), + quote(service_name), ) output, _rc = await self._local_async_exec( @@ -1755,20 +1815,20 @@ class K8sHelmBaseConnector(K8sConnector): repo_str = "" if repo_url: - repo_str = " --repo {}".format(repo_url) + repo_str = " --repo {}".format(quote(repo_url)) # Obtain the Chart's name and store it in the var kdu_model kdu_model, _ = self._split_repo(kdu_model=kdu_model) kdu_model, version = self._split_version(kdu_model) if version: - version_str = "--version {}".format(version) + version_str = "--version {}".format(quote(version)) else: version_str = "" full_command = self._get_inspect_command( show_command=inspect_command, - kdu_model=kdu_model, + kdu_model=quote(kdu_model), repo_str=repo_str, version=version_str, ) @@ -1782,7 +1842,7 @@ class K8sHelmBaseConnector(K8sConnector): kdu_model: str, repo_url: str = None, resource_name: str = None, - ) -> (int, str): + ) -> tuple[int, str]: """Get the replica count value in the Helm Chart Values. Args: @@ -1903,6 +1963,79 @@ class K8sHelmBaseConnector(K8sConnector): return replicas + async def _labels_dict(self, db_dict, kdu_instance): + # get the network service registry + ns_id = db_dict["filter"]["_id"] + try: + db_nsr = self.db.get_one("nsrs", {"_id": ns_id}) + except Exception as e: + print("nsr {} not found: {}".format(ns_id, e)) + nsd_id = db_nsr["nsd"]["_id"] + + # get the kdu registry + for index, kdu in enumerate(db_nsr["_admin"]["deployed"]["K8s"]): + if kdu["kdu-instance"] == kdu_instance: + db_kdur = kdu + break + else: + # No kdur found, could be the case of an EE chart + return {} + + kdu_name = db_kdur["kdu-name"] + member_vnf_index = db_kdur["member-vnf-index"] + # get the vnf registry + try: + db_vnfr = self.db.get_one( + "vnfrs", + {"nsr-id-ref": ns_id, "member-vnf-index-ref": member_vnf_index}, + ) + except Exception as e: + print("vnfr {} not found: {}".format(member_vnf_index, e)) + + vnf_id = db_vnfr["_id"] + vnfd_id = db_vnfr["vnfd-id"] + + return { + "managed-by": "osm.etsi.org", + "osm.etsi.org/ns-id": ns_id, + "osm.etsi.org/nsd-id": nsd_id, + "osm.etsi.org/vnf-id": vnf_id, + "osm.etsi.org/vnfd-id": vnfd_id, + "osm.etsi.org/kdu-id": kdu_instance, + "osm.etsi.org/kdu-name": kdu_name, + } + + async def _contains_labels(self, kdu_instance, namespace, kube_config, env): + command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( + kube_config, + self._helm_command, + quote(kdu_instance), + quote(namespace), + ) + output, rc = await self._local_async_exec( + command=command, raise_exception_on_error=False, env=env + ) + manifests = yaml.safe_load_all(output) + for manifest in manifests: + # Check if the manifest has metadata and labels + if ( + manifest is not None + and "metadata" in manifest + and "labels" in manifest["metadata"] + ): + labels = { + "managed-by", + "osm.etsi.org/kdu-id", + "osm.etsi.org/kdu-name", + "osm.etsi.org/ns-id", + "osm.etsi.org/nsd-id", + "osm.etsi.org/vnf-id", + "osm.etsi.org/vnfd-id", + } + if labels.issubset(manifest["metadata"]["labels"].keys()): + return True + return False + async def _store_status( self, cluster_id: str, @@ -1957,12 +2090,12 @@ class K8sHelmBaseConnector(K8sConnector): # params for use in -f file # returns values file option and filename (in order to delete it at the end) - def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): + def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]: if params and len(params) > 0: self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) while len(s) < 10: s = "0" + s @@ -1986,19 +2119,14 @@ class K8sHelmBaseConnector(K8sConnector): # params for use in --set option @staticmethod def _params_to_set_option(params: dict) -> str: - params_str = "" - if params and len(params) > 0: - start = True - for key in params: - value = params.get(key, None) - if value is not None: - if start: - params_str += "--set " - start = False - else: - params_str += "," - params_str += "{}={}".format(key, value) - return params_str + pairs = [ + f"{quote(str(key))}={quote(str(value))}" + for key, value in params.items() + if value is not None + ] + if not pairs: + return "" + return "--set " + ",".join(pairs) @staticmethod def generate_kdu_instance_name(**kwargs): @@ -2028,7 +2156,7 @@ class K8sHelmBaseConnector(K8sConnector): name += "-" def get_random_number(): - r = random.randrange(start=1, stop=99999999) + r = random.SystemRandom().randint(1, 99999999) s = str(r) s = s.rjust(10, "0") return s @@ -2036,16 +2164,22 @@ class K8sHelmBaseConnector(K8sConnector): name = name + get_random_number() return name.lower() - def _split_version(self, kdu_model: str) -> (str, str): + def _split_version(self, kdu_model: str) -> tuple[str, str]: version = None - if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model: + if ( + not ( + self._is_helm_chart_a_file(kdu_model) + or self._is_helm_chart_a_url(kdu_model) + ) + and ":" in kdu_model + ): parts = kdu_model.split(sep=":") if len(parts) == 2: version = str(parts[1]) kdu_model = parts[0] return kdu_model, version - def _split_repo(self, kdu_model: str) -> (str, str): + def _split_repo(self, kdu_model: str) -> tuple[str, str]: """Obtain the Helm Chart's repository and Chart's names from the KDU model Args: @@ -2061,7 +2195,7 @@ class K8sHelmBaseConnector(K8sConnector): repo_name = None idx = kdu_model.find("/") - if idx >= 0: + if not self._is_helm_chart_a_url(kdu_model) and idx >= 0: chart_name = kdu_model[idx + 1 :] repo_name = kdu_model[:idx] @@ -2091,6 +2225,24 @@ class K8sHelmBaseConnector(K8sConnector): return repo_url + def _repo_to_oci_url(self, repo): + db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False) + if db_repo and "oci" in db_repo: + return db_repo.get("url") + + async def _prepare_helm_chart(self, kdu_model, cluster_id): + # e.g.: "stable/openldap", "1.0" + kdu_model, version = self._split_version(kdu_model) + # e.g.: "openldap, stable" + chart_name, repo = self._split_repo(kdu_model) + if repo and chart_name: # repo/chart case + oci_url = self._repo_to_oci_url(repo) + if oci_url: # oci does not require helm repo update + kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema + else: + await self.repo_update(cluster_id, repo) + return kdu_model, version + async def create_certificate( self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage ): @@ -2113,3 +2265,125 @@ class K8sHelmBaseConnector(K8sConnector): ) kubectl = Kubectl(config_file=paths["kube_config"]) await kubectl.delete_certificate(namespace, certificate_name) + + async def create_namespace( + self, + namespace, + cluster_uuid, + labels, + ): + """ + Create a namespace in a specific cluster + + :param namespace: Namespace to be created + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param labels: Dictionary with labels for the new namespace + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_namespace( + name=namespace, + labels=labels, + ) + + async def delete_namespace( + self, + namespace, + cluster_uuid, + ): + """ + Delete a namespace in a specific cluster + + :param namespace: namespace to be deleted + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.delete_namespace( + name=namespace, + ) + + async def copy_secret_data( + self, + src_secret: str, + dst_secret: str, + cluster_uuid: str, + data_key: str, + src_namespace: str = "osm", + dst_namespace: str = "osm", + ): + """ + Copy a single key and value from an existing secret to a new one + + :param src_secret: name of the existing secret + :param dst_secret: name of the new secret + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param data_key: key of the existing secret to be copied + :param src_namespace: Namespace of the existing secret + :param dst_namespace: Namespace of the new secret + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + secret_data = await kubectl.get_secret_content( + name=src_secret, + namespace=src_namespace, + ) + # Only the corresponding data_key value needs to be copy + data = {data_key: secret_data.get(data_key)} + await kubectl.create_secret( + name=dst_secret, + data=data, + namespace=dst_namespace, + secret_type="Opaque", + ) + + async def setup_default_rbac( + self, + name, + namespace, + cluster_uuid, + api_groups, + resources, + verbs, + service_account, + ): + """ + Create a basic RBAC for a new namespace. + + :param name: name of both Role and Role Binding + :param namespace: K8s namespace + :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig + :param api_groups: Api groups to be allowed in Policy Rule + :param resources: Resources to be allowed in Policy Rule + :param verbs: Verbs to be allowed in Policy Rule + :param service_account: Service Account name used to bind the Role + :returns: None + """ + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + kubectl = Kubectl(config_file=paths["kube_config"]) + await kubectl.create_role( + name=name, + labels={}, + namespace=namespace, + api_groups=api_groups, + resources=resources, + verbs=verbs, + ) + await kubectl.create_role_binding( + name=name, + labels={}, + namespace=namespace, + role_name=name, + sa_name=service_account, + )