####################################################################################
"""
service_account = "osm"
+ _STABLE_REPO_URL = "https://charts.helm.sh/stable"
def __init__(
self,
helm_command: str = "/usr/bin/helm",
log: object = None,
on_update_db=None,
+ vca_config: dict = None,
):
"""
self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+ # obtain stable repo url from config or apply default
+ if not vca_config or not vca_config.get("stablerepourl"):
+ self._stable_repo_url = self._STABLE_REPO_URL
+ else:
+ self._stable_repo_url = vca_config.get("stablerepourl")
+
@staticmethod
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
"""
return namespace, cluster_id
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts
:param namespace: optional namespace to be used for helm. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param kwargs: Additional parameters (None yet)
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
self.fs.reverse_sync(from_path=cluster_id)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
) -> bool:
+ """Reset a cluster
+
+ Resets the Kubernetes cluster by removing the helm deployment that represents it.
+ :param cluster_uuid: The UUID of the cluster to reset
+ :param force: Boolean to force the reset
+ :param uninstall_sw: Boolean to force the reset
+ :param kwargs: Additional parameters (None yet)
+ :return: Returns True if successful or raises an exception.
+ """
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
.format(cluster_id, uninstall_sw))
return True
- async def install(
+ async def _install_impl(
self,
- cluster_uuid: str,
+ cluster_id: str,
kdu_model: str,
+ paths: dict,
+ env: dict,
+ kdu_instance: str,
atomic: bool = True,
timeout: float = 300,
params: dict = None,
kdu_name: str = None,
namespace: str = None,
):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- # init env, paths
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
version = str(parts[1])
kdu_model = parts[0]
- # generate a name for the release. Then, check if already exists
- kdu_instance = None
- while kdu_instance is None:
- kdu_instance = self._generate_release_name(kdu_model)
- try:
- result = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- namespace=namespace,
- show_error_log=False,
- )
- if result is not None:
- # instance already exists: generate a new one
- kdu_instance = None
- except K8sException:
- pass
-
command = self._get_install_command(kdu_model, kdu_instance, namespace,
params_str, version, atomic, timeout)
self.log.error(msg)
raise K8sException(msg)
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
- return kdu_instance
-
async def upgrade(
self,
cluster_uuid: str,
else:
return 0
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ):
+ raise NotImplementedError("Method not implemented")
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ):
+ raise NotImplementedError("Method not implemented")
+
async def rollback(
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
else:
return 0
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
"""
Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
(this call should happen after all _terminate-config-primitive_ of the VNF
:param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
:param kdu_instance: unique name for the KDU instance to be deleted
+ :param kwargs: Additional parameters (None yet)
:return: True if successful
"""
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
:db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters (None yet)
:return: Returns the output of the action
"""
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ """
+ This call would retrieve tha current state of a given KDU instance. It would be
+ would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+ values_ of the configuration parameters applied to a given instance. This call
+ would be based on the `status` call.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param kwargs: Additional parameters (None yet)
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind,
+ and the status of those resources
+ - Last `deployment_time`.
+ """
self.log.debug(
"status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
self.log.error("Error synchronizing repos: {}".format(e))
raise Exception("Error synchronizing repos: {}".format(e))
- def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
- repo_ids = []
- cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
- cluster = self.db.get_one("k8sclusters", cluster_filter)
- if cluster:
- repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
- return repo_ids
- else:
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
def _get_db_repos_dict(self, repo_ids: list):
db_repos_dict = {}
for repo_id in repo_ids:
For Helm v3 it does nothing and does not need to be callled
"""
+ @abc.abstractmethod
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ """
+ Obtains the cluster repos identifiers
+ """
+
"""
####################################################################################
################################### P R I V A T E ##################################
return params_str
@staticmethod
- def _generate_release_name(chart_name: str):
+ def generate_kdu_instance_name(**kwargs):
+ chart_name = kwargs["kdu_model"]
# check embeded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name