self.log.info("K8S Helm3 connector initialized")
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ ):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ # for helm3 if namespace does not exist must create it
+ if namespace and namespace != "kube-system":
+ namespaces = await self._get_namespaces(cluster_id)
+ if namespace not in namespaces:
+ await self._create_namespace(cluster_id, namespace)
+
+ kdu_instance = await self._install_impl(cluster_id,
+ kdu_model,
+ paths,
+ env,
+ atomic=atomic,
+ timeout=timeout,
+ params=params,
+ db_dict=db_dict,
+ kdu_name=kdu_name,
+ namespace=namespace)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
+ return kdu_instance
+
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
self.log.debug(
return "{} uninstall {} --namespace={}".format(
self._helm_command, kdu_instance, namespace)
+
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ repo_ids = []
+ cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
+ cluster = self.db.get_one("k8sclusters", cluster_filter)
+ if cluster:
+ repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+ return repo_ids
+ else:
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
return True
- async def install(
+ async def _install_impl(
self,
- cluster_uuid: str,
+ cluster_id: str,
kdu_model: str,
+ paths: dict,
+ env: dict,
atomic: bool = True,
timeout: float = 300,
params: dict = None,
kdu_name: str = None,
namespace: str = None,
):
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- # init env, paths
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
self.log.error(msg)
raise K8sException(msg)
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
return kdu_instance
async def upgrade(
self.log.error("Error synchronizing repos: {}".format(e))
raise Exception("Error synchronizing repos: {}".format(e))
- def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
- repo_ids = []
- cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
- cluster = self.db.get_one("k8sclusters", cluster_filter)
- if cluster:
- repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
- return repo_ids
- else:
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
def _get_db_repos_dict(self, repo_ids: list):
db_repos_dict = {}
for repo_id in repo_ids:
For Helm v3 it does nothing and does not need to be callled
"""
+ @abc.abstractmethod
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ """
+ Obtains the cluster repos identifiers
+ """
+
"""
####################################################################################
################################### P R I V A T E ##################################
self.log.info("K8S Helm2 connector initialized")
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ ):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ kdu_instance = await self._install_impl(cluster_id,
+ kdu_model,
+ paths,
+ env,
+ atomic=atomic,
+ timeout=timeout,
+ params=params,
+ db_dict=db_dict,
+ kdu_name=kdu_name,
+ namespace=namespace)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
+ return kdu_instance
+
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
self.log.debug(
return data
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ repo_ids = []
+ cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
+ cluster = self.db.get_one("k8sclusters", cluster_filter)
+ if cluster:
+ repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+ return repo_ids
+ else:
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
+
async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
status = await self._status_kdu(
self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None)
self.helm_conn._store_status = asynctest.CoroutineMock()
self.helm_conn._generate_release_name = Mock(return_value="stable-openldap-0005399828")
+ self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
+ self.helm_conn._create_namespace = asynctest.CoroutineMock()
kdu_instance = await self.helm_conn.install(self.cluster_uuid,
kdu_model,
namespace=self.namespace,
db_dict=db_dict)
+ self.helm_conn._get_namespaces.assert_called_once()
+ self.helm_conn._create_namespace.assert_called_once_with(self.cluster_id, self.namespace)
self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(from_path=self.cluster_id)
self.helm_conn._generate_release_name.assert_called_once_with("stable/openldap")