class JujuModelWatcher:
@staticmethod
- async def wait_for_model(
- model: Model,
- timeout: float = 3600
- ):
+ async def wait_for_model(model: Model, timeout: float = 3600):
"""
Wait for all entities in the model to reach their final state.
################################### P U B L I C ####################################
####################################################################################
"""
+
@staticmethod
def generate_kdu_instance_name(**kwargs):
raise NotImplementedError("Method not implemented")
@abc.abstractmethod
async def scale(
- self, kdu_instance: str,
- scale: int,
- resource_name: str,
- total_timeout: float = 1800,
- **kwargs,
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
) -> bool:
"""
Scales an application in a KDU instance.
@abc.abstractmethod
async def get_scale_count(
- self,
- resource_name: str,
- kdu_instance: str,
- **kwargs,
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
) -> int:
"""
Get an application scale count.
"""
@abc.abstractmethod
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
"""
Returns a list of services defined for the specified kdu instance.
"""
@abc.abstractmethod
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str = None) -> object:
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str = None
+ ) -> object:
"""
Obtains the data of the specified service in the K8s cluster.
"""
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = "/usr/bin/kubectl",
- helm_command: str = "/usr/bin/helm3",
- log: object = None,
- on_update_db=None,
- vca_config: dict = None,
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm3",
+ log: object = None,
+ on_update_db=None,
+ vca_config: dict = None,
):
"""
Initializes helm connector for helm v3
"""
# parent class
- K8sHelmBaseConnector.__init__(self,
- db=db,
- log=log,
- fs=fs,
- kubectl_command=kubectl_command,
- helm_command=helm_command,
- on_update_db=on_update_db,
- vca_config=vca_config)
+ K8sHelmBaseConnector.__init__(
+ self,
+ db=db,
+ log=log,
+ fs=fs,
+ kubectl_command=kubectl_command,
+ helm_command=helm_command,
+ on_update_db=on_update_db,
+ vca_config=vca_config,
+ )
self.log.info("K8S Helm3 connector initialized")
async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- kdu_instance: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
- **kwargs,
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ **kwargs,
):
"""Install a helm chart
paths = {
"kube_dir": kube_dir,
"kube_config": config_filename,
- "cluster_dir": cluster_dir
+ "cluster_dir": cluster_dir,
}
# 3 - Prepare environment variables
"HELM_CACHE_HOME": helm_path_cache,
"HELM_CONFIG_HOME": helm_path_config,
"HELM_DATA_HOME": helm_path_data,
- "KUBECONFIG": config_filename
+ "KUBECONFIG": config_filename,
}
for file_name, file in paths.items():
return paths, env
- async def _get_namespaces(self,
- cluster_id: str):
+ async def _get_namespaces(self, cluster_id: str):
self.log.debug("get namespaces cluster_id {}".format(cluster_id))
return namespaces
- async def _create_namespace(self,
- cluster_id: str,
- namespace: str):
+ async def _create_namespace(self, cluster_id: str, namespace: str):
self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
command1 = "{} get manifest {} --namespace={}".format(
self._helm_command, kdu_instance, namespace
)
- command2 = "{} get --namespace={} -f -".format(
- self.kubectl_command, namespace
- )
+ command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
output, _rc = await self._local_async_exec_pipe(
command1, command2, env=env, raise_exception_on_error=True
)
self.log.debug("Default repo already present")
break
else:
- await self.repo_add(cluster_uuid,
- "stable",
- self._stable_repo_url)
+ await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
# Returns False as no software needs to be uninstalled
return False
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} list --all-namespaces --output yaml".format(
- self._helm_command
- )
+ command = "{} list --all-namespaces --output yaml".format(self._helm_command)
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
else:
return []
- def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
- version: str):
+ def _get_inspect_command(
+ self, inspect_command: str, kdu_model: str, repo_str: str, version: str
+ ):
inspect_command = "{} show {} {}{} {}".format(
self._helm_command, inspect_command, kdu_model, repo_str, version
)
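+ # e.g. "/usr/bin/helm3 show all openldap --repo https://charts.example.com/ --version 1.2.4" (illustrative)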
return_text: bool = False,
):
- self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
+ self.log.debug(
+ "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
+ )
if not namespace:
namespace = "kube-system"
command=command,
raise_exception_on_error=True,
show_error_log=show_error_log,
- env=env
+ env=env,
)
if return_text:
# unable to parse 'resources' as it is currently not included in helm3
return data
- def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
- params_str: str, version: str, atomic: bool, timeout: float) -> str:
+ def _get_install_command(
+ self,
+ kdu_model: str,
+ kdu_instance: str,
+ namespace: str,
+ params_str: str,
+ version: str,
+ atomic: bool,
+ timeout: float,
+ ) -> str:
timeout_str = ""
if timeout:
)
return command
- def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
- params_str: str, version: str, atomic: bool, timeout: float) -> str:
+ def _get_upgrade_command(
+ self,
+ kdu_model: str,
+ kdu_instance: str,
+ namespace: str,
+ params_str: str,
+ version: str,
+ atomic: bool,
+ timeout: float,
+ ) -> str:
timeout_str = ""
if timeout:
)
return command
- def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
+ def _get_rollback_command(
+ self, kdu_instance: str, namespace: str, revision: float
+ ) -> str:
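+ # e.g. returns "/usr/bin/helm3 rollback myinstance 2 --namespace=osm --wait" (illustrative)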
return "{} rollback {} {} --namespace={} --wait".format(
self._helm_command, kdu_instance, revision, namespace
)
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
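+ # e.g. returns "/usr/bin/helm3 uninstall myinstance --namespace=osm" (illustrative)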
return "{} uninstall {} --namespace={}".format(
- self._helm_command, kdu_instance, namespace)
+ self._helm_command, kdu_instance, namespace
+ )
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
repo_ids = []
################################### P U B L I C ####################################
####################################################################################
"""
+
service_account = "osm"
_STABLE_REPO_URL = "https://charts.helm.sh/stable"
Parses the cluster_uuid stored in the database, which can be either 'namespace:cluster_id'
or, for backward compatibility, just cluster_id
"""
- namespace, _, cluster_id = cluster_uuid.rpartition(':')
+ namespace, _, cluster_id = cluster_uuid.rpartition(":")
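+ # e.g. "kube-system:abc123" parses to namespace "kube-system" and cluster_id
+ # "abc123"; a bare "abc123" yields an empty namespace (backward compatibility)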
return namespace, cluster_id
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
+ self,
+ k8s_creds: str,
+ namespace: str = "kube-system",
+ reuse_cluster_uuid=None,
+ **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts
cluster_id = str(uuid4())
cluster_uuid = "{}:{}".format(namespace, cluster_id)
- self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+ self.log.debug(
+ "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
+ )
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
return cluster_uuid, n2vc_installed_sw
async def repo_add(
- self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
- cluster_id, repo_type, name, url))
+ self.log.debug(
+ "Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_id, repo_type, name, url
+ )
+ )
# sync local dir
self.fs.sync(from_path=cluster_id)
)
# helm repo update
- command = "{} repo update".format(
- self._helm_command
- )
+ command = "{} repo update".format(self._helm_command)
self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
# helm repo add name url
- command = "{} repo add {} {}".format(
- self._helm_command, name, url
- )
+ command = "{} repo add {} {}".format(self._helm_command, name, url)
self.log.debug("adding repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
# sync fs
self.fs.reverse_sync(from_path=cluster_id)
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo list --output yaml".format(
- self._helm_command
- )
+ command = "{} repo list --output yaml".format(self._helm_command)
# Set exception to false because if there are no repos we just want an empty list
output, _rc = await self._local_async_exec(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo remove {}".format(
- self._helm_command, name
+ command = "{} repo remove {}".format(self._helm_command, name)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
- await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
# sync fs
self.fs.reverse_sync(from_path=cluster_id)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
"""Reset a cluster
:return: Returns True if successful or raises an exception.
"""
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
- .format(cluster_id, uninstall_sw))
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
+ cluster_id, uninstall_sw
+ )
+ )
# sync local dir
self.fs.sync(from_path=cluster_id)
# that in some cases of previously installed helm releases it
# raised an error
self.log.warn(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
+ "Error uninstalling release {}: {}".format(
+ kdu_instance, e
+ )
)
else:
msg = (
"Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
).format(cluster_id)
self.log.warn(msg)
- uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
+ uninstall_sw = (
+ False # Allow removing the k8s cluster without removing Tiller
+ )
if uninstall_sw:
await self._uninstall_sw(cluster_id, namespace)
return True
async def _install_impl(
- self,
- cluster_id: str,
- kdu_model: str,
- paths: dict,
- env: dict,
- kdu_instance: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
+ self,
+ cluster_id: str,
+ kdu_model: str,
+ paths: dict,
+ env: dict,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
):
# params to str
params_str, file_to_delete = self._params_to_file_option(
version = str(parts[1])
kdu_model = parts[0]
- command = self._get_install_command(kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout)
+ command = self._get_install_command(
+ kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ )
self.log.debug("installing: {}".format(command))
version = str(parts[1])
kdu_model = parts[0]
- command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
- params_str, version, atomic, timeout)
+ command = self._get_upgrade_command(
+ kdu_model,
+ kdu_instance,
+ instance_info["namespace"],
+ params_str,
+ version,
+ atomic,
+ timeout,
+ )
self.log.debug("upgrading: {}".format(command))
return 0
async def scale(
- self,
- kdu_instance: str,
- scale: int,
- resource_name: str,
- total_timeout: float = 1800,
- **kwargs,
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
):
raise NotImplementedError("Method not implemented")
async def get_scale_count(
- self,
- resource_name: str,
- kdu_instance: str,
- **kwargs,
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
):
raise NotImplementedError("Method not implemented")
cluster_name=cluster_id, create_if_not_exist=True
)
- command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
- revision)
+ command = self._get_rollback_command(
+ kdu_instance, instance_info["namespace"], revision
+ )
self.log.debug("rolling_back: {}".format(command))
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_id
- )
+ "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
)
# sync local dir
"different from rollback, upgrade and status"
)
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
"""
Returns a list of services defined for the specified kdu instance.
return service_list
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str) -> object:
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
- service_name, namespace, cluster_uuid)
+ service_name, namespace, cluster_uuid
+ )
)
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
break
else:
# instance does not exist
- raise K8sException("Instance name: {} not found in cluster: {}".format(
- kdu_instance, cluster_id))
+ raise K8sException(
+ "Instance name: {} not found in cluster: {}".format(
+ kdu_instance, cluster_id
+ )
+ )
status = await self._status_kdu(
cluster_id=cluster_id,
repo_id = db_repo.get("_id")
if curr_repo_url != db_repo["url"]:
if curr_repo_url:
- self.log.debug("repo {} url changed, delete and and again".format(
- db_repo["url"]))
+ self.log.debug(
+ "repo {} url changed, delete and and again".format(
+ db_repo["url"]
+ )
+ )
await self.repo_remove(cluster_uuid, db_repo["name"])
deleted_repo_list.append(repo_id)
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+ await self.repo_add(
+ cluster_uuid, db_repo["name"], db_repo["url"]
+ )
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
"""
@abc.abstractmethod
- async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
- show_error_log: bool = False, return_text: bool = False):
+ async def _status_kdu(
+ self,
+ cluster_id: str,
+ kdu_instance: str,
+ namespace: str = None,
+ show_error_log: bool = False,
+ return_text: bool = False,
+ ):
"""
Implements the helm-version-dependent method to obtain the status of a helm instance
"""
@abc.abstractmethod
- def _get_install_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_install_command(
+ self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ ) -> str:
"""
Obtain the command to be executed to install the indicated instance
"""
@abc.abstractmethod
- def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
- params_str, version, atomic, timeout) -> str:
+ def _get_upgrade_command(
+ self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ ) -> str:
"""
Obtain the command to be executed to upgrade the indicated instance
"""
"""
@abc.abstractmethod
- def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
- version: str):
+ def _get_inspect_command(
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
+ ):
"""
Obtain the command to be executed to retrieve information about the kdu
"""
raise_exception_on_error: bool = False,
show_error_log: bool = True,
encode_utf8: bool = False,
- env: dict = None
+ env: dict = None,
) -> (str, int):
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command = shlex.split(command)
try:
process = await asyncio.create_subprocess_exec(
- *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- env=environ
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ env=environ,
)
# wait for the command to terminate
else:
return "", -1
- async def _local_async_exec_pipe(self,
- command1: str,
- command2: str,
- raise_exception_on_error: bool = True,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- env: dict = None):
+ async def _local_async_exec_pipe(
+ self,
+ command1: str,
+ command2: str,
+ raise_exception_on_error: bool = True,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
+ env: dict = None,
+ ):
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
- self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+ self.log.debug(
+ "Executing async local command: {}, env: {}".format(command, env)
+ )
# split command
command1 = shlex.split(command1)
read, write = os.pipe()
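+ # the pipe wires command1's stdout to command2's stdin, emulating "cmd1 | cmd2";
+ # the write end is closed after spawning command1 so command2 sees EOF when it finishes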
await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
os.close(write)
- process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
- stdout=asyncio.subprocess.PIPE,
- env=environ)
+ process_2 = await asyncio.create_subprocess_exec(
+ *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+ )
os.close(read)
stdout, stderr = await process_2.communicate()
"name": service_name,
"type": self._get_deep(data, ("spec", "type")),
"ports": self._get_deep(data, ("spec", "ports")),
- "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
}
if service["type"] == "LoadBalancer":
ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
version = "--version {}".format(str(parts[1]))
kdu_model = parts[0]
- full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
+ full_command = self._get_inspect_command(
+ inspect_command, kdu_model, repo_str, version
+ )
output, _rc = await self._local_async_exec(
command=full_command, encode_utf8=True
)
try:
await asyncio.sleep(check_every)
detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
- return_text=False
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=namespace,
+ return_text=False,
)
status = detailed_status.get("info").get("description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
+ self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
self.log.debug("Task cancelled")
return
except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
+ self.log.debug(
+ "_store_status exception: {}".format(str(e)), exc_info=True
+ )
pass
finally:
if run_once:
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
if params and len(params) > 0:
- self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
+ self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)
# check embedded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
# check URL
elif "://" in chart_name:
# extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
name = ""
for c in chart_name:
# initialize helm client-only
self.log.debug("Initializing helm client-only...")
command = "{} init --client-only --stable-repo-url {} ".format(
- self._helm_command, self._stable_repo_url)
+ self._helm_command, self._stable_repo_url
+ )
try:
asyncio.ensure_future(
self._local_async_exec(command=command, raise_exception_on_error=False)
self.log.info("K8S Helm2 connector initialized")
async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- kdu_instance: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
- **kwargs,
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ **kwargs,
):
"""
Deploys a new KDU instance. It would implicitly rely on the `install` call
return services
- async def _cluster_init(self, cluster_id: str, namespace: str,
- paths: dict, env: dict):
+ async def _cluster_init(
+ self, cluster_id: str, namespace: str, paths: dict, env: dict
+ ):
"""
Implements the helm-version-dependent cluster initialization:
for helm2 it initializes the Tiller environment if needed
namespace,
paths["helm_dir"],
self.service_account,
- self._stable_repo_url
+ self._stable_repo_url,
)
_, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
for repo in repo_list:
if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
self.log.debug("Add new stable repo url: {}")
- await self.repo_remove(cluster_uuid,
- "stable")
- await self.repo_add(cluster_uuid,
- "stable",
- self._stable_repo_url)
+ await self.repo_remove(cluster_uuid, "stable")
+ await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
break
return n2vc_installed_sw
else:
return []
- def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
- version: str):
+ def _get_inspect_command(
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
+ ):
inspect_command = "{} inspect {} {}{} {}".format(
self._helm_command, show_command, kdu_model, repo_str, version
)
if version:
version_str = "--version {}".format(version)
- command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
- .format(helm=self._helm_command,
- atomic=atomic_str,
- params=params_str,
- timeout=timeout_str,
- name=kdu_instance,
- model=kdu_model,
- ver=version_str
- )
+ command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
+ helm=self._helm_command,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ model=kdu_model,
+ ver=version_str,
+ )
return command
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
cluster_uuid,
kdu_instance,
filter=db_dict["filter"],
- vca_id=kwargs.get("vca_id")
+ vca_id=kwargs.get("vca_id"),
)
return True
async def scale(
- self,
- kdu_instance: str,
- scale: int,
- resource_name: str,
- total_timeout: float = 1800,
- **kwargs,
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
) -> bool:
"""Scale an application in a model
model_name=kdu_instance,
application_name=resource_name,
scale=scale,
- total_timeout=total_timeout
+ total_timeout=total_timeout,
)
except Exception as e:
error_msg = "Error scaling application {} in kdu instance {}: {}".format(
- resource_name, kdu_instance, e)
+ resource_name, kdu_instance, e
+ )
self.log.error(error_msg)
raise K8sException(message=error_msg)
return True
async def get_scale_count(
- self,
- resource_name: str,
- kdu_instance: str,
- **kwargs,
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
) -> int:
"""Get an application scale count
return len(status.applications[resource_name].units)
except Exception as e:
error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
- resource_name, kdu_instance, e)
+ resource_name, kdu_instance, e
+ )
self.log.error(error_msg)
raise K8sException(message=error_msg)
"status is not completed: {} output: {}".format(status, output)
)
if self.on_update_db:
- await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
+ await self.on_update_db(
+ cluster_uuid, kdu_instance, filter=db_dict["filter"]
+ )
return output
try:
for model_name in vcastatus:
# Adding executed actions
- vcastatus[model_name]["executedActions"] = \
- await libjuju.get_executed_actions(kdu_instance)
+ vcastatus[model_name][
+ "executedActions"
+ ] = await libjuju.get_executed_actions(kdu_instance)
for application in vcastatus[model_name]["applications"]:
# Adding application actions
- vcastatus[model_name]["applications"][application]["actions"] = \
- await libjuju.get_actions(application, kdu_instance)
+ vcastatus[model_name]["applications"][application][
+ "actions"
+ ] = await libjuju.get_actions(application, kdu_instance)
# Adding application configs
- vcastatus[model_name]["applications"][application]["configs"] = \
- await libjuju.get_application_configs(kdu_instance, application)
+ vcastatus[model_name]["applications"][application][
+ "configs"
+ ] = await libjuju.get_application_configs(kdu_instance, application)
except Exception as e:
self.log.debug("Error in updating vca status: {}".format(str(e)))
"""
return await controller.get_model(model_name)
- async def model_exists(self, model_name: str, controller: Controller = None) -> bool:
+ async def model_exists(
+ self, model_name: str, controller: Controller = None
+ ) -> bool:
"""
Check if model exists
return application
async def scale_application(
- self,
- model_name: str,
- application_name: str,
- scale: int = 1,
- total_timeout: float = None,
+ self,
+ model_name: str,
+ application_name: str,
+ scale: int = 1,
+ total_timeout: float = None,
):
"""
Scale application (K8s)
await application.scale(scale=scale)
# Wait until application is scaled in model
self.log.debug(
- "Waiting for application {} to be scaled in model {}...".format
- (
+ "Waiting for application {} to be scaled in model {}...".format(
application_name, model_name
)
)
# wait until the application unit count and the scale count are equal,
# because there is a delay before scaling triggers in the Juju model
if application_scale == scale:
- await JujuModelWatcher.wait_for_model(model=model, timeout=total_timeout)
+ await JujuModelWatcher.wait_for_model(
+ model=model, timeout=total_timeout
+ )
self.log.debug(
"Application {} is scaled in model {}".format(
application_name, model_name
if not include_path:
i = filename.rfind("/")
if i > 0:
- filename = filename[i + 1:]
+ filename = filename[i + 1 :]
# datetime
dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
# TODO
@abc.abstractmethod
async def remove_relation(self):
- """
- """
+ """ """
# TODO
@abc.abstractmethod
async def deregister_execution_environments(self):
- """
- """
+ """ """
@abc.abstractmethod
async def delete_namespace(
the_table, the_filter, the_path, update_dict, vca_id=vca_id
)
else:
- self.on_update_db(the_table, the_filter, the_path, update_dict, vca_id=vca_id)
+ self.on_update_db(
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
+ )
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
libjuju = await self._get_libjuju(vca_id)
for model_name in vcastatus:
# Adding executed actions
- vcastatus[model_name]["executedActions"] = \
- await libjuju.get_executed_actions(model_name)
+ vcastatus[model_name][
+ "executedActions"
+ ] = await libjuju.get_executed_actions(model_name)
for application in vcastatus[model_name]["applications"]:
# Adding application actions
- vcastatus[model_name]["applications"][application]["actions"] = \
- await libjuju.get_actions(application, model_name)
+ vcastatus[model_name]["applications"][application][
+ "actions"
+ ] = await libjuju.get_actions(application, model_name)
# Adding application configs
- vcastatus[model_name]["applications"][application]["configs"] = \
- await libjuju.get_application_configs(model_name, application)
+ vcastatus[model_name]["applications"][application][
+ "configs"
+ ] = await libjuju.get_application_configs(model_name, application)
except Exception as e:
self.log.debug("Error in updating vca status: {}".format(str(e)))
params.series = hw["series"]
params.instance_id = "manual:{}".format(self.host)
params.nonce = "manual:{}:{}".format(
- self.host, str(uuid.uuid4()),
+ self.host,
+ str(uuid.uuid4()),
) # a nop for Juju w/manual machines
params.hardware_characteristics = {
"arch": hw["arch"],
return params
- async def install_agent(self, connection, nonce, machine_id, proxy=None, series=None):
+ async def install_agent(
+ self, connection, nonce, machine_id, proxy=None, series=None
+ ):
"""
:param object connection: Connection to Juju API
:param str nonce: The nonce machine specification
- '[::1]:17070'
"""
if proxy:
- m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
+ m = re.search(
+ r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script
+ )
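+ # e.g. captures "10.0.0.1" from a script containing "apiaddresses:\n- 10.0.0.1:17070" (illustrative)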
apiaddress = m.group(1)
"""Add IP Table rule
stdout, stderr = await self._run_configure_script(script)
break
except Exception as e:
- self.log.debug("Waiting for DNAT rules to be applied and saved, "
- "sleeping {} seconds".format(delay))
+ self.log.debug(
+ "Waiting for DNAT rules to be applied and saved, "
+ "sleeping {} seconds".format(delay)
+ )
if attempts > retry:
raise e
else:
"HELM_CACHE_HOME": "{}/.cache/helm".format(cluster_dir),
"HELM_CONFIG_HOME": "{}/.config/helm".format(cluster_dir),
"HELM_DATA_HOME": "{}/.local/share/helm".format(cluster_dir),
- "KUBECONFIG": "{}/.kube/config".format(cluster_dir)
+ "KUBECONFIG": "{}/.kube/config".format(cluster_dir),
}
- self.helm_conn = K8sHelm3Connector(self.fs, self.db,
- log=self.logger)
+ self.helm_conn = K8sHelm3Connector(self.fs, self.db, log=self.logger)
self.logger.debug("Set up executed")
@asynctest.fail_on(active_handles=True)
self.helm_conn.repo_add = asynctest.CoroutineMock()
k8scluster_uuid, installed = await self.helm_conn.init_env(
- k8s_creds, namespace=self.namespace, reuse_cluster_uuid=self.cluster_id)
+ k8s_creds, namespace=self.namespace, reuse_cluster_uuid=self.cluster_id
+ )
- self.assertEqual(k8scluster_uuid, "{}:{}".format(self.namespace, self.cluster_id),
- "Check cluster_uuid format: <namespace>.<cluster_id>")
+ self.assertEqual(
+ k8scluster_uuid,
+ "{}:{}".format(self.namespace, self.cluster_id),
+ "Check cluster_uuid format: <namespace>.<cluster_id>",
+ )
self.helm_conn._get_namespaces.assert_called_once_with(self.cluster_id)
- self.helm_conn._create_namespace.assert_called_once_with(self.cluster_id, self.namespace)
+ self.helm_conn._create_namespace.assert_called_once_with(
+ self.cluster_id, self.namespace
+ )
self.helm_conn.repo_list.assert_called_once_with(k8scluster_uuid)
self.helm_conn.repo_add.assert_called_once_with(
- k8scluster_uuid, "stable", "https://charts.helm.sh/stable")
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ k8scluster_uuid, "stable", "https://charts.helm.sh/stable"
+ )
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
self.logger.debug(f"cluster_uuid: {k8scluster_uuid}")
@asynctest.fail_on(active_handles=True)
await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.assertEqual(self.helm_conn._local_async_exec.call_count, 2,
- "local_async_exec expected 2 calls, called {}".format(
- self.helm_conn._local_async_exec.call_count))
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.assertEqual(
+ self.helm_conn._local_async_exec.call_count,
+ 2,
+ "local_async_exec expected 2 calls, called {}".format(
+ self.helm_conn._local_async_exec.call_count
+ ),
+ )
repo_update_command = "/usr/bin/helm3 repo update"
repo_add_command = "/usr/bin/helm3 repo add {} {}".format(repo_name, repo_url)
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
- self.assertEqual(call0_kargs.get("command"), repo_update_command,
- "Invalid repo update command: {}".format(call0_kargs.get("command")))
- self.assertEqual(call0_kargs.get("env"), self.env,
- "Invalid env for update command: {}".format(call0_kargs.get("env")))
+ self.assertEqual(
+ call0_kargs.get("command"),
+ repo_update_command,
+ "Invalid repo update command: {}".format(call0_kargs.get("command")),
+ )
+ self.assertEqual(
+ call0_kargs.get("env"),
+ self.env,
+ "Invalid env for update command: {}".format(call0_kargs.get("env")),
+ )
call1_kargs = calls[1][1]
- self.assertEqual(call1_kargs.get("command"), repo_add_command,
- "Invalid repo add command: {}".format(call1_kargs.get("command")))
- self.assertEqual(call1_kargs.get("env"), self.env,
- "Invalid env for add command: {}".format(call1_kargs.get("env")))
+ self.assertEqual(
+ call1_kargs.get("command"),
+ repo_add_command,
+ "Invalid repo add command: {}".format(call1_kargs.get("command")),
+ )
+ self.assertEqual(
+ call1_kargs.get("env"),
+ self.env,
+ "Invalid env for add command: {}".format(call1_kargs.get("env")),
+ )
@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
await self.helm_conn.repo_list(self.cluster_uuid)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm3 repo list --output yaml"
- self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm3 repo remove {}".format(repo_name)
- self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_install(self):
self.kdu_instance,
atomic=True,
namespace=self.namespace,
- db_dict=db_dict
+ db_dict=db_dict,
)
self.helm_conn._get_namespaces.assert_called_once()
- self.helm_conn._create_namespace.assert_called_once_with(self.cluster_id, self.namespace)
+ self.helm_conn._create_namespace.assert_called_once_with(
+ self.cluster_id, self.namespace
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0)
- command = "/usr/bin/helm3 install stable-openldap-0005399828 --atomic --output yaml " \
- "--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0,
+ )
+ command = (
+ "/usr/bin/helm3 install stable-openldap-0005399828 --atomic --output yaml "
+ "--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_upgrade(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 1,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
- await self.helm_conn.upgrade(self.cluster_uuid,
- kdu_instance,
- kdu_model,
- atomic=True,
- db_dict=db_dict)
+ await self.helm_conn.upgrade(
+ self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="upgrade",
- run_once=True,
- check_every=0)
- command = "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap " \
- "--namespace testk8s --atomic --output yaml --timeout 300s " \
- "--version 1.2.3"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=True,
+ check_every=0,
+ )
+ command = (
+ "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
+ "--namespace testk8s --atomic --output yaml --timeout 300s "
+ "--version 1.2.3"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_rollback(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 2,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
- await self.helm_conn.rollback(self.cluster_uuid,
- kdu_instance=kdu_instance,
- revision=1,
- db_dict=db_dict)
+ await self.helm_conn.rollback(
+ self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="rollback",
- run_once=True,
- check_every=0)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=True,
+ check_every=0,
+ )
command = "/usr/bin/helm3 rollback stable-openldap-0005399828 1 --namespace=testk8s --wait"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_uninstall(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 3,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm3 uninstall {} --namespace={}".format(
- kdu_instance, self.namespace)
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ kdu_instance, self.namespace
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_get_services(self):
kdu_instance = "test_services_1"
- service = {
- "name": "testservice",
- "type": "LoadBalancer"
- }
- self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(return_value=("", 0))
+ service = {"name": "testservice", "type": "LoadBalancer"}
+ self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
+ return_value=("", 0)
+ )
self.helm_conn._parse_services = Mock(return_value=["testservice"])
self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
- services = await self.helm_conn.get_services(self.cluster_uuid, kdu_instance,
- self.namespace)
+ services = await self.helm_conn.get_services(
+ self.cluster_uuid, kdu_instance, self.namespace
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
self.helm_conn._parse_services.assert_called_once()
- command1 = "/usr/bin/helm3 get manifest {} --namespace=testk8s".format(kdu_instance)
+ command1 = "/usr/bin/helm3 get manifest {} --namespace=testk8s".format(
+ kdu_instance
+ )
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
- self.helm_conn._local_async_exec_pipe.assert_called_once_with(command1, command2,
- env=self.env,
- raise_exception_on_error=True)
- self.assertEqual(services, [service], "Invalid service returned from get_service")
+ self.helm_conn._local_async_exec_pipe.assert_called_once_with(
+ command1, command2, env=self.env, raise_exception_on_error=True
+ )
+ self.assertEqual(
+ services, [service], "Invalid service returned from get_service"
+ )
@asynctest.fail_on(active_handles=True)
async def test_get_service(self):
service_name = "service1"
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- await self.helm_conn.get_service(self.cluster_uuid, service_name, self.namespace)
+ await self.helm_conn.get_service(
+ self.cluster_uuid, service_name, self.namespace
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- command = "/usr/bin/kubectl --kubeconfig=./tmp/helm3_cluster_id/.kube/config " \
- "--namespace=testk8s get service service1 -o=yaml"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ command = (
+ "/usr/bin/kubectl --kubeconfig=./tmp/helm3_cluster_id/.kube/config "
+ "--namespace=testk8s get service service1 -o=yaml"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_inspect_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.inspect_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm3 show all openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm3 show all openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_help_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.help_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm3 show readme openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm3 show readme openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_values_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.values_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm3 show values openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm3 show values openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_instances_list(self):
await self.helm_conn.instances_list(self.cluster_uuid)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm3 list --all-namespaces --output yaml"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_status_kdu(self):
kdu_instance = "stable-openldap-0005399828"
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- await self.helm_conn._status_kdu(self.cluster_id, kdu_instance,
- self.namespace, return_text=True)
+ await self.helm_conn._status_kdu(
+ self.cluster_id, kdu_instance, self.namespace, return_text=True
+ )
command = "/usr/bin/helm3 status {} --namespace={} --output yaml".format(
kdu_instance, self.namespace
)
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True,
- show_error_log=False)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command,
+ env=self.env,
+ raise_exception_on_error=True,
+ show_error_log=False,
+ )
@asynctest.fail_on(active_handles=True)
async def test_store_status(self):
"description": "Install complete",
"status": {
"code": "1",
- "notes": "The openldap helm chart has been installed"
- }
+ "notes": "The openldap helm chart has been installed",
+ },
}
}
self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status)
- self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(return_value=status)
-
- await self.helm_conn._store_status(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0)
- self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- return_text=False)
- self.helm_conn.write_app_status_to_db.assert_called_once_with(db_dict=db_dict,
- status="Install complete",
- detailed_status=str(status),
- operation="install")
+ self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(
+ return_value=status
+ )
+
+ await self.helm_conn._store_status(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0,
+ )
+ self.helm_conn._status_kdu.assert_called_once_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ return_text=False,
+ )
+ self.helm_conn.write_app_status_to_db.assert_called_once_with(
+ db_dict=db_dict,
+ status="Install complete",
+ detailed_status=str(status),
+ operation="install",
+ )
@asynctest.fail_on(active_handles=True)
async def test_reset_uninstall_false(self):
await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
- ignore_non_exist=True)
+ self.helm_conn.fs.file_delete.assert_called_once_with(
+ self.cluster_id, ignore_non_exist=True
+ )
self.helm_conn._uninstall_sw.assert_not_called()
@asynctest.fail_on(active_handles=True)
async def test_reset_uninstall(self):
- kdu_instance = 'stable-openldap-0021099429'
+ kdu_instance = "stable-openldap-0021099429"
instances = [
{
- 'app_version': '2.4.48',
- 'chart': 'openldap-1.2.3',
- 'name': kdu_instance,
- 'namespace': self.namespace,
- 'revision': '1',
- 'status': 'deployed',
- 'updated': '2020-10-30 11:11:20.376744191 +0000 UTC'
+ "app_version": "2.4.48",
+ "chart": "openldap-1.2.3",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": "1",
+ "status": "deployed",
+ "updated": "2020-10-30 11:11:20.376744191 +0000 UTC",
}
]
self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
- ignore_non_exist=True)
- self.helm_conn.instances_list.assert_called_once_with(cluster_uuid=self.cluster_uuid)
- self.helm_conn.uninstall.assert_called_once_with(cluster_uuid=self.cluster_uuid,
- kdu_instance=kdu_instance)
- self.helm_conn._uninstall_sw.assert_called_once_with(self.cluster_id, self.namespace)
+ self.helm_conn.fs.file_delete.assert_called_once_with(
+ self.cluster_id, ignore_non_exist=True
+ )
+ self.helm_conn.instances_list.assert_called_once_with(
+ cluster_uuid=self.cluster_uuid
+ )
+ self.helm_conn.uninstall.assert_called_once_with(
+ cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance
+ )
+ self.helm_conn._uninstall_sw.assert_called_once_with(
+ self.cluster_id, self.namespace
+ )
@asynctest.fail_on(active_handles=True)
async def test_sync_repos_add(self):
repo_list = [
{
"name": "stable",
- "url": "https://kubernetes-charts.storage.googleapis.com/"
+ "url": "https://kubernetes-charts.storage.googleapis.com/",
}
]
self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
if args[0] == "k8sclusters":
return {
"_admin": {
- "helm_chart_repos": [
- "4b5550a9-990d-4d95-8a48-1f4614d6ac9c"
- ]
+ "helm_chart_repos": ["4b5550a9-990d-4d95-8a48-1f4614d6ac9c"]
}
}
elif args[0] == "k8srepos":
"_id": "4b5550a9-990d-4d95-8a48-1f4614d6ac9c",
"type": "helm-chart",
"name": "bitnami",
- "url": "https://charts.bitnami.com/bitnami"
+ "url": "https://charts.bitnami.com/bitnami",
}
+
self.helm_conn.db.get_one = asynctest.Mock()
self.helm_conn.db.get_one.side_effect = get_one_result
self.helm_conn.repo_remove = asynctest.CoroutineMock()
deleted_repo_list, added_repo_dict = await self.helm_conn.synchronize_repos(
- self.cluster_uuid)
+ self.cluster_uuid
+ )
self.helm_conn.repo_remove.assert_not_called()
- self.helm_conn.repo_add.assert_called_once_with(self.cluster_uuid, "bitnami",
- "https://charts.bitnami.com/bitnami")
+ self.helm_conn.repo_add.assert_called_once_with(
+ self.cluster_uuid, "bitnami", "https://charts.bitnami.com/bitnami"
+ )
self.assertEqual(deleted_repo_list, [], "Deleted repo list should be empty")
- self.assertEqual(added_repo_dict,
- {"4b5550a9-990d-4d95-8a48-1f4614d6ac9c": "bitnami"},
- "Repos added should include only one bitnami")
+ self.assertEqual(
+ added_repo_dict,
+ {"4b5550a9-990d-4d95-8a48-1f4614d6ac9c": "bitnami"},
+ "Repos added should include only one bitnami",
+ )
@asynctest.fail_on(active_handles=True)
async def test_sync_repos_delete(self):
repo_list = [
{
"name": "stable",
- "url": "https://kubernetes-charts.storage.googleapis.com/"
+ "url": "https://kubernetes-charts.storage.googleapis.com/",
},
- {
- "name": "bitnami",
- "url": "https://charts.bitnami.com/bitnami"
- }
+ {"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"},
]
self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
def get_one_result(*args, **kwargs):
if args[0] == "k8sclusters":
- return {
- "_admin": {
- "helm_chart_repos": []
- }
- }
+ return {"_admin": {"helm_chart_repos": []}}
self.helm_conn.db.get_one = asynctest.Mock()
self.helm_conn.db.get_one.side_effect = get_one_result
self.helm_conn.repo_remove = asynctest.CoroutineMock()
deleted_repo_list, added_repo_dict = await self.helm_conn.synchronize_repos(
- self.cluster_uuid)
+ self.cluster_uuid
+ )
self.helm_conn.repo_add.assert_not_called()
self.helm_conn.repo_remove.assert_called_once_with(self.cluster_uuid, "bitnami")
- self.assertEqual(deleted_repo_list, ["bitnami"], "Deleted repo list should be bitnami")
+ self.assertEqual(
+ deleted_repo_list, ["bitnami"], "Deleted repo list should be bitnami"
+ )
self.assertEqual(added_repo_dict, {}, "No repos should be added")
self.cluster_uuid = "{}:{}".format(self.namespace, self.cluster_id)
# pass fake kubectl and helm commands to make sure it does not call actual commands
K8sHelmConnector._check_file_exists = asynctest.Mock(return_value=True)
- K8sHelmConnector._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ K8sHelmConnector._local_async_exec = asynctest.CoroutineMock(
+ return_value=("", 0)
+ )
cluster_dir = self.fs.path + self.cluster_id
self.kube_config = self.fs.path + self.cluster_id + "/.kube/config"
self.helm_home = self.fs.path + self.cluster_id + "/.helm"
self.env = {
"HELM_HOME": "{}/.helm".format(cluster_dir),
- "KUBECONFIG": "{}/.kube/config".format(cluster_dir)
+ "KUBECONFIG": "{}/.kube/config".format(cluster_dir),
}
- self.helm_conn = K8sHelmConnector(self.fs, self.db,
- log=self.logger)
+ self.helm_conn = K8sHelmConnector(self.fs, self.db, log=self.logger)
self.logger.debug("Set up executed")
@asynctest.fail_on(active_handles=True)
await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.assertEqual(self.helm_conn._local_async_exec.call_count, 2,
- "local_async_exec expected 2 calls, called {}".format(
- self.helm_conn._local_async_exec.call_count))
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.assertEqual(
+ self.helm_conn._local_async_exec.call_count,
+ 2,
+ "local_async_exec expected 2 calls, called {}".format(
+ self.helm_conn._local_async_exec.call_count
+ ),
+ )
repo_update_command = "/usr/bin/helm repo update"
repo_add_command = "/usr/bin/helm repo add {} {}".format(repo_name, repo_url)
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
- self.assertEqual(call0_kargs.get("command"), repo_update_command,
- "Invalid repo update command: {}".format(call0_kargs.get("command")))
- self.assertEqual(call0_kargs.get("env"), self.env,
- "Invalid env for update command: {}".format(call0_kargs.get("env")))
+ self.assertEqual(
+ call0_kargs.get("command"),
+ repo_update_command,
+ "Invalid repo update command: {}".format(call0_kargs.get("command")),
+ )
+ self.assertEqual(
+ call0_kargs.get("env"),
+ self.env,
+ "Invalid env for update command: {}".format(call0_kargs.get("env")),
+ )
call1_kargs = calls[1][1]
- self.assertEqual(call1_kargs.get("command"), repo_add_command,
- "Invalid repo add command: {}".format(call1_kargs.get("command")))
- self.assertEqual(call1_kargs.get("env"), self.env,
- "Invalid env for add command: {}".format(call1_kargs.get("env")))
+ self.assertEqual(
+ call1_kargs.get("command"),
+ repo_add_command,
+ "Invalid repo add command: {}".format(call1_kargs.get("command")),
+ )
+ self.assertEqual(
+ call1_kargs.get("env"),
+ self.env,
+ "Invalid env for add command: {}".format(call1_kargs.get("env")),
+ )
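# The call sequence asserted above is the whole repo_add flow: sync the
# cluster filesystem in, run "helm repo update" followed by "helm repo add",
# then sync back out. A sketch under those assumptions; _get_paths and
# _helm_command are hypothetical names for the path/env resolution the real
# connector performs per cluster directory:
async def repo_add(self, cluster_uuid, name, url):
    _, cluster_id = cluster_uuid.split(":")  # "namespace:cluster_id", as in setUp
    self.fs.sync(from_path=cluster_id)
    _, env = self._get_paths(cluster_id)  # hypothetical helper returning (paths, env)
    await self._local_async_exec(
        command="{} repo update".format(self._helm_command), env=env
    )
    await self._local_async_exec(
        command="{} repo add {} {}".format(self._helm_command, name, url), env=env
    )
    self.fs.reverse_sync(from_path=cluster_id)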
@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
await self.helm_conn.repo_list(self.cluster_uuid)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm repo list --output yaml"
- self.helm_conn._local_async_exec.assert_called_with(command=command, env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm repo remove {}".format(repo_name)
- self.helm_conn._local_async_exec.assert_called_once_with(command=command, env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_install(self):
kdu_instance,
atomic=True,
namespace=self.namespace,
- db_dict=db_dict
+ db_dict=db_dict,
)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0)
- command = "/usr/bin/helm install --atomic --output yaml --timeout 300 " \
- "--name=stable-openldap-0005399828 --namespace testk8s stable/openldap " \
- "--version 1.2.2"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0,
+ )
+ command = (
+ "/usr/bin/helm install --atomic --output yaml --timeout 300 "
+ "--name=stable-openldap-0005399828 --namespace testk8s stable/openldap "
+ "--version 1.2.2"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_upgrade(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 1,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
- await self.helm_conn.upgrade(self.cluster_uuid,
- kdu_instance,
- kdu_model,
- atomic=True,
- db_dict=db_dict)
+ await self.helm_conn.upgrade(
+ self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="upgrade",
- run_once=True,
- check_every=0)
- command = "/usr/bin/helm upgrade --atomic --output yaml --timeout 300 " \
- "stable-openldap-0005399828 stable/openldap --version 1.2.3"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="upgrade",
+ run_once=True,
+ check_every=0,
+ )
+ command = (
+ "/usr/bin/helm upgrade --atomic --output yaml --timeout 300 "
+ "stable-openldap-0005399828 stable/openldap --version 1.2.3"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_rollback(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 2,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
- await self.helm_conn.rollback(self.cluster_uuid,
- kdu_instance=kdu_instance,
- revision=1,
- db_dict=db_dict)
+ await self.helm_conn.rollback(
+ self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="rollback",
- run_once=True,
- check_every=0)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="rollback",
+ run_once=True,
+ check_every=0,
+ )
command = "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=False)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
@asynctest.fail_on(active_handles=True)
async def test_uninstall(self):
"name": kdu_instance,
"namespace": self.namespace,
"revision": 3,
- "status": "DEPLOYED"
+ "status": "DEPLOYED",
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(return_value=instance_info)
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm delete --purge {}".format(kdu_instance)
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_get_services(self):
kdu_instance = "test_services_1"
- service = {
- "name": "testservice",
- "type": "LoadBalancer"
- }
- self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(return_value=("", 0))
+ service = {"name": "testservice", "type": "LoadBalancer"}
+ self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
+ return_value=("", 0)
+ )
self.helm_conn._parse_services = Mock(return_value=["testservice"])
self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
- services = await self.helm_conn.get_services(self.cluster_uuid, kdu_instance,
- self.namespace)
+ services = await self.helm_conn.get_services(
+ self.cluster_uuid, kdu_instance, self.namespace
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
self.helm_conn._parse_services.assert_called_once()
command1 = "/usr/bin/helm get manifest {} ".format(kdu_instance)
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
- self.helm_conn._local_async_exec_pipe.assert_called_once_with(command1, command2,
- env=self.env,
- raise_exception_on_error=True)
- self.assertEqual(services, [service], "Invalid service returned from get_service")
+ self.helm_conn._local_async_exec_pipe.assert_called_once_with(
+ command1, command2, env=self.env, raise_exception_on_error=True
+ )
+ self.assertEqual(
+ services, [service], "Invalid service returned from get_service"
+ )
@asynctest.fail_on(active_handles=True)
async def test_get_service(self):
service_name = "service1"
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- await self.helm_conn.get_service(self.cluster_uuid, service_name, self.namespace)
+ await self.helm_conn.get_service(
+ self.cluster_uuid, service_name, self.namespace
+ )
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
- command = "/usr/bin/kubectl --kubeconfig=./tmp/helm_cluster_id/.kube/config " \
- "--namespace=testk8s get service service1 -o=yaml"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
+ command = (
+ "/usr/bin/kubectl --kubeconfig=./tmp/helm_cluster_id/.kube/config "
+ "--namespace=testk8s get service service1 -o=yaml"
+ )
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_inspect_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.inspect_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm inspect openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm inspect openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_help_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.help_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm inspect readme openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm inspect readme openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_values_kdu(self):
repo_url = "https://kubernetes-charts.storage.googleapis.com/"
await self.helm_conn.values_kdu(kdu_model, repo_url)
- command = "/usr/bin/helm inspect values openldap --repo " \
- "https://kubernetes-charts.storage.googleapis.com/ " \
- "--version 1.2.4"
- self.helm_conn._local_async_exec.assert_called_with(command=command, encode_utf8=True)
+ command = (
+ "/usr/bin/helm inspect values openldap --repo "
+ "https://kubernetes-charts.storage.googleapis.com/ "
+ "--version 1.2.4"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, encode_utf8=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_instances_list(self):
await self.helm_conn.instances_list(self.cluster_uuid)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_called_once_with(
+ from_path=self.cluster_id
+ )
command = "/usr/bin/helm list --output yaml"
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command, env=self.env, raise_exception_on_error=True
+ )
@asynctest.fail_on(active_handles=True)
async def test_status_kdu(self):
kdu_instance = "stable-openldap-0005399828"
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- await self.helm_conn._status_kdu(self.cluster_id, kdu_instance,
- self.namespace, return_text=True)
+ await self.helm_conn._status_kdu(
+ self.cluster_id, kdu_instance, self.namespace, return_text=True
+ )
command = "/usr/bin/helm status {} --output yaml".format(kdu_instance)
- self.helm_conn._local_async_exec.assert_called_once_with(command=command,
- env=self.env,
- raise_exception_on_error=True,
- show_error_log=False)
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command=command,
+ env=self.env,
+ raise_exception_on_error=True,
+ show_error_log=False,
+ )
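# A sketch of what the asserted call implies for _status_kdu: run
# "helm status <instance> --output yaml" with error logging suppressed, then
# either hand back the raw text or the parsed structure. The yaml.safe_load
# step and the helper names are assumptions (import yaml assumed); note that
# helm v2 status does not take a namespace flag, which is why namespace does
# not appear in the command above:
async def _status_kdu(self, cluster_id, kdu_instance, namespace, return_text=False):
    _, env = self._get_paths(cluster_id)  # hypothetical helper
    output, _rc = await self._local_async_exec(
        command="{} status {} --output yaml".format(self._helm_command, kdu_instance),
        env=env,
        raise_exception_on_error=True,
        show_error_log=False,
    )
    return output if return_text else yaml.safe_load(output)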
@asynctest.fail_on(active_handles=True)
async def test_store_status(self):
"description": "Install complete",
"status": {
"code": "1",
- "notes": "The openldap helm chart has been installed"
- }
+ "notes": "The openldap helm chart has been installed",
+ },
}
}
self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status)
- self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(return_value=status)
-
- await self.helm_conn._store_status(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0)
- self.helm_conn._status_kdu.assert_called_once_with(cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- return_text=False)
- self.helm_conn.write_app_status_to_db.assert_called_once_with(db_dict=db_dict,
- status="Install complete",
- detailed_status=str(status),
- operation="install")
+ self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(
+ return_value=status
+ )
+
+ await self.helm_conn._store_status(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="install",
+ run_once=True,
+ check_every=0,
+ )
+ self.helm_conn._status_kdu.assert_called_once_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ return_text=False,
+ )
+ self.helm_conn.write_app_status_to_db.assert_called_once_with(
+ db_dict=db_dict,
+ status="Install complete",
+ detailed_status=str(status),
+ operation="install",
+ )
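# test_store_status fixes the polling contract of _store_status: fetch the
# release status with _status_kdu, reduce it to the short info.description
# line, and persist both forms through write_app_status_to_db. A sketch of
# one loop iteration, assuming run_once short-circuits the loop after a
# single pass (asyncio import assumed):
async def _store_status(
    self,
    cluster_id,
    kdu_instance,
    namespace,
    db_dict=None,
    operation=None,
    run_once=False,
    check_every=10,
):
    while True:
        detailed_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            return_text=False,
        )
        status = (detailed_status.get("info") or {}).get("description", "")
        await self.write_app_status_to_db(
            db_dict=db_dict,
            status=status,
            detailed_status=str(detailed_status),
            operation=operation,
        )
        if run_once:
            return
        await asyncio.sleep(check_every)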
@asynctest.fail_on(active_handles=True)
async def test_reset_uninstall_false(self):
await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
- ignore_non_exist=True)
+ self.helm_conn.fs.file_delete.assert_called_once_with(
+ self.cluster_id, ignore_non_exist=True
+ )
self.helm_conn._uninstall_sw.assert_not_called()
@asynctest.fail_on(active_handles=True)
async def test_reset_uninstall(self):
- kdu_instance = 'stable-openldap-0021099429'
+ kdu_instance = "stable-openldap-0021099429"
instances = [
{
- 'app_version': '2.4.48',
- 'chart': 'openldap-1.2.3',
- 'name': kdu_instance,
- 'namespace': self.namespace,
- 'revision': '1',
- 'status': 'deployed',
- 'updated': '2020-10-30 11:11:20.376744191 +0000 UTC'
+ "app_version": "2.4.48",
+ "chart": "openldap-1.2.3",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": "1",
+ "status": "deployed",
+ "updated": "2020-10-30 11:11:20.376744191 +0000 UTC",
}
]
self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(self.cluster_id,
- ignore_non_exist=True)
- self.helm_conn.instances_list.assert_called_once_with(cluster_uuid=self.cluster_uuid)
- self.helm_conn.uninstall.assert_called_once_with(cluster_uuid=self.cluster_uuid,
- kdu_instance=kdu_instance)
- self.helm_conn._uninstall_sw.assert_called_once_with(self.cluster_id, self.namespace)
+ self.helm_conn.fs.file_delete.assert_called_once_with(
+ self.cluster_id, ignore_non_exist=True
+ )
+ self.helm_conn.instances_list.assert_called_once_with(
+ cluster_uuid=self.cluster_uuid
+ )
+ self.helm_conn.uninstall.assert_called_once_with(
+ cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance
+ )
+ self.helm_conn._uninstall_sw.assert_called_once_with(
+ self.cluster_id, self.namespace
+ )
@asynctest.fail_on(active_handles=True)
async def test_uninstall_sw_namespace(self):
await self.helm_conn._uninstall_sw(self.cluster_id, self.namespace)
calls = self.helm_conn._local_async_exec.call_args_list
- self.assertEqual(len(calls), 3, "To uninstall should have executed three commands")
+ self.assertEqual(
+ len(calls), 3, "To uninstall should have executed three commands"
+ )
call0_kargs = calls[0][1]
- command_0 = "/usr/bin/helm --kubeconfig={} --home={} reset".format(self.kube_config,
- self.helm_home)
- self.assertEqual(call0_kargs,
- {"command": command_0,
- "raise_exception_on_error": True,
- "env": self.env}, "Invalid args for first call to local_exec")
+ command_0 = "/usr/bin/helm --kubeconfig={} --home={} reset".format(
+ self.kube_config, self.helm_home
+ )
+ self.assertEqual(
+ call0_kargs,
+ {"command": command_0, "raise_exception_on_error": True, "env": self.env},
+ "Invalid args for first call to local_exec",
+ )
call1_kargs = calls[1][1]
- command_1 = "/usr/bin/kubectl --kubeconfig={} delete " \
- "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".\
- format(self.kube_config)
- self.assertEqual(call1_kargs,
- {"command": command_1,
- "raise_exception_on_error": False,
- "env": self.env}, "Invalid args for second call to local_exec")
+ command_1 = (
+ "/usr/bin/kubectl --kubeconfig={} delete "
+ "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".format(
+ self.kube_config
+ )
+ )
+ self.assertEqual(
+ call1_kargs,
+ {"command": command_1, "raise_exception_on_error": False, "env": self.env},
+ "Invalid args for second call to local_exec",
+ )
call2_kargs = calls[2][1]
- command_2 = "/usr/bin/kubectl --kubeconfig={} --namespace kube-system delete " \
- "serviceaccount/{}".\
- format(self.kube_config, self.service_account)
- self.assertEqual(call2_kargs,
- {"command": command_2,
- "raise_exception_on_error": False,
- "env": self.env}, "Invalid args for third call to local_exec")
+ command_2 = (
+ "/usr/bin/kubectl --kubeconfig={} --namespace kube-system delete "
+ "serviceaccount/{}".format(self.kube_config, self.service_account)
+ )
+ self.assertEqual(
+ call2_kargs,
+ {"command": command_2, "raise_exception_on_error": False, "env": self.env},
+ "Invalid args for third call to local_exec",
+ )
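# The three commands asserted above are the whole helm v2 teardown story:
# "helm reset" removes Tiller, then the cluster role binding and the service
# account created at init time are deleted best-effort
# (raise_exception_on_error=False), so a half-cleaned cluster does not abort
# the reset. A sketch; _get_paths, _helm_command, kubectl_command and
# service_account are names assumed from the test fixtures:
async def _uninstall_sw(self, cluster_id, namespace):
    paths, env = self._get_paths(cluster_id)  # hypothetical helper
    kube_config = paths["kube_config"]
    helm_home = "{}/.helm".format(paths["cluster_dir"])
    # 1 - remove Tiller; a hard failure here should abort the teardown
    await self._local_async_exec(
        command="{} --kubeconfig={} --home={} reset".format(
            self._helm_command, kube_config, helm_home
        ),
        raise_exception_on_error=True,
        env=env,
    )
    # 2 and 3 - best-effort cleanup of the RBAC objects created at init time
    await self._local_async_exec(
        command="{} --kubeconfig={} delete "
        "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".format(
            self.kubectl_command, kube_config
        ),
        raise_exception_on_error=False,
        env=env,
    )
    await self._local_async_exec(
        command="{} --kubeconfig={} --namespace kube-system delete "
        "serviceaccount/{}".format(
            self.kubectl_command, kube_config, self.service_account
        ),
        raise_exception_on_error=False,
        env=env,
    )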
self.k8s_juju_conn.libjuju.get_application_configs = AsyncMock()
def test_success(self):
- self.loop.run_until_complete(self.k8s_juju_conn.update_vca_status(
- self.vcaStatus, self.kdu_instance))
+ self.loop.run_until_complete(
+ self.k8s_juju_conn.update_vca_status(self.vcaStatus, self.kdu_instance)
+ )
self.k8s_juju_conn.libjuju.get_executed_actions.assert_called_once()
self.k8s_juju_conn.libjuju.get_actions.assert_called_once()
self.k8s_juju_conn.libjuju.get_application_configs.assert_called_once()
self.k8s_juju_conn.libjuju.get_model.return_value = None
self.k8s_juju_conn.libjuju.get_executed_actions.side_effect = Exception()
with self.assertRaises(Exception):
- self.loop.run_until_complete(self.k8s_juju_conn.update_vca_status(
- self.vcaStatus, self.kdu_instance))
+ self.loop.run_until_complete(
+ self.k8s_juju_conn.update_vca_status(self.vcaStatus, self.kdu_instance)
+ )
self.k8s_juju_conn.libjuju.get_executed_actions.assert_not_called()
self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
self.k8s_juju_conn.libjuju.get_application_configs.assert_not_called()
self._scale = 2
self.k8s_juju_conn.libjuju.scale_application = AsyncMock()
- def test_success(
- self
- ):
+ def test_success(self):
self.loop.run_until_complete(
- self.k8s_juju_conn.scale(
- self.kdu_name,
- self._scale,
- self.application_name
- )
+ self.k8s_juju_conn.scale(self.kdu_name, self._scale, self.application_name)
)
self.k8s_juju_conn.libjuju.scale_application.assert_called_once()
with self.assertRaises(Exception):
self.loop.run_until_complete(
self.k8s_juju_conn.scale(
- self.kdu_name,
- self._scale,
- self.application_name
+ self.kdu_name, self._scale, self.application_name
)
)
self.k8s_juju_conn.libjuju.scale_application.assert_called_once()
executed_actions = self.loop.run_until_complete(
self.libjuju.get_executed_actions("model")
)
- expected_result = [{'id': 'id', 'action': 'action_name',
- 'status': 'status', 'output': 'completed'}]
+ expected_result = [
+ {
+ "id": "id",
+ "action": "action_name",
+ "status": "status",
+ "output": "completed",
+ }
+ ]
self.assertListEqual(expected_result, executed_actions)
self.assertIsInstance(executed_actions, list)
mock_get_model.return_value = None
with self.assertRaises(JujuError):
self.loop.run_until_complete(
- self.libjuju.get_application_configs("model", "app"))
+ self.libjuju.get_application_configs("model", "app")
+ )
mock_get_controller.assert_called_once()
mock_disconnect_controller.assert_called_once()
mock_get_controller,
):
mock_get_application.return_value = FakeApplication()
- application_configs = self.loop.run_until_complete(self.libjuju
- .get_application_configs("model", "app"))
+ application_configs = self.loop.run_until_complete(
+ self.libjuju.get_application_configs("model", "app")
+ )
self.assertEqual(application_configs, ["app_config"])
super(DestroyApplicationTest, self).setUp()
def test_success(
- self,
- mock_get_controller,
- mock_get_model,
- mock_disconnect_controller,
- mock_get_application,
- mock_disconnect_model,
+ self,
+ mock_get_controller,
+ mock_get_model,
+ mock_disconnect_controller,
+ mock_get_application,
+ mock_disconnect_model,
):
mock_get_application.return_value = FakeApplication()
mock_get_model.return_value = None
mock_disconnect_model.assert_called_once()
def test_no_application(
- self,
- mock_get_controller,
- mock_get_model,
- mock_disconnect_controller,
- mock_get_application,
- mock_disconnect_model,
+ self,
+ mock_get_controller,
+ mock_get_model,
+ mock_disconnect_controller,
+ mock_get_application,
+ mock_disconnect_model,
):
mock_get_model.return_value = None
mock_get_application.return_value = None
mock_get_application.assert_called()
def test_exception(
- self,
- mock_get_controller,
- mock_get_model,
- mock_disconnect_controller,
- mock_get_application,
- mock_disconnect_model,
+ self,
+ mock_get_controller,
+ mock_get_model,
+ mock_disconnect_controller,
+ mock_get_application,
+ mock_disconnect_model,
):
mock_get_application.return_value = FakeApplication()
mock_get_model.return_value = None
):
mock_get_model.return_value = juju.model.Model()
mock_get_application.return_value = FakeApplication()
- self.loop.run_until_complete(
- self.libjuju.scale_application(
- "model",
- "app",
- 2
- )
- )
+ self.loop.run_until_complete(self.libjuju.scale_application("model", "app", 2))
mock_wait_for_model.assert_called_once()
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
mock_get_model.return_value = juju.model.Model()
with self.assertRaises(JujuApplicationNotFound):
self.loop.run_until_complete(
- self.libjuju.scale_application(
- "model",
- "app",
- 2
- )
+ self.libjuju.scale_application("model", "app", 2)
)
mock_disconnect_controller.assert_called()
mock_disconnect_model.assert_called()
def test_exception(
- self,
- mock_wait_for,
- mock_disconnect_controller,
- mock_disconnect_model,
- mock_get_application,
- mock_get_model,
- mock_get_controller,
+ self,
+ mock_wait_for,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_application,
+ mock_get_model,
+ mock_get_controller,
):
mock_get_model.return_value = None
mock_get_application.return_value = FakeApplication()
with self.assertRaises(Exception):
self.loop.run_until_complete(
- self.libjuju.scale_application(
- "model",
- "app",
- 2,
- total_timeout=0
- )
+ self.libjuju.scale_application("model", "app", 2, total_timeout=0)
)
mock_disconnect_controller.assert_called_once()
try:
cacert = base64.b64decode(b64string).decode("utf-8")
- cacert = re.sub(r"\\n", r"\n", cacert,)
+ cacert = re.sub(
+ r"\\n",
+ r"\n",
+ cacert,
+ )
except binascii.Error as e:
raise N2VCInvalidCertificate(message="Invalid CA Certificate: {}".format(e))
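# Why the re.sub above is needed: CA certificates often reach N2VC with their
# newlines escaped as the two characters "\" + "n" (for example after a JSON
# or YAML round trip), and base64-decoding alone does not restore them. A
# small self-contained illustration with a hypothetical PEM string:
import base64
import re

pem = "-----BEGIN CERTIFICATE-----\\nMIIB...\\n-----END CERTIFICATE-----"
b64string = base64.b64encode(pem.encode("utf-8"))

cacert = base64.b64decode(b64string).decode("utf-8")
assert "\n" not in cacert  # still the escaped two-character form
cacert = re.sub(r"\\n", r"\n", cacert)
assert cacert.count("\n") == 2  # real line breaks restored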
packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
include_package_data=True,
entry_points={"console_scripts": []},
- setup_requires=['setuptools-version-command']
+ setup_requires=["setuptools-version-command"],
)
skip_install = true
commands =
black --check --diff n2vc/
+ black --check --diff setup.py
#######################################################################################