##
import abc
import asyncio
+from typing import Union
import random
import time
import shlex
if self._stable_repo_url == "None":
self._stable_repo_url = None
- def _get_namespace(self, cluster_uuid: str) -> str:
+ @staticmethod
+ def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
"""
- Obtains the namespace used by the cluster with the uuid passed by argument
-
- param: cluster_uuid: cluster's uuid
+ Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
+ cluster_id for backward compatibility
"""
-
- # first, obtain the cluster corresponding to the uuid passed by argument
- k8scluster = self.db.get_one(
- "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
- )
- return k8scluster.get("namespace")
+ namespace, _, cluster_id = cluster_uuid.rpartition(":")
+ return namespace, cluster_id
async def init_env(
self,
"""
if reuse_cluster_uuid:
- cluster_id = reuse_cluster_uuid
+ namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+ namespace = namespace_ or namespace
else:
cluster_id = str(uuid4())
+ cluster_uuid = "{}:{}".format(namespace, cluster_id)
self.log.debug(
"Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
self.log.info("Cluster {} initialized".format(cluster_id))
- return cluster_id, n2vc_installed_sw
+ return cluster_uuid, n2vc_installed_sw
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository for the given cluster and refresh its index.

    :param cluster_uuid: cluster identifier, optionally "namespace:cluster_id"
    :param name: name under which the repository is registered
    :param url: repository URL
    :param repo_type: repository type (only "chart" is expected here)
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo add name url
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"], self._helm_command, name, url
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # helm repo update: refresh only the repo just added; best-effort,
    # so a transient index-fetch failure does not fail the whole add
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
+
async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
    """
    Refresh the local index of an already-registered helm repository.

    :param cluster_id: cluster identifier (already stripped of any namespace prefix)
    :param name: name of the registered repository to update
    :param repo_type: repository type (only "chart" is expected here)
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo update
    # NOTE(review): unlike repo_add, this command has no "env KUBECONFIG={}"
    # prefix; repo index operations presumably need no cluster access — confirm
    command = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(command))
    # best-effort: an index-refresh failure is logged by the executor, not raised
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
async def repo_list(self, cluster_uuid: str) -> list:
"""
:return: list of registered repositories: [ (name, url) .... ]
"""
- self.log.debug("list repositories for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
# config filename
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
command = "env KUBECONFIG={} {} repo list --output yaml".format(
paths["kube_config"], self._helm_command
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
if _rc == 0:
if output and len(output) > 0:
return []
async def repo_remove(self, cluster_uuid: str, name: str):
- self.log.debug(
- "remove {} repositories for cluster {}".format(name, cluster_uuid)
- )
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
command = "env KUBECONFIG={} {} repo remove {}".format(
paths["kube_config"], self._helm_command, name
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
async def reset(
self,
:param kwargs: Additional parameters (None yet)
:return: Returns True if successful or raises an exception.
"""
- namespace = self._get_namespace(cluster_uuid=cluster_uuid)
+ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"Resetting K8s environment. cluster uuid: {} uninstall={}".format(
- cluster_uuid, uninstall_sw
+ cluster_id, uninstall_sw
)
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# uninstall releases if needed.
if uninstall_sw:
else:
msg = (
"Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
- ).format(cluster_uuid)
+ ).format(cluster_id)
self.log.warn(msg)
uninstall_sw = (
False # Allow to remove k8s cluster without removing Tiller
)
if uninstall_sw:
- await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
+ await self._uninstall_sw(cluster_id, namespace)
# delete cluster directory
- self.log.debug("Removing directory {}".format(cluster_uuid))
- self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
+ self.log.debug("Removing directory {}".format(cluster_id))
+ self.fs.file_delete(cluster_id, ignore_non_exist=True)
# Also remove the local directory if it still exists
- direct = self.fs.path + "/" + cluster_uuid
+ direct = self.fs.path + "/" + cluster_id
shutil.rmtree(direct, ignore_errors=True)
return True
version = str(parts[1])
kdu_model = parts[0]
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
command = self._get_install_command(
kdu_model,
kdu_instance,
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=False,
)
)
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=True,
- check_every=0,
)
if rc != 0:
params: dict = None,
db_dict: dict = None,
):
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# params to str
params_str, file_to_delete = self._params_to_file_option(
- cluster_id=cluster_uuid, params=params
+ cluster_id=cluster_id, params=params
)
# version
version = str(parts[1])
kdu_model = parts[0]
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
command = self._get_upgrade_command(
kdu_model,
kdu_instance,
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_id=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=False,
)
)
# write final status
await self._store_status(
- cluster_id=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=True,
- check_every=0,
)
if rc != 0:
raise K8sException(msg)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
# return new revision number
instance = await self.get_instance_info(
async def rollback(
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_uuid
+ kdu_instance, revision, cluster_id
)
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
command = self._get_rollback_command(
kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_id=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=False,
)
)
# write final status
await self._store_status(
- cluster_id=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=True,
- check_every=0,
)
if rc != 0:
raise K8sException(msg)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
# return new revision number
instance = await self.get_instance_info(
:return: True if successful
"""
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_uuid
- )
+ "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
return True
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
command = self._get_uninstall_command(
kdu_instance, instance_info["namespace"], paths["kube_config"]
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
return self._output_to_table(output)
:return:
"""
- self.log.debug("list releases for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list releases for cluster {}".format(cluster_id))
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# execute internal command
- result = await self._instances_list(cluster_uuid)
+ result = await self._instances_list(cluster_id)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
return result
- `external_ip` List of external ips (in case they are available)
"""
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"get_services: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
# init env, paths
paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# get list of services names for kdu
service_names = await self._get_services(
- cluster_uuid, kdu_instance, namespace, paths["kube_config"]
+ cluster_id, kdu_instance, namespace, paths["kube_config"]
)
service_list = []
for service in service_names:
- service = await self._get_service(cluster_uuid, service, namespace)
+ service = await self._get_service(cluster_id, service, namespace)
service_list.append(service)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
return service_list
)
)
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
- service = await self._get_service(cluster_uuid, service_name, namespace)
+ service = await self._get_service(cluster_id, service_name, namespace)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ async def status_kdu(
+ self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
+ ) -> Union[str, dict]:
"""
This call would retrieve tha current state of a given KDU instance. It would be
would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
:param kwargs: Additional parameters (None yet)
+ :param yaml_format: whether the result shall be returned as a YAML string or as a
+ dictionary
:return: If successful, it will return the following vector of arguments:
- K8s `namespace` in the cluster where the KDU lives
- `state` of the KDU instance. It can be:
)
)
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
# sync local dir
- self.fs.sync(from_path=cluster_uuid)
+ self.fs.sync(from_path=cluster_id)
# get instance: needed to obtain namespace
- instances = await self._instances_list(cluster_id=cluster_uuid)
+ instances = await self._instances_list(cluster_id=cluster_id)
for instance in instances:
if instance.get("name") == kdu_instance:
break
# instance does not exist
raise K8sException(
"Instance name: {} not found in cluster: {}".format(
- kdu_instance, cluster_uuid
+ kdu_instance, cluster_id
)
)
status = await self._status_kdu(
- cluster_id=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
namespace=instance["namespace"],
+ yaml_format=yaml_format,
show_error_log=True,
- return_text=True,
)
# sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
+ self.fs.reverse_sync(from_path=cluster_id)
return status
cluster_id: str,
kdu_instance: str,
namespace: str = None,
+ yaml_format: bool = False,
show_error_log: bool = False,
- return_text: bool = False,
- ):
+ ) -> Union[str, dict]:
"""
Implements the helm version dependent method to obtain status of a helm instance
"""
operation: str,
kdu_instance: str,
namespace: str = None,
- check_every: float = 10,
db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- namespace=namespace,
- return_text=False,
- )
- status = detailed_status.get("info").get("description")
- self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug(
- "_store_status exception: {}".format(str(e)), exc_info=True
- )
- pass
- finally:
- if run_once:
- return
+ ) -> None:
+ """
+ Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
+
+ :param cluster_id (str): the cluster where the KDU instance is deployed
+ :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
+ :param kdu_instance (str): The KDU instance in relation to which the status is obtained
+ :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
+ :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
+ values for the keys:
+ - "collection": The Mongo DB collection to write to
+ - "filter": The query filter to use in the update process
+ - "path": The dot separated keys which targets the object to be updated
+ Defaults to None.
+ """
+
+ try:
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ yaml_format=False,
+ namespace=namespace,
+ )
+
+ status = detailed_status.get("info").get("description")
+ self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
+
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation,
+ )
+
+ if not result:
+ self.log.info("Error writing in database. Task exiting...")
+
+ except asyncio.CancelledError as e:
+ self.log.warning(
+ f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
+ )
+ except Exception as e:
+ self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
name = name + get_random_number()
return name.lower()
+
+ def _split_repo(self, kdu_model: str) -> str:
+ repo_name = None
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ repo_name = kdu_model[:idx]
+ return repo_name