##
import abc
import asyncio
+from typing import Union
+from shlex import quote
import random
import time
import shlex
import shutil
import stat
-import subprocess
import os
import yaml
from uuid import uuid4
+from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
class K8sHelmBaseConnector(K8sConnector):
################################### P U B L I C ####################################
####################################################################################
"""
+
service_account = "osm"
def __init__(
self.log.info("Initializing K8S Helm connector")
+ self.config = EnvironConfig()
# random numbers for release name generation
random.seed(time.time())
self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
- @staticmethod
- def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+ # obtain stable repo url from config or apply default
+ self._stable_repo_url = self.config.get("stablerepourl")
+ if self._stable_repo_url == "None":
+ self._stable_repo_url = None
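+        # note: a literal "None" string in the configuration means no stable repository is configured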
+
+ # Lock to avoid concurrent execution of helm commands
+ self.cmd_lock = asyncio.Lock()
+
+ def _get_namespace(self, cluster_uuid: str) -> str:
"""
- Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
- cluster_id for backward compatibility
+    Obtains the namespace used by the cluster with the uuid passed as argument
+
+    :param cluster_uuid: cluster's uuid
"""
- namespace, _, cluster_id = cluster_uuid.rpartition(':')
- return namespace, cluster_id
+
+ # first, obtain the cluster corresponding to the uuid passed by argument
+ k8scluster = self.db.get_one(
+ "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
+ )
+ return k8scluster.get("namespace")
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
- ) -> (str, bool):
+ self,
+ k8s_creds: str,
+ namespace: str = "kube-system",
+ reuse_cluster_uuid=None,
+ **kwargs,
+ ) -> tuple[str, bool]:
"""
It prepares a given K8s cluster environment to run Charts
:param namespace: optional namespace to be used for helm. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param kwargs: Additional parameters (None yet)
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
"""
if reuse_cluster_uuid:
- namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
- namespace = namespace_ or namespace
+ cluster_id = reuse_cluster_uuid
else:
cluster_id = str(uuid4())
- cluster_uuid = "{}:{}".format(namespace, cluster_id)
- self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+ self.log.debug(
+ "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
+ )
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
self.log.info("Cluster {} initialized".format(cluster_id))
- return cluster_uuid, n2vc_installed_sw
+ return cluster_id, n2vc_installed_sw
async def repo_add(
- self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+ self,
+ cluster_uuid: str,
+ name: str,
+ url: str,
+ repo_type: str = "chart",
+ cert: str = None,
+ user: str = None,
+ password: str = None,
):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
- cluster_id, repo_type, name, url))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.log.debug(
+ "Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_uuid, repo_type, name, url
+ )
+ )
# init_env
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ # helm repo add name url
+ command = ("env KUBECONFIG={} {} repo add {} {}").format(
+ paths["kube_config"], self._helm_command, quote(name), quote(url)
+ )
+
+ if cert:
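+            # keep the CA certificate under the cluster folder so that the later
+            # fs.reverse_sync persists it together with the rest of the helm configuration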
+ temp_cert_file = os.path.join(
+ self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
+ )
+ os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
+ with open(temp_cert_file, "w") as the_cert:
+ the_cert.write(cert)
+ command += " --ca-file {}".format(quote(temp_cert_file))
+
+ if user:
+ command += " --username={}".format(quote(user))
+
+ if password:
+ command += " --password={}".format(quote(password))
+
+ self.log.debug("adding repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
# helm repo update
- command = "{} repo update".format(
- self._helm_command
+ command = "env KUBECONFIG={} {} repo update {}".format(
+ paths["kube_config"], self._helm_command, quote(name)
)
self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
- # helm repo add name url
- command = "{} repo add {} {}".format(
- self._helm_command, name, url
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_uuid)
+
+ async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
+ self.log.debug(
+ "Cluster {}, updating {} repository {}".format(
+ cluster_uuid, repo_type, name
+ )
+ )
+
+ # init_env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ # helm repo update
+ command = "{} repo update {}".format(self._helm_command, quote(name))
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
)
- self.log.debug("adding repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
async def repo_list(self, cluster_uuid: str) -> list:
"""
:return: list of registered repositories: [ (name, url) .... ]
"""
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list repositories for cluster {}".format(cluster_id))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.log.debug("list repositories for cluster {}".format(cluster_uuid))
# config filename
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
)
- command = "{} repo list --output yaml".format(
- self._helm_command
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ command = "env KUBECONFIG={} {} repo list --output yaml".format(
+ paths["kube_config"], self._helm_command
)
# Set exception to false because if there are no repos we just want an empty list
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
if _rc == 0:
if output and len(output) > 0:
return []
async def repo_remove(self, cluster_uuid: str, name: str):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.log.debug(
+ "remove {} repositories for cluster {}".format(name, cluster_uuid)
+ )
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
)
- command = "{} repo remove {}".format(
- self._helm_command, name
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ command = "env KUBECONFIG={} {} repo remove {}".format(
+ paths["kube_config"], self._helm_command, quote(name)
+ )
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
+ """Reset a cluster
+
+ Resets the Kubernetes cluster by removing the helm deployment that represents it.
- namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
- .format(cluster_id, uninstall_sw))
+ :param cluster_uuid: The UUID of the cluster to reset
+ :param force: Boolean to force the reset
+    :param uninstall_sw: Boolean to also uninstall the software installed by the connector (e.g. Tiller)
+ :param kwargs: Additional parameters (None yet)
+ :return: Returns True if successful or raises an exception.
+ """
+ namespace = self._get_namespace(cluster_uuid=cluster_uuid)
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
+ cluster_uuid, uninstall_sw
+ )
+ )
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# uninstall releases if needed.
if uninstall_sw:
# that in some cases of previously installed helm releases it
# raised an error
self.log.warn(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
+ "Error uninstalling release {}: {}".format(
+ kdu_instance, e
+ )
)
else:
msg = (
"Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
- ).format(cluster_id)
+ ).format(cluster_uuid)
self.log.warn(msg)
- uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
+ uninstall_sw = (
+ False # Allow to remove k8s cluster without removing Tiller
+ )
if uninstall_sw:
- await self._uninstall_sw(cluster_id, namespace)
+ await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
# delete cluster directory
- self.log.debug("Removing directory {}".format(cluster_id))
- self.fs.file_delete(cluster_id, ignore_non_exist=True)
+ self.log.debug("Removing directory {}".format(cluster_uuid))
+ self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
# Also remove the local directory if it still exists
- direct = self.fs.path + "/" + cluster_id
+ direct = self.fs.path + "/" + cluster_uuid
shutil.rmtree(direct, ignore_errors=True)
return True
- async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
- ):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
+ def _is_helm_chart_a_file(self, chart_name: str):
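+        # heuristic: a chart reference with more than one "/" is treated as a local
+        # chart file or directory, while a "repo/chart" reference contains exactly one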
+ return chart_name.count("/") > 1
+ async def _install_impl(
+ self,
+ cluster_id: str,
+ kdu_model: str,
+ paths: dict,
+ env: dict,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ ):
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
# version
- version = None
- if ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version = str(parts[1])
- kdu_model = parts[0]
-
- # generate a name for the release. Then, check if already exists
- kdu_instance = None
- while kdu_instance is None:
- kdu_instance = self._generate_release_name(kdu_model)
- try:
- result = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- namespace=namespace,
- show_error_log=False,
- )
- if result is not None:
- # instance already exists: generate a new one
- kdu_instance = None
- except K8sException:
- pass
-
- command = self._get_install_command(kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout)
+ kdu_model, version = self._split_version(kdu_model)
+
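+        # if the model references a repository ("repo/chart"), refresh its index first
+        # so that the requested chart and version can be resolved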
+ _, repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
+ command = self._get_install_command(
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
+ )
self.log.debug("installing: {}".format(command))
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=False,
)
)
output, rc = exec_task.result()
else:
-
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=True,
- check_every=0,
)
if rc != 0:
self.log.error(msg)
raise K8sException(msg)
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
- return kdu_instance
-
async def upgrade(
self,
cluster_uuid: str,
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ namespace: str = None,
+ force: bool = False,
):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# look for instance to obtain namespace
- instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
- if not instance_info:
- raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # set namespace
+ if not namespace:
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+ namespace = instance_info["namespace"]
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
- cluster_id=cluster_id, params=params
+ cluster_id=cluster_uuid, params=params
)
# version
- version = None
- if ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version = str(parts[1])
- kdu_model = parts[0]
-
- command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
- params_str, version, atomic, timeout)
+ kdu_model, version = self._split_version(kdu_model)
+
+ _, repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_uuid, repo)
+
+ command = self._get_upgrade_command(
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
+ force,
+ )
self.log.debug("upgrading: {}".format(command))
if atomic:
-
# exec helm in a task
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_id=cluster_id,
+ cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
- namespace=instance_info["namespace"],
+ namespace=namespace,
db_dict=db_dict,
operation="upgrade",
- run_once=False,
)
)
output, rc = exec_task.result()
else:
-
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
# write final status
await self._store_status(
- cluster_id=cluster_id,
+ cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
- namespace=instance_info["namespace"],
+ namespace=namespace,
db_dict=db_dict,
operation="upgrade",
- run_once=True,
- check_every=0,
)
if rc != 0:
raise K8sException(msg)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
# return new revision number
instance = await self.get_instance_info(
else:
return 0
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ cluster_uuid: str = None,
+ kdu_model: str = None,
+ atomic: bool = True,
+ db_dict: dict = None,
+ **kwargs,
+ ):
+ """Scale a resource in a Helm Chart.
+
+ Args:
+ kdu_instance: KDU instance name
+ scale: Scale to which to set the resource
+ resource_name: Resource name
+ total_timeout: The time, in seconds, to wait
+ cluster_uuid: The UUID of the cluster
+ kdu_model: The chart reference
+ atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+ db_dict: Dictionary for any additional data
+ kwargs: Additional parameters
+
+ Returns:
+ True if successful, False otherwise
+ """
+
+        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
+        if resource_name:
+            debug_msg = "scaling resource {} in model {} (cluster {})".format(
+                resource_name, kdu_model, cluster_uuid
+            )
+
+        self.log.debug(debug_msg)
+
+ # look for instance to obtain namespace
+ # get_instance_info function calls the sync command
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ # version
+ kdu_model, version = self._split_version(kdu_model)
+
+ repo_url = await self._find_repo(kdu_model, cluster_uuid)
+
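+        # obtain the key under which the chart stores the replica count (e.g. "replicaCount");
+        # this is the value that the scale command overrides with the new count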
+ _, replica_str = await self._get_replica_count_url(
+ kdu_model, repo_url, resource_name
+ )
+
+ command = self._get_upgrade_scale_command(
+ kdu_model,
+ kdu_instance,
+ instance_info["namespace"],
+ scale,
+ version,
+ atomic,
+ replica_str,
+ total_timeout,
+ resource_name,
+ paths["kube_config"],
+ )
+
+ self.log.debug("scaling: {}".format(command))
+
+ if atomic:
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_id=cluster_uuid,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="scale",
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+ output, rc = exec_task.result()
+
+ else:
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # write final status
+ await self._store_status(
+ cluster_id=cluster_uuid,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="scale",
+ )
+
+ if rc != 0:
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_uuid)
+
+ return True
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ cluster_uuid: str,
+ kdu_model: str,
+ **kwargs,
+ ) -> int:
+ """Get a resource scale count.
+
+ Args:
+ cluster_uuid: The UUID of the cluster
+ resource_name: Resource name
+ kdu_instance: KDU instance name
+            kdu_model: The name or path of a Helm Chart
+ kwargs: Additional parameters
+
+ Returns:
+ Resource instance count
+ """
+
+ self.log.debug(
+ "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
+ )
+
+ # look for instance to obtain namespace
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, _ = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ replicas = await self._get_replica_count_instance(
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ kubeconfig=paths["kube_config"],
+ resource_name=resource_name,
+ )
+
+ self.log.debug(
+ f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
+ )
+
+ # Get default value if scale count is not found from provided values
+ # Important note: this piece of code shall only be executed in the first scaling operation,
+ # since it is expected that the _get_replica_count_instance is able to obtain the number of
+ # replicas when a scale operation was already conducted previously for this KDU/resource!
+ if replicas is None:
+ repo_url = await self._find_repo(
+ kdu_model=kdu_model, cluster_uuid=cluster_uuid
+ )
+ replicas, _ = await self._get_replica_count_url(
+ kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
+ )
+
+ self.log.debug(
+ f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
+ f"{resource_name} obtained: {replicas}"
+ )
+
+ if replicas is None:
+ msg = "Replica count not found. Cannot be scaled"
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ return int(replicas)
+
async def rollback(
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_id
+ kdu_instance, revision, cluster_uuid
)
)
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
)
- command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
- revision)
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ command = self._get_rollback_command(
+ kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
+ )
self.log.debug("rolling_back: {}".format(command))
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_id=cluster_id,
+ cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=False,
)
)
# write final status
await self._store_status(
- cluster_id=cluster_id,
+ cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=True,
- check_every=0,
)
if rc != 0:
raise K8sException(msg)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
# return new revision number
instance = await self.get_instance_info(
else:
return 0
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
"""
Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
(this call should happen after all _terminate-config-primitive_ of the VNF
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance to be deleted
+ :param kwargs: Additional parameters (None yet)
:return: True if successful
"""
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_id
+ kdu_instance, cluster_uuid
)
)
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
if not instance_info:
- raise K8sException("kdu_instance {} not found".format(kdu_instance))
-
+ self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
+ return True
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
+ cluster_name=cluster_uuid, create_if_not_exist=True
)
- command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ command = self._get_uninstall_command(
+ kdu_instance, instance_info["namespace"], paths["kube_config"]
+ )
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
return self._output_to_table(output)
:return:
"""
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list releases for cluster {}".format(cluster_id))
+ self.log.debug("list releases for cluster {}".format(cluster_uuid))
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# execute internal command
- result = await self._instances_list(cluster_id)
+ result = await self._instances_list(cluster_uuid)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
return result
self.log.debug("Instance {} not found".format(kdu_instance))
return None
+ async def upgrade_charm(
+ self,
+ ee_id: str = None,
+ path: str = None,
+ charm_id: str = None,
+ charm_type: str = None,
+ timeout: float = None,
+ ) -> str:
+        """This method upgrades charms in VNFs
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ The output of the update operation if status equals to "completed"
+ """
+ raise K8sException("KDUs deployed with Helm do not support charm upgrade")
+
async def exec_primitive(
self,
cluster_uuid: str = None,
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
:db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters (None yet)
:return: Returns the output of the action
"""
"different from rollback, upgrade and status"
)
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
"""
Returns a list of services defined for the specified kdu instance.
- `external_ip` List of external ips (in case they are available)
"""
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"get_services: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
)
)
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# get list of services names for kdu
- service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+ service_names = await self._get_services(
+ cluster_uuid, kdu_instance, namespace, paths["kube_config"]
+ )
service_list = []
for service in service_names:
- service = await self._get_service(cluster_id, service, namespace)
+ service = await self._get_service(cluster_uuid, service, namespace)
service_list.append(service)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
return service_list
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str) -> object:
-
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
- service_name, namespace, cluster_uuid)
+ service_name, namespace, cluster_uuid
+ )
)
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
- service = await self._get_service(cluster_id, service_name, namespace)
+ service = await self._get_service(cluster_uuid, service_name, namespace)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+ async def status_kdu(
+        self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
+ ) -> Union[str, dict]:
+ """
+        This call would retrieve the current state of a given KDU instance. It would
+        allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+        values_ of the configuration parameters applied to a given instance. This call
+        would be based on the `status` call.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param kwargs: Additional parameters (None yet)
+        :param yaml_format: if True, the result is returned as a YAML string; otherwise,
+        as a dictionary
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind,
+ and the status of those resources
+ - Last `deployment_time`.
+ """
self.log.debug(
"status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
)
)
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-
# sync local dir
- self.fs.sync(from_path=cluster_id)
+ self.fs.sync(from_path=cluster_uuid)
# get instance: needed to obtain namespace
- instances = await self._instances_list(cluster_id=cluster_id)
+ instances = await self._instances_list(cluster_id=cluster_uuid)
for instance in instances:
if instance.get("name") == kdu_instance:
break
else:
# instance does not exist
- raise K8sException("Instance name: {} not found in cluster: {}".format(
- kdu_instance, cluster_id))
+ raise K8sException(
+ "Instance name: {} not found in cluster: {}".format(
+ kdu_instance, cluster_uuid
+ )
+ )
status = await self._status_kdu(
- cluster_id=cluster_id,
+ cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
namespace=instance["namespace"],
+ yaml_format=yaml_format,
show_error_log=True,
- return_text=True,
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_id)
+ self.fs.reverse_sync(from_path=cluster_uuid)
return status
+ async def get_values_kdu(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
+ self.log.debug("get kdu_instance values {}".format(kdu_instance))
+
+ return await self._exec_get_command(
+ get_command="values",
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ kubeconfig=kubeconfig,
+ )
+
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+ """Method to obtain the Helm Chart package's values
+
+ Args:
+            kdu_model: The name or path of a Helm Chart
+ repo_url: Helm Chart repository url
+
+ Returns:
+ str: the values of the Helm Chart package
+ """
self.log.debug(
"inspect kdu_model values {} from (optional) repo: {}".format(
)
)
- return await self._exec_inspect_comand(
+ return await self._exec_inspect_command(
inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
)
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
self.log.debug(
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
)
- return await self._exec_inspect_comand(
+ return await self._exec_inspect_command(
inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
)
async def synchronize_repos(self, cluster_uuid: str):
-
self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
try:
db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
repo_id = db_repo.get("_id")
if curr_repo_url != db_repo["url"]:
if curr_repo_url:
- self.log.debug("repo {} url changed, delete and and again".format(
- db_repo["url"]))
+ self.log.debug(
+                        "repo {} url changed, delete and add again".format(
+ db_repo["url"]
+ )
+ )
await self.repo_remove(cluster_uuid, db_repo["name"])
deleted_repo_list.append(repo_id)
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+ if "ca_cert" in db_repo:
+ await self.repo_add(
+ cluster_uuid,
+ db_repo["name"],
+ db_repo["url"],
+ cert=db_repo["ca_cert"],
+ )
+ else:
+ await self.repo_add(
+ cluster_uuid,
+ db_repo["name"],
+ db_repo["url"],
+ )
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
self.log.error("Error synchronizing repos: {}".format(e))
raise Exception("Error synchronizing repos: {}".format(e))
- def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
- repo_ids = []
- cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
- cluster = self.db.get_one("k8sclusters", cluster_filter)
- if cluster:
- repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
- return repo_ids
- else:
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
def _get_db_repos_dict(self, repo_ids: list):
db_repos_dict = {}
for repo_id in repo_ids:
"""
@abc.abstractmethod
- async def _get_services(self, cluster_id, kdu_instance, namespace):
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
"""
Implements the helm version dependent method to obtain services from a helm instance
"""
@abc.abstractmethod
- async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
- show_error_log: bool = False, return_text: bool = False):
+ async def _status_kdu(
+ self,
+ cluster_id: str,
+ kdu_instance: str,
+ namespace: str = None,
+ yaml_format: bool = False,
+ show_error_log: bool = False,
+ ) -> Union[str, dict]:
"""
Implements the helm version dependent method to obtain status of a helm instance
"""
@abc.abstractmethod
- def _get_install_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_install_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ ) -> str:
"""
Obtain command to be executed to install the indicated instance
"""
@abc.abstractmethod
- def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_upgrade_scale_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ count,
+ version,
+ atomic,
+ replicas,
+ timeout,
+ resource_name,
+ kubeconfig,
+ ) -> str:
+ """Generates the command to scale a Helm Chart release
+
+ Args:
+ kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+ kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+ namespace (str): Namespace where this KDU instance is deployed
+            count (int): Scale count
+ version (str): Constraint with specific version of the Chart to use
+ atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+            replicas (str): The key under the resource_name key where the scale count is stored
+ timeout (float): The time, in seconds, to wait
+ resource_name (str): The KDU's resource to scale
+ kubeconfig (str): Kubeconfig file path
+
+ Returns:
+ str: command to scale a Helm Chart release
"""
- Obtain command to be executed to upgrade the indicated instance
+
+ @abc.abstractmethod
+ def _get_upgrade_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ force,
+ ) -> str:
+ """Generates the command to upgrade a Helm Chart release
+
+ Args:
+ kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+ kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+ namespace (str): Namespace where this KDU instance is deployed
+ params_str (str): Params used to upgrade the Helm Chart release
+ version (str): Constraint with specific version of the Chart to use
+ atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+ timeout (float): The time, in seconds, to wait
+ kubeconfig (str): Kubeconfig file path
+ force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
+ Returns:
+ str: command to upgrade a Helm Chart release
"""
@abc.abstractmethod
- def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
"""
Obtain command to be executed to rollback the indicated instance
"""
@abc.abstractmethod
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
"""
Obtain command to be executed to delete the indicated instance
"""
@abc.abstractmethod
- def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
- version: str):
- """
- Obtain command to be executed to obtain information about the kdu
+ def _get_inspect_command(
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
+ ):
+        """Generates the command to obtain the information about a Helm Chart package
+        (`helm show ...` command)
+
+ Args:
+ show_command: the second part of the command (`helm show <show_command>`)
+            kdu_model: The name or path of a Helm Chart
+            repo_str: the repository option, already formatted for the command (e.g. " --repo URL")
+ version: constraint with specific version of the Chart to use
+
+ Returns:
+ str: the generated Helm Chart command
"""
+ @abc.abstractmethod
+ def _get_get_command(
+ self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+ ):
+ """Obtain command to be executed to get information about the kdu instance."""
+
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
"""
For Helm v3 it does nothing and does not need to be called
"""
+ @abc.abstractmethod
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ """
+ Obtains the cluster repos identifiers
+ """
+
"""
####################################################################################
################################### P R I V A T E ##################################
of dictionaries
"""
new_list = []
- for dictionary in input_list:
- new_dict = dict((k.lower(), v) for k, v in dictionary.items())
- new_list.append(new_dict)
+ if input_list:
+ for dictionary in input_list:
+ new_dict = dict((k.lower(), v) for k, v in dictionary.items())
+ new_list.append(new_dict)
return new_list
- def _local_exec(self, command: str) -> (str, int):
- command = self._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
-
async def _local_async_exec(
self,
command: str,
raise_exception_on_error: bool = False,
show_error_log: bool = True,
encode_utf8: bool = False,
- env: dict = None
- ) -> (str, int):
-
+ env: dict = None,
+ ) -> tuple[str, int]:
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command = shlex.split(command)
environ.update(env)
try:
- process = await asyncio.create_subprocess_exec(
- *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- env=environ
- )
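+            # hold the lock while the subprocess runs so that helm commands are never
+            # executed concurrently (see self.cmd_lock)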
+ async with self.cmd_lock:
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ env=environ,
+ )
- # wait for command terminate
- stdout, stderr = await process.communicate()
+ # wait for command terminate
+ stdout, stderr = await process.communicate()
- return_code = process.returncode
+ return_code = process.returncode
output = ""
if stdout:
return output, return_code
except asyncio.CancelledError:
+ # first, kill the process if it is still running
+ if process.returncode is None:
+ process.kill()
raise
except K8sException:
raise
else:
return "", -1
- async def _local_async_exec_pipe(self,
- command1: str,
- command2: str,
- raise_exception_on_error: bool = True,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- env: dict = None):
-
+ async def _local_async_exec_pipe(
+ self,
+ command1: str,
+ command2: str,
+ raise_exception_on_error: bool = True,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
+ env: dict = None,
+ ):
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command1 = shlex.split(command1)
environ.update(env)
try:
- read, write = os.pipe()
- await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
- os.close(write)
- process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
- stdout=asyncio.subprocess.PIPE,
- env=environ)
- os.close(read)
- stdout, stderr = await process_2.communicate()
+ async with self.cmd_lock:
+ read, write = os.pipe()
+ process_1 = await asyncio.create_subprocess_exec(
+ *command1, stdout=write, env=environ
+ )
+ os.close(write)
+ process_2 = await asyncio.create_subprocess_exec(
+ *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+ )
+ os.close(read)
+ stdout, stderr = await process_2.communicate()
- return_code = process_2.returncode
+ return_code = process_2.returncode
output = ""
if stdout:
return output, return_code
except asyncio.CancelledError:
+ # first, kill the processes if they are still running
+ for process in (process_1, process_2):
+ if process.returncode is None:
+ process.kill()
raise
except K8sException:
raise
)
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, paths["kube_config"], namespace, service_name
+ self.kubectl_command,
+ paths["kube_config"],
+ quote(namespace),
+ quote(service_name),
)
output, _rc = await self._local_async_exec(
"name": service_name,
"type": self._get_deep(data, ("spec", "type")),
"ports": self._get_deep(data, ("spec", "ports")),
- "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
}
if service["type"] == "LoadBalancer":
ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
return service
- async def _exec_inspect_comand(
+ async def _exec_get_command(
+ self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+ ):
+ """Obtains information about the kdu instance."""
+
+ full_command = self._get_get_command(
+ get_command, kdu_instance, namespace, kubeconfig
+ )
+
+ output, _rc = await self._local_async_exec(command=full_command)
+
+ return output
+
+ async def _exec_inspect_command(
self, inspect_command: str, kdu_model: str, repo_url: str = None
):
- """
- Obtains information about a kdu, no cluster (no env)
+        """Obtains information about a Helm Chart package (`helm show` command)
+
+ Args:
+ inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
+            kdu_model: The name or path of a Helm Chart
+ repo_url: Helm Chart repository url
+
+ Returns:
+ str: the requested info about the Helm Chart package
"""
repo_str = ""
if repo_url:
- repo_str = " --repo {}".format(repo_url)
+ repo_str = " --repo {}".format(quote(repo_url))
- idx = kdu_model.find("/")
- if idx >= 0:
- idx += 1
- kdu_model = kdu_model[idx:]
+ # Obtain the Chart's name and store it in the var kdu_model
+ kdu_model, _ = self._split_repo(kdu_model=kdu_model)
- version = ""
- if ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version = "--version {}".format(str(parts[1]))
- kdu_model = parts[0]
+ kdu_model, version = self._split_version(kdu_model)
+ if version:
+ version_str = "--version {}".format(quote(version))
+ else:
+ version_str = ""
- full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
- output, _rc = await self._local_async_exec(
- command=full_command, encode_utf8=True
+ full_command = self._get_inspect_command(
+ show_command=inspect_command,
+ kdu_model=quote(kdu_model),
+ repo_str=repo_str,
+ version=version_str,
)
+ output, _ = await self._local_async_exec(command=full_command)
+
return output
+ async def _get_replica_count_url(
+ self,
+ kdu_model: str,
+ repo_url: str = None,
+ resource_name: str = None,
+ ) -> tuple[int, str]:
+ """Get the replica count value in the Helm Chart Values.
+
+ Args:
+            kdu_model: The name or path of a Helm Chart
+ repo_url: Helm Chart repository url
+ resource_name: Resource name
+
+ Returns:
+ A tuple with:
+ - The number of replicas of the specific instance; if not found, returns None; and
+ - The string corresponding to the replica count key in the Helm values
+ """
+
+ kdu_values = yaml.load(
+ await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
+ Loader=yaml.SafeLoader,
+ )
+
+ self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
+
+ if not kdu_values:
+ raise K8sException(
+ "kdu_values not found for kdu_model {}".format(kdu_model)
+ )
+
+ if resource_name:
+ kdu_values = kdu_values.get(resource_name, None)
+
+ if not kdu_values:
+ msg = "resource {} not found in the values in model {}".format(
+ resource_name, kdu_model
+ )
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ duplicate_check = False
+
+ replica_str = ""
+ replicas = None
+
+ if kdu_values.get("replicaCount") is not None:
+ replicas = kdu_values["replicaCount"]
+ replica_str = "replicaCount"
+ elif kdu_values.get("replicas") is not None:
+ duplicate_check = True
+ replicas = kdu_values["replicas"]
+ replica_str = "replicas"
+ else:
+ if resource_name:
+ msg = (
+ "replicaCount or replicas not found in the resource"
+                    " {} values in model {}. Cannot be scaled".format(
+ resource_name, kdu_model
+ )
+ )
+ else:
+ msg = (
+ "replicaCount or replicas not found in the values"
+                    " in model {}. Cannot be scaled".format(kdu_model)
+ )
+ self.log.error(msg)
+ raise K8sException(msg)
+
+        # Check whether replicas and replicaCount exist at the same time
+        msg = "replicaCount and replicas exist at the same time"
+ if duplicate_check:
+ if "replicaCount" in kdu_values:
+ self.log.error(msg)
+ raise K8sException(msg)
+ else:
+ if "replicas" in kdu_values:
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ return replicas, replica_str
+
+ async def _get_replica_count_instance(
+ self,
+ kdu_instance: str,
+ namespace: str,
+ kubeconfig: str,
+ resource_name: str = None,
+ ) -> int:
+ """Get the replica count value in the instance.
+
+ Args:
+ kdu_instance: The name of the KDU instance
+ namespace: KDU instance namespace
+ kubeconfig:
+ resource_name: Resource name
+
+ Returns:
+ The number of replicas of the specific instance; if not found, returns None
+ """
+
+ kdu_values = yaml.load(
+ await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
+ Loader=yaml.SafeLoader,
+ )
+
+ self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
+
+ replicas = None
+
+ if kdu_values:
+ resource_values = (
+ kdu_values.get(resource_name, None) if resource_name else None
+ )
+
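+            # look for the replica key following the usual Helm conventions, in order:
+            # "replicaCount" first, then "replicas"; keep the first one that is set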
+ for replica_str in ("replicaCount", "replicas"):
+ if resource_values:
+ replicas = resource_values.get(replica_str)
+ else:
+ replicas = kdu_values.get(replica_str)
+
+ if replicas is not None:
+ break
+
+ return replicas
+
async def _store_status(
self,
cluster_id: str,
operation: str,
kdu_instance: str,
namespace: str = None,
- check_every: float = 10,
db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
- return_text=False
- )
- status = detailed_status.get("info").get("description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
- pass
- finally:
- if run_once:
- return
+ ) -> None:
+ """
+ Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
+
+ :param cluster_id (str): the cluster where the KDU instance is deployed
+ :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
+ :param kdu_instance (str): The KDU instance in relation to which the status is obtained
+ :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
+ :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
+ values for the keys:
+ - "collection": The Mongo DB collection to write to
+ - "filter": The query filter to use in the update process
+ - "path": The dot separated keys which targets the object to be updated
+ Defaults to None.
+ """
+
+ try:
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ yaml_format=False,
+ namespace=namespace,
+ )
+
+ status = detailed_status.get("info").get("description")
+ self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
+
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation,
+ )
+
+ if not result:
+ self.log.info("Error writing in database. Task exiting...")
+
+ except asyncio.CancelledError as e:
+ self.log.warning(
+ f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
+ )
+ except Exception as e:
+ self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
if params and len(params) > 0:
- self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
+ self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
while len(s) < 10:
s = "0" + s
for key in params:
value = params.get(key)
if "!!yaml" in str(value):
- value = yaml.load(value[7:])
+ value = yaml.safe_load(value[7:])
params2[key] = value
values_file = get_random_number() + ".yaml"
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
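+        # build a single "--set key1=value1,key2=value2" option, shell-quoting every key
+        # and value and skipping parameters whose value is None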
+ pairs = [
+ f"{quote(str(key))}={quote(str(value))}"
+ for key, value in params.items()
+ if value is not None
+ ]
+ if not pairs:
+ return ""
+ return "--set " + ",".join(pairs)
@staticmethod
- def _generate_release_name(chart_name: str):
+ def generate_kdu_instance_name(**kwargs):
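+        # derive a Helm release name from the chart reference given in kwargs["kdu_model"]:
+        # strip any path or URL prefix, sanitize the remaining characters and append a
+        # random numeric suffix so that the generated name is unique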
+ chart_name = kwargs["kdu_model"]
# check embedded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
# check URL
elif "://" in chart_name:
# extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
name = ""
for c in chart_name:
name += "-"
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
s = s.rjust(10, "0")
return s
name = name + get_random_number()
return name.lower()
+
+ def _split_version(self, kdu_model: str) -> tuple[str, str]:
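+        # a KDU model may pin a chart version using the "chart:version" convention;
+        # local chart files/dirs are excluded from this parsing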
+ version = None
+ if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ version = str(parts[1])
+ kdu_model = parts[0]
+ return kdu_model, version
+
+ def _split_repo(self, kdu_model: str) -> tuple[str, str]:
+ """Obtain the Helm Chart's repository and Chart's names from the KDU model
+
+ Args:
+ kdu_model (str): Associated KDU model
+
+ Returns:
+ (str, str): Tuple with the Chart name in index 0, and the repo name
+ in index 2; if there was a problem finding them, return None
+ for both
+ """
+
+ chart_name = None
+ repo_name = None
+
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ chart_name = kdu_model[idx + 1 :]
+ repo_name = kdu_model[:idx]
+
+ return chart_name, repo_name
+
+ async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
+        """Obtain the Helm repository for a Helm Chart
+
+ Args:
+ kdu_model (str): the KDU model associated with the Helm Chart instantiation
+ cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
+
+ Returns:
+            str: the repository URL; if the Helm Chart is a local one, the function returns None
+ """
+
+ _, repo_name = self._split_repo(kdu_model=kdu_model)
+
+ repo_url = None
+ if repo_name:
+ # Find repository link
+ local_repo_list = await self.repo_list(cluster_uuid)
+ for repo in local_repo_list:
+ if repo["name"] == repo_name:
+ repo_url = repo["url"]
+ break # it is not necessary to continue the loop if the repo link was found...
+
+ return repo_url
+
+ async def create_certificate(
+ self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+ ):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
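+        # certificates are requested from the "ca-issuer" issuer, which is assumed to be
+        # already provisioned (e.g. by cert-manager) in the target cluster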
+ await kubectl.create_certificate(
+ namespace=namespace,
+ name=name,
+ dns_prefix=dns_prefix,
+ secret_name=secret_name,
+ usages=[usage],
+ issuer_name="ca-issuer",
+ )
+
+ async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_certificate(namespace, certificate_name)
+
+ async def create_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ labels,
+ ):
+ """
+ Create a namespace in a specific cluster
+
+ :param namespace: Namespace to be created
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param labels: Dictionary with labels for the new namespace
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_namespace(
+ name=namespace,
+ labels=labels,
+ )
+
+ async def delete_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Delete a namespace in a specific cluster
+
+ :param namespace: namespace to be deleted
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_namespace(
+ name=namespace,
+ )
+
+ async def copy_secret_data(
+ self,
+ src_secret: str,
+ dst_secret: str,
+ cluster_uuid: str,
+ data_key: str,
+ src_namespace: str = "osm",
+ dst_namespace: str = "osm",
+ ):
+ """
+ Copy a single key and value from an existing secret to a new one
+
+ :param src_secret: name of the existing secret
+ :param dst_secret: name of the new secret
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param data_key: key of the existing secret to be copied
+ :param src_namespace: Namespace of the existing secret
+ :param dst_namespace: Namespace of the new secret
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ secret_data = await kubectl.get_secret_content(
+ name=src_secret,
+ namespace=src_namespace,
+ )
+        # Only the value of the corresponding data_key needs to be copied
+ data = {data_key: secret_data.get(data_key)}
+ await kubectl.create_secret(
+ name=dst_secret,
+ data=data,
+ namespace=dst_namespace,
+ secret_type="Opaque",
+ )
+
+ async def setup_default_rbac(
+ self,
+ name,
+ namespace,
+ cluster_uuid,
+ api_groups,
+ resources,
+ verbs,
+ service_account,
+ ):
+ """
+ Create a basic RBAC for a new namespace.
+
+ :param name: name of both Role and Role Binding
+ :param namespace: K8s namespace
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param api_groups: Api groups to be allowed in Policy Rule
+ :param resources: Resources to be allowed in Policy Rule
+ :param verbs: Verbs to be allowed in Policy Rule
+ :param service_account: Service Account name used to bind the Role
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_role(
+ name=name,
+ labels={},
+ namespace=namespace,
+ api_groups=api_groups,
+ resources=resources,
+ verbs=verbs,
+ )
+ await kubectl.create_role_binding(
+ name=name,
+ labels={},
+ namespace=namespace,
+ role_name=name,
+ sa_name=service_account,
+ )