import abc
import asyncio
from typing import Union
+from shlex import quote
import random
import time
import shlex
import os
import yaml
from uuid import uuid4
+from urllib.parse import urlparse
from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+ # exception if main post renderer executable is not present
+ self.main_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
+ "mainpostrendererpath"
+ )
+ if self.main_post_renderer_path:
+ self._check_file_exists(
+ filename=self.main_post_renderer_path, exception_if_not_exists=True
+ )
+
+ # exception if podLabels post renderer executable is not present
+ self.podLabels_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
+ "podlabelspostrendererpath"
+ )
+ if self.podLabels_post_renderer_path:
+ self._check_file_exists(
+ filename=self.podLabels_post_renderer_path, exception_if_not_exists=True
+ )
+
# obtain stable repo url from config or apply default
self._stable_repo_url = self.config.get("stablerepourl")
if self._stable_repo_url == "None":
namespace: str = "kube-system",
reuse_cluster_uuid=None,
**kwargs,
- ) -> (str, bool):
+ ) -> tuple[str, bool]:
"""
It prepares a given K8s cluster environment to run Charts
cert: str = None,
user: str = None,
password: str = None,
+ oci: bool = False,
):
self.log.debug(
"Cluster {}, adding {} repository {}. URL: {}".format(
# sync local dir
self.fs.sync(from_path=cluster_uuid)
- # helm repo add name url
- command = ("env KUBECONFIG={} {} repo add {} {}").format(
- paths["kube_config"], self._helm_command, name, url
- )
+ if oci:
+ if user and password:
+ host_port = urlparse(url).netloc if url.startswith("oci://") else url
+ # helm registry login url
+ command = "env KUBECONFIG={} {} registry login {}".format(
+ paths["kube_config"], self._helm_command, quote(host_port)
+ )
+ else:
+ self.log.debug(
+ "OCI registry login is not needed for repo: {}".format(name)
+ )
+ return
+ else:
+ # helm repo add name url
+ command = "env KUBECONFIG={} {} repo add {} {}".format(
+ paths["kube_config"], self._helm_command, quote(name), quote(url)
+ )
if cert:
temp_cert_file = os.path.join(
os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
with open(temp_cert_file, "w") as the_cert:
the_cert.write(cert)
- command += " --ca-file {}".format(temp_cert_file)
+ command += " --ca-file {}".format(quote(temp_cert_file))
if user:
- command += " --username={}".format(user)
+ command += " --username={}".format(quote(user))
if password:
- command += " --password={}".format(password)
+ command += " --password={}".format(quote(password))
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
- # helm repo update
- command = "env KUBECONFIG={} {} repo update {}".format(
- paths["kube_config"], self._helm_command, name
- )
- self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
+ if not oci:
+ # helm repo update
+ command = "env KUBECONFIG={} {} repo update {}".format(
+ paths["kube_config"], self._helm_command, quote(name)
+ )
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
# sync fs
self.fs.reverse_sync(from_path=cluster_uuid)
self.fs.sync(from_path=cluster_uuid)
# helm repo update
- command = "{} repo update {}".format(self._helm_command, name)
+ command = "{} repo update {}".format(self._helm_command, quote(name))
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
self.fs.sync(from_path=cluster_uuid)
command = "env KUBECONFIG={} {} repo remove {}".format(
- paths["kube_config"], self._helm_command, name
+ paths["kube_config"], self._helm_command, quote(name)
)
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
def _is_helm_chart_a_file(self, chart_name: str):
return chart_name.count("/") > 1
+ @staticmethod
+ def _is_helm_chart_a_url(chart_name: str):
+ result = urlparse(chart_name)
+ return all([result.scheme, result.netloc])
+
async def _install_impl(
self,
cluster_id: str,
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ labels: dict = None,
kdu_name: str = None,
namespace: str = None,
):
cluster_id=cluster_id, params=params
)
- # version
- kdu_model, version = self._split_version(kdu_model)
-
- _, repo = self._split_repo(kdu_model)
- if repo:
- await self.repo_update(cluster_id, repo)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)
command = self._get_install_command(
kdu_model,
kdu_instance,
namespace,
+ labels,
params_str,
version,
atomic,
params: dict = None,
db_dict: dict = None,
namespace: str = None,
+ reset_values: bool = False,
+ reuse_values: bool = True,
+ reset_then_reuse_values: bool = False,
force: bool = False,
):
self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
cluster_id=cluster_uuid, params=params
)
- # version
- kdu_model, version = self._split_version(kdu_model)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
- _, repo = self._split_repo(kdu_model)
- if repo:
- await self.repo_update(cluster_uuid, repo)
+ labels_dict = None
+ if db_dict and await self._contains_labels(
+ kdu_instance, namespace, paths["kube_config"], env
+ ):
+ labels_dict = await self._labels_dict(db_dict, kdu_instance)
command = self._get_upgrade_command(
kdu_model,
kdu_instance,
namespace,
params_str,
+ labels_dict,
version,
atomic,
timeout,
paths["kube_config"],
+ reset_values,
+ reuse_values,
+ reset_then_reuse_values,
force,
)
)
# version
- kdu_model, version = self._split_version(kdu_model)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
repo_url = await self._find_repo(kdu_model, cluster_uuid)
kdu_model, repo_url, resource_name
)
+ labels_dict = None
+ if db_dict and await self._contains_labels(
+ kdu_instance, instance_info["namespace"], paths["kube_config"], env
+ ):
+ labels_dict = await self._labels_dict(db_dict, kdu_instance)
+
command = self._get_upgrade_scale_command(
kdu_model,
kdu_instance,
instance_info["namespace"],
scale,
+ labels_dict,
version,
atomic,
replica_str,
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- if "ca_cert" in db_repo:
- await self.repo_add(
- cluster_uuid,
- db_repo["name"],
- db_repo["url"],
- cert=db_repo["ca_cert"],
- )
- else:
- await self.repo_add(
- cluster_uuid,
- db_repo["name"],
- db_repo["url"],
- )
+ await self.repo_add(
+ cluster_uuid,
+ db_repo["name"],
+ db_repo["url"],
+ cert=db_repo.get("ca_cert"),
+ user=db_repo.get("user"),
+ password=db_repo.get("password"),
+ oci=db_repo.get("oci", False),
+ )
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
kdu_model,
kdu_instance,
namespace,
+ labels,
params_str,
version,
atomic,
kdu_instance,
namespace,
count,
+ labels,
version,
atomic,
replicas,
kdu_instance,
namespace,
params_str,
+ labels,
version,
atomic,
timeout,
kubeconfig,
+ reset_values,
+ reuse_values,
+ reset_then_reuse_values,
force,
) -> str:
"""Generates the command to upgrade a Helm Chart release
The --wait flag will be set automatically if --atomic is used
timeout (float): The time, in seconds, to wait
kubeconfig (str): Kubeconfig file path
+ reset_values(bool): If set, helm resets values instead of reusing previous values.
+ reuse_values(bool): If set, helm reuses previous values.
+ reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
Returns:
str: command to upgrade a Helm Chart release
show_error_log: bool = True,
encode_utf8: bool = False,
env: dict = None,
- ) -> (str, int):
+ ) -> tuple[str, int]:
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
self.log.debug(
"Executing async local command: {}, env: {}".format(command, env)
)
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, paths["kube_config"], namespace, service_name
+ self.kubectl_command,
+ paths["kube_config"],
+ quote(namespace),
+ quote(service_name),
)
output, _rc = await self._local_async_exec(
repo_str = ""
if repo_url:
- repo_str = " --repo {}".format(repo_url)
+ repo_str = " --repo {}".format(quote(repo_url))
# Obtain the Chart's name and store it in the var kdu_model
kdu_model, _ = self._split_repo(kdu_model=kdu_model)
kdu_model, version = self._split_version(kdu_model)
if version:
- version_str = "--version {}".format(version)
+ version_str = "--version {}".format(quote(version))
else:
version_str = ""
full_command = self._get_inspect_command(
show_command=inspect_command,
- kdu_model=kdu_model,
+ kdu_model=quote(kdu_model),
repo_str=repo_str,
version=version_str,
)
kdu_model: str,
repo_url: str = None,
resource_name: str = None,
- ) -> (int, str):
+ ) -> tuple[int, str]:
"""Get the replica count value in the Helm Chart Values.
Args:
return replicas
+ async def _labels_dict(self, db_dict, kdu_instance):
+ # get the network service registry
+ ns_id = db_dict["filter"]["_id"]
+ try:
+ db_nsr = self.db.get_one("nsrs", {"_id": ns_id})
+ except Exception as e:
+ print("nsr {} not found: {}".format(ns_id, e))
+ nsd_id = db_nsr["nsd"]["_id"]
+
+ # get the kdu registry
+ for index, kdu in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
+ if kdu["kdu-instance"] == kdu_instance:
+ db_kdur = kdu
+ break
+ else:
+ # No kdur found, could be the case of an EE chart
+ return {}
+
+ kdu_name = db_kdur["kdu-name"]
+ member_vnf_index = db_kdur["member-vnf-index"]
+ # get the vnf registry
+ try:
+ db_vnfr = self.db.get_one(
+ "vnfrs",
+ {"nsr-id-ref": ns_id, "member-vnf-index-ref": member_vnf_index},
+ )
+ except Exception as e:
+ print("vnfr {} not found: {}".format(member_vnf_index, e))
+
+ vnf_id = db_vnfr["_id"]
+ vnfd_id = db_vnfr["vnfd-id"]
+
+ return {
+ "managed-by": "osm.etsi.org",
+ "osm.etsi.org/ns-id": ns_id,
+ "osm.etsi.org/nsd-id": nsd_id,
+ "osm.etsi.org/vnf-id": vnf_id,
+ "osm.etsi.org/vnfd-id": vnfd_id,
+ "osm.etsi.org/kdu-id": kdu_instance,
+ "osm.etsi.org/kdu-name": kdu_name,
+ }
+
+ async def _contains_labels(self, kdu_instance, namespace, kube_config, env):
+ command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+ kube_config,
+ self._helm_command,
+ quote(kdu_instance),
+ quote(namespace),
+ )
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ manifests = yaml.safe_load_all(output)
+ for manifest in manifests:
+ # Check if the manifest has metadata and labels
+ if (
+ manifest is not None
+ and "metadata" in manifest
+ and "labels" in manifest["metadata"]
+ ):
+ labels = {
+ "managed-by",
+ "osm.etsi.org/kdu-id",
+ "osm.etsi.org/kdu-name",
+ "osm.etsi.org/ns-id",
+ "osm.etsi.org/nsd-id",
+ "osm.etsi.org/vnf-id",
+ "osm.etsi.org/vnfd-id",
+ }
+ if labels.issubset(manifest["metadata"]["labels"].keys()):
+ return True
+ return False
+
async def _store_status(
self,
cluster_id: str,
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
if params and len(params) > 0:
self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
while len(s) < 10:
s = "0" + s
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
+ pairs = [
+ f"{quote(str(key))}={quote(str(value))}"
+ for key, value in params.items()
+ if value is not None
+ ]
+ if not pairs:
+ return ""
+ return "--set " + ",".join(pairs)
@staticmethod
def generate_kdu_instance_name(**kwargs):
name += "-"
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
s = s.rjust(10, "0")
return s
name = name + get_random_number()
return name.lower()
- def _split_version(self, kdu_model: str) -> (str, str):
+ def _split_version(self, kdu_model: str) -> tuple[str, str]:
version = None
- if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
+ if (
+ not (
+ self._is_helm_chart_a_file(kdu_model)
+ or self._is_helm_chart_a_url(kdu_model)
+ )
+ and ":" in kdu_model
+ ):
parts = kdu_model.split(sep=":")
if len(parts) == 2:
version = str(parts[1])
kdu_model = parts[0]
return kdu_model, version
- def _split_repo(self, kdu_model: str) -> (str, str):
+ def _split_repo(self, kdu_model: str) -> tuple[str, str]:
"""Obtain the Helm Chart's repository and Chart's names from the KDU model
Args:
repo_name = None
idx = kdu_model.find("/")
- if idx >= 0:
+ if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
chart_name = kdu_model[idx + 1 :]
repo_name = kdu_model[:idx]
return repo_url
+ def _repo_to_oci_url(self, repo):
+ db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
+ if db_repo and "oci" in db_repo:
+ return db_repo.get("url")
+
+ async def _prepare_helm_chart(self, kdu_model, cluster_id):
+ # e.g.: "stable/openldap", "1.0"
+ kdu_model, version = self._split_version(kdu_model)
+ # e.g.: "openldap, stable"
+ chart_name, repo = self._split_repo(kdu_model)
+ if repo and chart_name: # repo/chart case
+ oci_url = self._repo_to_oci_url(repo)
+ if oci_url: # oci does not require helm repo update
+ kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
+ else:
+ await self.repo_update(cluster_id, repo)
+ return kdu_model, version
+
async def create_certificate(
self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
):
)
kubectl = Kubectl(config_file=paths["kube_config"])
await kubectl.delete_certificate(namespace, certificate_name)
+
+ async def create_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ labels,
+ ):
+ """
+ Create a namespace in a specific cluster
+
+ :param namespace: Namespace to be created
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param labels: Dictionary with labels for the new namespace
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_namespace(
+ name=namespace,
+ labels=labels,
+ )
+
+ async def delete_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Delete a namespace in a specific cluster
+
+ :param namespace: namespace to be deleted
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_namespace(
+ name=namespace,
+ )
+
+ async def copy_secret_data(
+ self,
+ src_secret: str,
+ dst_secret: str,
+ cluster_uuid: str,
+ data_key: str,
+ src_namespace: str = "osm",
+ dst_namespace: str = "osm",
+ ):
+ """
+ Copy a single key and value from an existing secret to a new one
+
+ :param src_secret: name of the existing secret
+ :param dst_secret: name of the new secret
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param data_key: key of the existing secret to be copied
+ :param src_namespace: Namespace of the existing secret
+ :param dst_namespace: Namespace of the new secret
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ secret_data = await kubectl.get_secret_content(
+ name=src_secret,
+ namespace=src_namespace,
+ )
+ # Only the corresponding data_key value needs to be copy
+ data = {data_key: secret_data.get(data_key)}
+ await kubectl.create_secret(
+ name=dst_secret,
+ data=data,
+ namespace=dst_namespace,
+ secret_type="Opaque",
+ )
+
+ async def setup_default_rbac(
+ self,
+ name,
+ namespace,
+ cluster_uuid,
+ api_groups,
+ resources,
+ verbs,
+ service_account,
+ ):
+ """
+ Create a basic RBAC for a new namespace.
+
+ :param name: name of both Role and Role Binding
+ :param namespace: K8s namespace
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param api_groups: Api groups to be allowed in Policy Rule
+ :param resources: Resources to be allowed in Policy Rule
+ :param verbs: Verbs to be allowed in Policy Rule
+ :param service_account: Service Account name used to bind the Role
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_role(
+ name=name,
+ labels={},
+ namespace=namespace,
+ api_groups=api_groups,
+ resources=resources,
+ verbs=verbs,
+ )
+ await kubectl.create_role_binding(
+ name=name,
+ labels={},
+ namespace=namespace,
+ role_name=name,
+ sa_name=service_account,
+ )