# contact with: nfvlabs@tid.es
##
from typing import Union
+from shlex import quote
import os
import yaml
"""Install a helm chart
:param cluster_uuid str: The UUID of the cluster to install to
- :param kdu_model str: The name or path of a bundle to install
+ :param kdu_model str: chart/reference (string), which can be either
+ of these options:
+ - a name of chart available via the repos known by OSM
+ (e.g. stable/openldap, stable/openldap:1.2.4)
+ - a path to a packaged chart (e.g. mychart.tgz)
+ - a path to an unpacked chart directory or a URL (e.g. mychart)
:param kdu_instance: Kdu instance name
:param atomic bool: If set, waits until the model is active and resets
the cluster on failure.
if namespace and namespace != "kube-system":
if not await self._namespace_exists(cluster_uuid, namespace):
try:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_uuid, namespace)
except Exception as e:
if not await self._namespace_exists(cluster_uuid, namespace):
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
return namespace in namespaces if namespaces else False
async def _get_namespaces(self, cluster_id: str):
-
self.log.debug("get namespaces cluster_id {}".format(cluster_id))
# init config, env
)
command = "{} --kubeconfig={} get namespaces -o=yaml".format(
- self.kubectl_command, paths["kube_config"]
+ self.kubectl_command, quote(paths["kube_config"])
)
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
-
self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
# init config, env
)
command = "{} --kubeconfig={} create namespace {}".format(
- self.kubectl_command, paths["kube_config"], namespace
+ self.kubectl_command, quote(paths["kube_config"]), quote(namespace)
)
_, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
async def _get_services(
self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
-
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
- kubeconfig, self._helm_command, kdu_instance, namespace
+ kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
+ )
+ command2 = "{} get --namespace={} -f -".format(
+ self.kubectl_command, quote(namespace)
)
- command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
output, _rc = await self._local_async_exec_pipe(
command1, command2, env=env, raise_exception_on_error=True
)
if namespace != "kube-system":
namespaces = await self._get_namespaces(cluster_id)
if namespace not in namespaces:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_id, namespace)
repo_list = await self.repo_list(cluster_id)
pass
async def _instances_list(self, cluster_id: str):
-
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
return []
def _get_inspect_command(
- self, inspect_command: str, kdu_model: str, repo_str: str, version: str
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
):
+ """Generates the command to obtain the information about an Helm Chart package
+ (´helm show ...´ command)
+
+ Args:
+ show_command: the second part of the command (`helm show <show_command>`)
+ kdu_model: The name or path of an Helm Chart
+ repo_url: Helm Chart repository url
+ version: constraint with specific version of the Chart to use
+
+ Returns:
+ str: the generated Helm Chart command
+ """
+
inspect_command = "{} show {} {}{} {}".format(
- self._helm_command, inspect_command, kdu_model, repo_str, version
+ self._helm_command, show_command, quote(kdu_model), repo_str, version
)
return inspect_command
):
get_command = (
"env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
- kubeconfig, self._helm_command, get_command, kdu_instance, namespace
+ kubeconfig,
+ self._helm_command,
+ get_command,
+ quote(kdu_instance),
+ quote(namespace),
)
)
return get_command
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
-
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
cluster_name=cluster_id, create_if_not_exist=True
)
command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
- paths["kube_config"], self._helm_command, kdu_instance, namespace
+ paths["kube_config"],
+ self._helm_command,
+ quote(kdu_instance),
+ quote(namespace),
)
output, rc = await self._local_async_exec(
timeout: float,
kubeconfig: str,
) -> str:
-
timeout_str = ""
if timeout:
timeout_str = "--timeout {}s".format(timeout)
# namespace
namespace_str = ""
if namespace:
- namespace_str = "--namespace {}".format(namespace)
+ namespace_str = "--namespace {}".format(quote(namespace))
# version
version_str = ""
"{params} {timeout} {ns} {model} {ver}".format(
kubeconfig=kubeconfig,
helm=self._helm_command,
- name=kdu_instance,
+ name=quote(kdu_instance),
atomic=atomic_str,
params=params_str,
timeout=timeout_str,
ns=namespace_str,
- model=kdu_model,
+ model=quote(kdu_model),
ver=version_str,
)
)
resource_name: str,
kubeconfig: str,
) -> str:
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}s".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
-
- # version
- version_str = ""
- if version:
- version_str = "--version {}".format(version)
-
- # namespace
- namespace_str = ""
- if namespace:
- namespace_str = "--namespace {}".format(namespace)
+ """Generates the command to scale a Helm Chart release
+
+ Args:
+ kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+ kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+ namespace (str): Namespace where this KDU instance is deployed
+ scale (int): Scale count
+ version (str): Constraint with specific version of the Chart to use
+ atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+ replica_str (str): The key under resource_name key where the scale count is stored
+ timeout (float): The time, in seconds, to wait
+ resource_name (str): The KDU's resource to scale
+ kubeconfig (str): Kubeconfig file path
+
+ Returns:
+ str: command to scale a Helm Chart release
+ """
# scale
if resource_name:
scale_str = self._params_to_set_option(scale_dict)
- command = (
- "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
- "{timeout} {ver}"
- ).format(
- helm=self._helm_command,
- name=kdu_instance,
- namespace=namespace_str,
- atomic=atomic_str,
- scale=scale_str,
- timeout=timeout_str,
- model=kdu_model,
- ver=version_str,
+ return self._get_upgrade_command(
+ kdu_model=kdu_model,
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ params_str=scale_str,
+ version=version,
+ atomic=atomic,
+ timeout=timeout,
kubeconfig=kubeconfig,
)
- return command
def _get_upgrade_command(
self,
atomic: bool,
timeout: float,
kubeconfig: str,
+ force: bool = False,
) -> str:
+ """Generates the command to upgrade a Helm Chart release
+
+ Args:
+ kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+ kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+ namespace (str): Namespace where this KDU instance is deployed
+ params_str (str): Params used to upgrade the Helm Chart release
+ version (str): Constraint with specific version of the Chart to use
+ atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+ timeout (float): The time, in seconds, to wait
+ kubeconfig (str): Kubeconfig file path
+ force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
+ Returns:
+ str: command to upgrade a Helm Chart release
+ """
timeout_str = ""
if timeout:
if atomic:
atomic_str = "--atomic"
+ # force
+ force_str = ""
+ if force:
+ force_str = "--force "
+
# version
version_str = ""
if version:
- version_str = "--version {}".format(version)
+ version_str = "--version {}".format(quote(version))
# namespace
namespace_str = ""
if namespace:
- namespace_str = "--namespace {}".format(namespace)
+ namespace_str = "--namespace {}".format(quote(namespace))
command = (
- "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
- "--output yaml {params} {timeout} {ver}"
+ "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
+ "--output yaml {params} {timeout} --reuse-values {ver}"
).format(
kubeconfig=kubeconfig,
helm=self._helm_command,
- name=kdu_instance,
+ name=quote(kdu_instance),
namespace=namespace_str,
atomic=atomic_str,
+ force=force_str,
params=params_str,
timeout=timeout_str,
- model=kdu_model,
+ model=quote(kdu_model),
ver=version_str,
)
return command
self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
) -> str:
return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
- kubeconfig, self._helm_command, kdu_instance, revision, namespace
+ kubeconfig,
+ self._helm_command,
+ quote(kdu_instance),
+ revision,
+ quote(namespace),
)
def _get_uninstall_command(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
-
return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
- kubeconfig, self._helm_command, kdu_instance, namespace
+ kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
)
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: