from pyrage import x25519
+import asyncio
+import traceback
import yaml
import base64
+from math import ceil
def gather_age_key(cluster):
return False, f"Error while cleaning items: {e}"
+async def check_kustomization_ready(self, kustomization_name, namespace, timeout):
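+    """Wait until the Flux Kustomization reports the Ready condition as True.
+
+    Polls the Kustomization object every `self._odu_checkloop_retry_time` seconds
+    until it becomes ready or `timeout` seconds have (approximately) elapsed.
+    Returns a (status, message) tuple.
+    """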
+ self.logger.info("check_kustomization_ready Enter")
+ self.logger.info(
+ f"Kustomization {kustomization_name}. Namespace: {namespace}. Timeout: {timeout}"
+ )
+ counter = 1
+ retry_time = self._odu_checkloop_retry_time
+ max_iterations = ceil(timeout / retry_time)
+ while counter <= max_iterations:
+ kustomization = await self._kubectl.get_generic_object(
+ api_group="kustomize.toolkit.fluxcd.io",
+ api_plural="kustomizations",
+ api_version="v1",
+ namespace=namespace,
+ name=kustomization_name,
+ )
+ if kustomization:
+ # self.logger.debug(f"{yaml.safe_dump(kustomization)}")
+ conditions = kustomization.get("status", {}).get("conditions", [])
+ else:
+ conditions = []
+ self.logger.info(f"Kustomization status conditions: {conditions}")
+ result = next((item for item in conditions if item["type"] == "Ready"), {})
+ if result.get("status", "False") == "True":
+ self.logger.info(
+ f"Kustomization {kustomization_name} ready in {counter} iterations (aprox {counter*retry_time} seconds)"
+ )
+ return True, "COMPLETED"
+ await asyncio.sleep(retry_time)
+ counter += 1
+ return (
+ False,
+ "Kustomization {kustomization_name} did not complete after {max_iterations} iterations (aprox {timeout} seconds)",
+ )
+
+
+async def check_cluster_flag(self, cluster_name, cloud_type, flag, timeout):
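+    """Wait until the managed cluster object reports the condition `flag` as True.
+
+    `cloud_type` selects the provider-specific API (aws, azure or gcp). Polls the
+    managed cluster object every `self._odu_checkloop_retry_time` seconds until the
+    condition is met or `timeout` seconds have (approximately) elapsed.
+    Returns a (status, message) tuple.
+    """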
+ self.logger.info("check_cluster_flag Enter")
+ self.logger.info(
+ f"Cluster {cluster_name}. Cloud: {cloud_type}. Flag: {flag}. Timeout: {timeout}"
+ )
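+    # Per-cloud API coordinates (group, plural, version) of the managed cluster object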
+ cluster_api_map = {
+ "aws": {
+ "api_group": "eks.aws.upbound.io",
+ "api_plural": "clusters",
+ "api_version": "v1beta1",
+ },
+ "azure": {
+ "api_group": "containerservice.azure.upbound.io",
+ "api_plural": "kubernetesclusters",
+ "api_version": "v1beta1",
+ },
+ "gcp": {
+ "api_group": "container.gcp.upbound.io",
+ "api_plural": "clusters",
+ "api_version": "v1beta2",
+ },
+ }
+ api_group = cluster_api_map[cloud_type]["api_group"]
+ api_plural = cluster_api_map[cloud_type]["api_plural"]
+ api_version = cluster_api_map[cloud_type]["api_version"]
+ counter = 1
+ retry_time = self._odu_checkloop_retry_time
+ max_iterations = ceil(timeout / retry_time)
+ while counter <= max_iterations:
+ managed_cluster = await self._kubectl.get_generic_object(
+ api_group=api_group,
+ api_plural=api_plural,
+ api_version=api_version,
+ namespace="",
+ name=cluster_name,
+ )
+ if managed_cluster:
+ # self.logger.debug(f"{yaml.safe_dump(managed_cluster)}")
+ conditions = managed_cluster.get("status", {}).get("conditions", [])
+ else:
+ conditions = []
+ self.logger.info(f"Cluster status conditions (flag {flag}): {conditions}")
+ result = next((item for item in conditions if item["type"] == flag), {})
+ if result.get("status", "False") == "True":
+ self.logger.info(
+ f"Cluster {cluster_name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)"
+ )
+ return True, "COMPLETED"
+ await asyncio.sleep(retry_time)
+ counter += 1
+ return (
+ False,
+ "Cluster {cluster_name} did not reach the status {flag} after {max_iterations} seconds (aprox {timeout} seconds)",
+ )
+
+
async def check_create_cluster(self, op_id, op_params, content):
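+    """Check the status of a cluster creation operation.
+
+    Waits for the cluster Kustomization to become ready, then for the managed
+    cluster resource to be Synced and Ready and, if bootstrap was requested,
+    for the bootstrap Kustomization to become ready as well.
+    Returns a (status, message) tuple.
+    """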
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+ db_cluster = content["cluster"]
+ cluster_name = db_cluster["git_name"].lower()
+ cluster_kustomization_name = cluster_name
+ db_vim_account = content["vim_account"]
+ cloud_type = db_vim_account["vim_type"]
+ bootstrap = op_params.get("bootstrap", True)
+ if cloud_type not in ("azure", "gcp", "aws"):
+        return False, "Not a suitable VIM account to check cluster status"
+
+ try:
+ # Kustomization ready
+ # kubectl get Kustomization/myakscluster01 -n managed-resources -o jsonpath='{.status.conditions}' | jq -r '.[] | select(.type=="Ready").status'
+ status, message = await self.check_kustomization_ready(
+ kustomization_name=cluster_kustomization_name,
+ namespace="managed-resources",
+ timeout=self._odu_checkloop_kustomization_timeout,
+ )
+ if not status:
+ return status, message
+ # Cluster managed resource synced
+ # kubectl get kubernetescluster.containerservice.azure.upbound.io/myakscluster01 -o jsonpath='{.status.conditions}' | jq -r '.[] | select(.type=="Synced").status'
+ status, message = await self.check_cluster_flag(
+ cluster_name=cluster_name,
+ cloud_type=cloud_type,
+ flag="Synced",
+ timeout=self._odu_checkloop_resource_timeout,
+ )
+ if not status:
+ return status, message
+ # Cluster managed resource ready
+ # kubectl get kubernetescluster.containerservice.azure.upbound.io/myakscluster01 -o jsonpath='{.status.conditions}' | jq -r '.[] | select(.type=="Ready").status'
+ status, message = await self.check_cluster_flag(
+ cluster_name=cluster_name,
+ cloud_type=cloud_type,
+ flag="Ready",
+ timeout=self._odu_checkloop_resource_timeout,
+ )
+ if not status:
+ return status, message
+ # Bootstrap is complete
+ # kubectl get Kustomization/myakscluster01-bstrp-fluxctrl -n managed-resources -o jsonpath='{.status.conditions}' | jq -r '.[] | select(.type=="Ready").status'
+ if bootstrap:
+ status, message = await self.check_kustomization_ready(
+ kustomization_name=f"{cluster_kustomization_name}-bstrp-fluxctrl",
+ namespace="managed-resources",
+ timeout=self._odu_checkloop_kustomization_timeout,
+ )
+ if not status:
+ return status, message
+ except Exception as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.debug(f"Exception: {e}", exc_info=True)
+ return False, f"Unexpected exception: {e}"
return True, "OK"