return namespace, cluster_id
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts
:param namespace: optional namespace to be used for helm. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param kwargs: Additional parameters (None yet)
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
self.fs.reverse_sync(from_path=cluster_id)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
) -> bool:
+ """Reset a cluster
+ Resets the Kubernetes cluster by removing the helm deployment that represents it.
+
+ :param cluster_uuid: The UUID of the cluster to reset
+ :param force: Boolean to force the reset
+ :param uninstall_sw: Boolean to force the reset
+ :param kwargs: Additional parameters (None yet)
+ :return: Returns True if successful or raises an exception.
+ """
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
.format(cluster_id, uninstall_sw))
kdu_model: str,
paths: dict,
env: dict,
+ kdu_instance: str,
atomic: bool = True,
timeout: float = 300,
params: dict = None,
version = str(parts[1])
kdu_model = parts[0]
- # generate a name for the release. Then, check if already exists
- kdu_instance = None
- while kdu_instance is None:
- kdu_instance = self._generate_release_name(kdu_model)
- try:
- result = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- namespace=namespace,
- show_error_log=False,
- )
- if result is not None:
- # instance already exists: generate a new one
- kdu_instance = None
- except K8sException:
- pass
-
command = self._get_install_command(kdu_model, kdu_instance, namespace,
params_str, version, atomic, timeout)
self.log.error(msg)
raise K8sException(msg)
- return kdu_instance
-
async def upgrade(
self,
cluster_uuid: str,
else:
return 0
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
"""
Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
(this call should happen after all _terminate-config-primitive_ of the VNF
:param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
:param kdu_instance: unique name for the KDU instance to be deleted
+ :param kwargs: Additional parameters (None yet)
:return: True if successful
"""
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
:db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters (None yet)
:return: Returns the output of the action
"""
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ """
+ This call would retrieve tha current state of a given KDU instance. It would be
+ would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+ values_ of the configuration parameters applied to a given instance. This call
+ would be based on the `status` call.
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param kwargs: Additional parameters (None yet)
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind,
+ and the status of those resources
+ - Last `deployment_time`.
+
+ """
self.log.debug(
"status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
return params_str
@staticmethod
- def _generate_release_name(chart_name: str):
+ def generate_kdu_instance_name(**kwargs):
+ chart_name = kwargs["kdu_model"]
# check embeded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name