import asyncio
-from time import time
+from math import ceil
async def check_workflow_status(self, workflow_name):
- self.logger.info(f"Async check workflow status Enter: {workflow_name}")
- start_time = time()
+ self.logger.info(f"check_workflow_status Enter: {workflow_name}")
timeout = 300
retry_time = 15
- # TODO: Maybe it's better not to measure time, but controlling retries
- # retries = 0
- # total_retries = int(timeout/retry_time)
- while time() <= start_time + timeout:
- # workflow_list = await self._kubectl.list_generic_object(
- # api_group="argoproj.io",
- # api_plural="workflows",
- # api_version="v1alpha1",
- # namespace="osm-workflows",
- # )
- # self.logger.info(f"Workflow_list: { workflow_list }")
- # kubectl get workflow/${WORKFLOW_NAME} -n osm-workflows -o jsonpath='{.status.conditions}' | jq -r '.[] | select(.type=="Completed").status'
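+ # Poll the Argo Workflow custom resource until its "Completed" condition is True or retries run out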
+ counter = 1
+ max_iterations = ceil(timeout / retry_time)
+ while counter <= max_iterations:
workflow = await self._kubectl.get_generic_object(
api_group="argoproj.io",
api_plural="workflows",
api_version="v1alpha1",
namespace="osm-workflows",
name=workflow_name,
)
conditions = workflow.get("status", {}).get("conditions", []) if workflow else []
result = next((item for item in conditions if item["type"] == "Completed"), {})
if result.get("status", "False") == "True":
self.logger.info(
- f"Workflow {workflow_name} completed in {time() - start_time} seconds"
+ f"Workflow {workflow_name} completed in {counter} iterations (aprox {counter*retry_time} seconds)"
)
return True, "COMPLETED"
await asyncio.sleep(retry_time)
- return False, "Workflow {workflow_name} did not complete in {timeout} seconds"
+ counter += 1
+ return (
+ False,
+ "Workflow {workflow_name} did not complete in {max_iterations} iterations (aprox {timeout} seconds)",
+ )
+
+
+async def readiness_loop(self, item, name, namespace, flag, timeout):
+ self.logger.info("readiness_loop Enter")
+ self.logger.info(
+ f"{item} {name}. Namespace: {namespace}. Flag: {flag}. Timeout: {timeout}"
+ )
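+ # API coordinates (group, plural, version) of the custom resource behind each supported item kind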
+ item_api_map = {
+ "kustomization": {
+ "api_group": "kustomize.toolkit.fluxcd.io",
+ "api_plural": "kustomizations",
+ "api_version": "v1",
+ },
+ "cluster_aws": {
+ "api_group": "eks.aws.upbound.io",
+ "api_plural": "clusters",
+ "api_version": "v1beta1",
+ },
+ "cluster_azure": {
+ "api_group": "containerservice.azure.upbound.io",
+ "api_plural": "kubernetesclusters",
+ "api_version": "v1beta1",
+ },
+ "cluster_gcp": {
+ "api_group": "container.gcp.upbound.io",
+ "api_plural": "clusters",
+ "api_version": "v1beta2",
+ },
+ }
+ counter = 1
+ retry_time = self._odu_checkloop_retry_time
+ max_iterations = ceil(timeout / retry_time)
+ api_group = item_api_map["api_group"]
+ api_plural = item_api_map["api_plural"]
+ api_version = item_api_map["api_version"]
+
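+ # Poll the object's status conditions until the requested flag is "True" or retries run out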
+ while counter <= max_iterations:
+ generic_object = await self._kubectl.get_generic_object(
+ api_group=api_group,
+ api_plural=api_plural,
+ api_version=api_version,
+ namespace=namespace,
+ name=name,
+ )
+ if generic_object:
+ # self.logger.debug(f"{yaml.safe_dump(generic_object)}")
+ conditions = generic_object.get("status", {}).get("conditions", [])
+ else:
+ conditions = []
+ self.logger.info(f"{item} status conditions: {conditions}")
+ result = next((condition for condition in conditions if condition["type"] == flag), {})
+ if result.get("status", "False") == "True":
+ self.logger.info(
+ f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)"
+ )
+ return True, "COMPLETED"
+ await asyncio.sleep(retry_time)
+ counter += 1
+ return (
+ False,
+ "{item} {name} was not ready after {max_iterations} iterations (aprox {timeout} seconds)",
+ )