import abc
import asyncio
from typing import Union
+from shlex import quote
import random
import time
import shlex
namespace: str = "kube-system",
reuse_cluster_uuid=None,
**kwargs,
- ) -> (str, bool):
+ ) -> tuple[str, bool]:
"""
It prepares a given K8s cluster environment to run Charts
# helm repo add name url
command = ("env KUBECONFIG={} {} repo add {} {}").format(
- paths["kube_config"], self._helm_command, name, url
+ paths["kube_config"], self._helm_command, quote(name), quote(url)
)
if cert:
os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
with open(temp_cert_file, "w") as the_cert:
the_cert.write(cert)
- command += " --ca-file {}".format(temp_cert_file)
+ command += " --ca-file {}".format(quote(temp_cert_file))
if user:
- command += " --username={}".format(user)
+ command += " --username={}".format(quote(user))
if password:
- command += " --password={}".format(password)
+ command += " --password={}".format(quote(password))
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(
# helm repo update
command = "env KUBECONFIG={} {} repo update {}".format(
- paths["kube_config"], self._helm_command, name
+ paths["kube_config"], self._helm_command, quote(name)
)
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
self.fs.sync(from_path=cluster_uuid)
# helm repo update
- command = "{} repo update {}".format(self._helm_command, name)
+ command = "{} repo update {}".format(self._helm_command, quote(name))
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
self.fs.sync(from_path=cluster_uuid)
command = "env KUBECONFIG={} {} repo remove {}".format(
- paths["kube_config"], self._helm_command, name
+ paths["kube_config"], self._helm_command, quote(name)
)
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
show_error_log: bool = True,
encode_utf8: bool = False,
env: dict = None,
- ) -> (str, int):
+ ) -> tuple[str, int]:
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
self.log.debug(
"Executing async local command: {}, env: {}".format(command, env)
)
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, paths["kube_config"], namespace, service_name
+ self.kubectl_command,
+ paths["kube_config"],
+ quote(namespace),
+ quote(service_name),
)
output, _rc = await self._local_async_exec(
repo_str = ""
if repo_url:
- repo_str = " --repo {}".format(repo_url)
+ repo_str = " --repo {}".format(quote(repo_url))
# Obtain the Chart's name and store it in the var kdu_model
kdu_model, _ = self._split_repo(kdu_model=kdu_model)
kdu_model, version = self._split_version(kdu_model)
if version:
- version_str = "--version {}".format(version)
+ version_str = "--version {}".format(quote(version))
else:
version_str = ""
full_command = self._get_inspect_command(
show_command=inspect_command,
- kdu_model=kdu_model,
+ kdu_model=quote(kdu_model),
repo_str=repo_str,
version=version_str,
)
kdu_model: str,
repo_url: str = None,
resource_name: str = None,
- ) -> (int, str):
+ ) -> tuple[int, str]:
"""Get the replica count value in the Helm Chart Values.
Args:
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
if params and len(params) > 0:
self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
+ pairs = [
+ f"{quote(str(key))}={quote(str(value))}"
+ for key, value in params.items()
+ if value is not None
+ ]
+ if not pairs:
+ return ""
+ return "--set " + ",".join(pairs)
@staticmethod
def generate_kdu_instance_name(**kwargs):
name = name + get_random_number()
return name.lower()
- def _split_version(self, kdu_model: str) -> (str, str):
+ def _split_version(self, kdu_model: str) -> tuple[str, str]:
version = None
if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
parts = kdu_model.split(sep=":")
kdu_model = parts[0]
return kdu_model, version
- def _split_repo(self, kdu_model: str) -> (str, str):
+ def _split_repo(self, kdu_model: str) -> tuple[str, str]:
"""Obtain the Helm Chart's repository and Chart's names from the KDU model
Args: