##
import abc
import asyncio
+from typing import Union
import random
import time
import shlex
import shutil
import stat
-import subprocess
import os
import yaml
from uuid import uuid4
+from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
################################### P U B L I C ####################################
####################################################################################
"""
+
service_account = "osm"
- _STABLE_REPO_URL = "https://charts.helm.sh/stable"
def __init__(
self,
helm_command: str = "/usr/bin/helm",
log: object = None,
on_update_db=None,
- vca_config: dict = None,
):
"""
self.log.info("Initializing K8S Helm connector")
+ self.config = EnvironConfig()
# random numbers for release name generation
random.seed(time.time())
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
# obtain stable repo url from config or apply default
- if not vca_config or not vca_config.get("stablerepourl"):
- self._stable_repo_url = self._STABLE_REPO_URL
- else:
- self._stable_repo_url = vca_config.get("stablerepourl")
+ self._stable_repo_url = self.config.get("stablerepourl")
+ if self._stable_repo_url == "None":
+ self._stable_repo_url = None
@staticmethod
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
cluster_id for backward compatibility
"""
- namespace, _, cluster_id = cluster_uuid.rpartition(':')
+ namespace, _, cluster_id = cluster_uuid.rpartition(":")
return namespace, cluster_id
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
+ self,
+ k8s_creds: str,
+ namespace: str = "kube-system",
+ reuse_cluster_uuid=None,
+ **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts
cluster_id = str(uuid4())
cluster_uuid = "{}:{}".format(namespace, cluster_id)
- self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+ self.log.debug(
+ "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
+ )
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
return cluster_uuid, n2vc_installed_sw
async def repo_add(
- self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
- cluster_id, repo_type, name, url))
+ self.log.debug(
+ "Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_id, repo_type, name, url
+ )
+ )
+
+ # init_env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
# sync local dir
self.fs.sync(from_path=cluster_id)
+ # helm repo add name url
+ command = "env KUBECONFIG={} {} repo add {} {}".format(
+ paths["kube_config"], self._helm_command, name, url
+ )
+ self.log.debug("adding repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+
+ # helm repo update
+ command = "env KUBECONFIG={} {} repo update {}".format(
+ paths["kube_config"], self._helm_command, name
+ )
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
+ self.log.debug(
+ "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
+ )
+
# init_env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# helm repo update
- command = "{} repo update".format(
- self._helm_command
- )
+ command = "{} repo update {}".format(self._helm_command, name)
self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
-
- # helm repo add name url
- command = "{} repo add {} {}".format(
- self._helm_command, name, url
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
)
- self.log.debug("adding repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
# sync fs
self.fs.reverse_sync(from_path=cluster_id)
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("list repositories for cluster {}".format(cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# config filename
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo list --output yaml".format(
- self._helm_command
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo list --output yaml".format(
+ paths["kube_config"], self._helm_command
)
# Set exception to false because if there are no repos just want an empty list
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo remove {}".format(
- self._helm_command, name
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo remove {}".format(
+ paths["kube_config"], self._helm_command, name
+ )
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
# sync fs
self.fs.reverse_sync(from_path=cluster_id)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
"""Reset a cluster
:return: Returns True if successful or raises an exception.
"""
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
- .format(cluster_id, uninstall_sw))
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
+ cluster_id, uninstall_sw
+ )
+ )
# sync local dir
self.fs.sync(from_path=cluster_id)
# that in some cases of previously installed helm releases it
# raised an error
self.log.warn(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
+ "Error uninstalling release {}: {}".format(
+ kdu_instance, e
+ )
)
else:
msg = (
"Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
).format(cluster_id)
self.log.warn(msg)
- uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
+ uninstall_sw = (
+ False # Allow to remove k8s cluster without removing Tiller
+ )
if uninstall_sw:
await self._uninstall_sw(cluster_id, namespace)
return True
async def _install_impl(
- self,
- cluster_id: str,
- kdu_model: str,
- paths: dict,
- env: dict,
- kdu_instance: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
+ self,
+ cluster_id: str,
+ kdu_model: str,
+ paths: dict,
+ env: dict,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
):
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
version = str(parts[1])
kdu_model = parts[0]
- command = self._get_install_command(kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout)
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
+ command = self._get_install_command(
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
+ )
self.log.debug("installing: {}".format(command))
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=False,
)
)
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=True,
- check_every=0,
)
if rc != 0:
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
version = str(parts[1])
kdu_model = parts[0]
- command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
- params_str, version, atomic, timeout)
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
+ command = self._get_upgrade_command(
+ kdu_model,
+ kdu_instance,
+ instance_info["namespace"],
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
+ )
self.log.debug("upgrading: {}".format(command))
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=False,
)
)
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=True,
- check_every=0,
)
if rc != 0:
return 0
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scale a resource of a KDU instance.

    Not supported by this connector; always raises NotImplementedError.
    """
    raise NotImplementedError("Method not implemented")
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtain the scale count of a resource of a KDU instance.

    Not supported by this connector; always raises NotImplementedError.
    """
    raise NotImplementedError("Method not implemented")
cluster_name=cluster_id, create_if_not_exist=True
)
- command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
- revision)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = self._get_rollback_command(
+ kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
+ )
self.log.debug("rolling_back: {}".format(command))
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=False,
)
)
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=True,
- check_every=0,
)
if rc != 0:
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_id
- )
+ "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
)
# sync local dir
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
if not instance_info:
- raise K8sException("kdu_instance {} not found".format(kdu_instance))
-
+ self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
+ return True
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = self._get_uninstall_command(
+ kdu_instance, instance_info["namespace"], paths["kube_config"]
+ )
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
"different from rollback, upgrade and status"
)
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
"""
Returns a list of services defined for the specified kdu instance.
)
)
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# sync local dir
self.fs.sync(from_path=cluster_id)
# get list of services names for kdu
- service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+ service_names = await self._get_services(
+ cluster_id, kdu_instance, namespace, paths["kube_config"]
+ )
service_list = []
for service in service_names:
return service_list
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str) -> object:
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
- service_name, namespace, cluster_uuid)
+ service_name, namespace, cluster_uuid
+ )
)
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ async def status_kdu(
+ self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
+ ) -> Union[str, dict]:
"""
This call would retrieve tha current state of a given KDU instance. It would be
would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
:param kwargs: Additional parameters (None yet)
+ :param yaml_format: if the return shall be returned as an YAML string or as a
+ dictionary
:return: If successful, it will return the following vector of arguments:
- K8s `namespace` in the cluster where the KDU lives
- `state` of the KDU instance. It can be:
break
else:
# instance does not exist
- raise K8sException("Instance name: {} not found in cluster: {}".format(
- kdu_instance, cluster_id))
+ raise K8sException(
+ "Instance name: {} not found in cluster: {}".format(
+ kdu_instance, cluster_id
+ )
+ )
status = await self._status_kdu(
cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance["namespace"],
+ yaml_format=yaml_format,
show_error_log=True,
- return_text=True,
)
# sync fs
repo_id = db_repo.get("_id")
if curr_repo_url != db_repo["url"]:
if curr_repo_url:
- self.log.debug("repo {} url changed, delete and and again".format(
- db_repo["url"]))
+ self.log.debug(
+ "repo {} url changed, delete and and again".format(
+ db_repo["url"]
+ )
+ )
await self.repo_remove(cluster_uuid, db_repo["name"])
deleted_repo_list.append(repo_id)
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+ await self.repo_add(
+ cluster_uuid, db_repo["name"], db_repo["url"]
+ )
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
"""
@abc.abstractmethod
- async def _get_services(self, cluster_id, kdu_instance, namespace):
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
"""
Implements the helm version dependent method to obtain services from a helm instance
"""
@abc.abstractmethod
- async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
- show_error_log: bool = False, return_text: bool = False):
+ async def _status_kdu(
+ self,
+ cluster_id: str,
+ kdu_instance: str,
+ namespace: str = None,
+ yaml_format: bool = False,
+ show_error_log: bool = False,
+ ) -> Union[str, dict]:
"""
Implements the helm version dependent method to obtain status of a helm instance
"""
@abc.abstractmethod
- def _get_install_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_install_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ ) -> str:
"""
Obtain command to be executed to delete the indicated instance
"""
@abc.abstractmethod
- def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_upgrade_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ ) -> str:
"""
Obtain command to be executed to upgrade the indicated instance
"""
@abc.abstractmethod
- def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
"""
Obtain command to be executed to rollback the indicated instance
"""
@abc.abstractmethod
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
"""
Obtain command to be executed to delete the indicated instance
"""
@abc.abstractmethod
- def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
- version: str):
+ def _get_inspect_command(
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
+ ):
"""
Obtain command to be executed to obtain information about the kdu
"""
of dictionaries
"""
new_list = []
- for dictionary in input_list:
- new_dict = dict((k.lower(), v) for k, v in dictionary.items())
- new_list.append(new_dict)
+ if input_list:
+ for dictionary in input_list:
+ new_dict = dict((k.lower(), v) for k, v in dictionary.items())
+ new_list.append(new_dict)
return new_list
- def _local_exec(self, command: str) -> (str, int):
- command = self._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
-
async def _local_async_exec(
self,
command: str,
raise_exception_on_error: bool = False,
show_error_log: bool = True,
encode_utf8: bool = False,
- env: dict = None
+ env: dict = None,
) -> (str, int):
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command = shlex.split(command)
try:
process = await asyncio.create_subprocess_exec(
- *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- env=environ
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ env=environ,
)
# wait for command terminate
else:
return "", -1
- async def _local_async_exec_pipe(self,
- command1: str,
- command2: str,
- raise_exception_on_error: bool = True,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- env: dict = None):
+ async def _local_async_exec_pipe(
+ self,
+ command1: str,
+ command2: str,
+ raise_exception_on_error: bool = True,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
+ env: dict = None,
+ ):
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command1 = shlex.split(command1)
read, write = os.pipe()
await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
os.close(write)
- process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
- stdout=asyncio.subprocess.PIPE,
- env=environ)
+ process_2 = await asyncio.create_subprocess_exec(
+ *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+ )
os.close(read)
stdout, stderr = await process_2.communicate()
"name": service_name,
"type": self._get_deep(data, ("spec", "type")),
"ports": self._get_deep(data, ("spec", "ports")),
- "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
}
if service["type"] == "LoadBalancer":
ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
version = "--version {}".format(str(parts[1]))
kdu_model = parts[0]
- full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
+ full_command = self._get_inspect_command(
+ inspect_command, kdu_model, repo_str, version
+ )
output, _rc = await self._local_async_exec(
command=full_command, encode_utf8=True
)
operation: str,
kdu_instance: str,
namespace: str = None,
- check_every: float = 10,
db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
- return_text=False
- )
- status = detailed_status.get("info").get("description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
- pass
- finally:
- if run_once:
- return
+ ) -> None:
+ """
+ Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
+
+ :param cluster_id (str): the cluster where the KDU instance is deployed
+ :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
+ :param kdu_instance (str): The KDU instance in relation to which the status is obtained
+ :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
+ :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
+ values for the keys:
+ - "collection": The Mongo DB collection to write to
+ - "filter": The query filter to use in the update process
+ - "path": The dot separated keys which targets the object to be updated
+ Defaults to None.
+ """
+
+ try:
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ yaml_format=False,
+ namespace=namespace,
+ )
+
+ status = detailed_status.get("info").get("description")
+ self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
+
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation,
+ )
+
+ if not result:
+ self.log.info("Error writing in database. Task exiting...")
+
+ except asyncio.CancelledError as e:
+ self.log.warning(
+ f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
+ )
+ except Exception as e:
+ self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
if params and len(params) > 0:
- self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
+ self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)
# check embeded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
# check URL
elif "://" in chart_name:
# extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
name = ""
for c in chart_name:
name = name + get_random_number()
return name.lower()
+
+ def _split_repo(self, kdu_model: str) -> str:
+ repo_name = None
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ repo_name = kdu_model[:idx]
+ return repo_name