blob: bc66bc6ad9dc30cd5ea20bebbb322500974df01d [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import asyncio
from math import ceil
async def check_workflow_status(
    self, workflow_name, *, timeout=300, retry_time=15
):
    """Poll an Argo workflow until it reports the ``Completed`` condition.

    The ``workflows.argoproj.io/v1alpha1`` object named ``workflow_name`` is
    fetched from the ``osm-workflows`` namespace through
    ``self._kubectl.get_generic_object`` once per iteration, sleeping
    ``retry_time`` seconds after every unsuccessful check.

    Args:
        workflow_name: Name of the workflow object to watch.
        timeout: Overall polling budget in seconds (default 300).
        retry_time: Seconds to wait between polls (default 15). Must be > 0.

    Returns:
        ``(True, "COMPLETED")`` as soon as a condition of type ``Completed``
        has status ``"True"``; otherwise ``(False, <message>)`` once the
        iteration budget is exhausted.

    Raises:
        ValueError: If ``retry_time`` is not strictly positive.
    """
    self.logger.info(f"check_workflow_status Enter: {workflow_name}")
    if retry_time <= 0:
        raise ValueError("retry_time must be greater than zero")
    max_iterations = ceil(timeout / retry_time)
    for counter in range(1, max_iterations + 1):
        workflow = await self._kubectl.get_generic_object(
            api_group="argoproj.io",
            api_plural="workflows",
            api_version="v1alpha1",
            namespace="osm-workflows",
            name=workflow_name,
        )
        # Tolerate a falsy lookup result or a missing/empty status so that an
        # object without conditions counts as "not completed yet" instead of
        # raising AttributeError.
        status = (workflow or {}).get("status") or {}
        conditions = status.get("conditions") or []
        self.logger.info(f"Workflow status conditions: {conditions}")
        # Use .get() so a condition entry without a "type" key is skipped
        # rather than aborting the whole check with KeyError.
        completed = next(
            (cond for cond in conditions if cond.get("type") == "Completed"), {}
        )
        if completed.get("status", "False") == "True":
            self.logger.info(
                f"Workflow {workflow_name} completed in {counter} iterations (aprox {counter*retry_time} seconds)"
            )
            return True, "COMPLETED"
        await asyncio.sleep(retry_time)
    # The message must be an f-string; otherwise callers receive the raw
    # "{workflow_name}" placeholders.
    return (
        False,
        f"Workflow {workflow_name} did not complete in {max_iterations} iterations (aprox {timeout} seconds)",
    )
async def readiness_loop(self, item, name, namespace, flag, timeout):
    """Poll a Kubernetes custom resource until a status condition is true.

    The resource kind is selected by ``item``, which must be one of the keys
    of the internal API map (``kustomization``, ``cluster_aws``,
    ``cluster_azure``, ``cluster_gcp``). The object is fetched through
    ``self._kubectl.get_generic_object`` once per iteration, sleeping
    ``self._odu_checkloop_retry_time`` seconds after every unsuccessful check.

    Args:
        item: Key identifying the resource kind to look up.
        name: Name of the object to watch.
        namespace: Namespace in which the object is looked up.
        flag: Condition ``type`` that must reach status ``"True"``.
        timeout: Overall polling budget in seconds.

    Returns:
        ``(True, "COMPLETED")`` once the requested condition is ``"True"``;
        ``(False, <message>)`` if ``item`` is not supported or the condition
        is not met within the iteration budget.
    """
    self.logger.info("readiness_loop Enter")
    self.logger.info(
        f"{item} {name}. Namespace: {namespace}. Flag: {flag}. Timeout: {timeout}"
    )
    item_api_map = {
        "kustomization": {
            "api_group": "kustomize.toolkit.fluxcd.io",
            "api_plural": "kustomizations",
            "api_version": "v1",
        },
        "cluster_aws": {
            "api_group": "eks.aws.upbound.io",
            "api_plural": "clusters",
            "api_version": "v1beta1",
        },
        "cluster_azure": {
            "api_group": "containerservice.azure.upbound.io",
            "api_plural": "kubernetesclusters",
            "api_version": "v1beta1",
        },
        "cluster_gcp": {
            "api_group": "container.gcp.upbound.io",
            "api_plural": "clusters",
            "api_version": "v1beta2",
        },
    }
    # The API coordinates live one level down, under the entry for ``item``;
    # indexing the outer map with "api_group" directly always raised KeyError.
    api_coordinates = item_api_map.get(item)
    if api_coordinates is None:
        message = (
            f"Unsupported item '{item}'; expected one of {sorted(item_api_map)}"
        )
        self.logger.error(message)
        return False, message

    retry_time = self._odu_checkloop_retry_time
    max_iterations = ceil(timeout / retry_time)
    for counter in range(1, max_iterations + 1):
        generic_object = await self._kubectl.get_generic_object(
            api_group=api_coordinates["api_group"],
            api_plural=api_coordinates["api_plural"],
            api_version=api_coordinates["api_version"],
            namespace=namespace,
            name=name,
        )
        # A falsy lookup result or a missing/empty status is treated as
        # "no conditions yet".
        status = (generic_object or {}).get("status") or {}
        conditions = status.get("conditions") or []
        self.logger.info(f"{item} status conditions: {conditions}")
        # Distinct loop variable (the original shadowed ``item``) and .get()
        # so a condition entry without a "type" key is simply skipped.
        matched = next(
            (cond for cond in conditions if cond.get("type") == flag), {}
        )
        if matched.get("status", "False") == "True":
            self.logger.info(
                f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)"
            )
            return True, "COMPLETED"
        await asyncio.sleep(retry_time)
    # The message must be an f-string; otherwise callers receive the raw
    # "{item} {name}" placeholders.
    return (
        False,
        f"{item} {name} was not ready after {max_iterations} iterations (aprox {timeout} seconds)",
    )