From ad6d1badee6f13f19da075251dac543a8050b6fd Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Wed, 22 Jan 2025 16:02:18 +0100 Subject: [PATCH] Fix checks for KSU create and delete operations and for cluster delete operations Change-Id: I9fc2a68d636100295409b7d3ad875e54ba368d64 Signed-off-by: garciadeblas --- osm_lcm/k8s.py | 340 ++++++++++++++++++++++++---------- osm_lcm/n2vc/kubectl.py | 6 +- osm_lcm/odu_libs/workflows.py | 77 +++++--- osm_lcm/odu_workflows.py | 1 - 4 files changed, 299 insertions(+), 125 deletions(-) diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py index b86e1337..e860a812 100644 --- a/osm_lcm/k8s.py +++ b/osm_lcm/k8s.py @@ -27,6 +27,8 @@ from copy import deepcopy from osm_lcm import odu_workflows from osm_lcm import vim_sdn from osm_lcm.data_utils.list_utils import find_in_list +from osm_lcm.n2vc.kubectl import Kubectl +import yaml MAP_PROFILE = { "infra_controller_profiles": "infra-controllers", @@ -212,7 +214,9 @@ class GitOpsLcm(LcmBase): db_content["current_operation"] = None return resource_status, db_content - async def common_check_list(self, op_id, checkings_list, db_collection, db_item): + async def common_check_list( + self, op_id, checkings_list, db_collection, db_item, kubectl=None + ): try: for checking in checkings_list: if checking["enable"]: @@ -220,8 +224,10 @@ class GitOpsLcm(LcmBase): item=checking["item"], name=checking["name"], namespace=checking["namespace"], - flag=checking["flag"], + flag=checking.get("flag"), + deleted=checking.get("deleted", False), timeout=checking["timeout"], + kubectl=kubectl, ) if not status: error_message = "Resources not ready: " @@ -241,8 +247,9 @@ class GitOpsLcm(LcmBase): async def check_resource_status(self, key, op_id, op_params, content): self.logger.info( - f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}" + f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}." ) + self.logger.debug(f"Check resource status. 
Content: {content}") check_resource_function = self._workflows.get(key, {}).get( "check_resource_function" ) @@ -266,6 +273,13 @@ class GitOpsLcm(LcmBase): ) return content_copy + def cluster_kubectl(self, db_cluster): + cluster_kubeconfig = db_cluster["credentials"] + kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml" + with open(kubeconfig_path, "w") as kubeconfig_file: + yaml.safe_dump(cluster_kubeconfig, kubeconfig_file) + return Kubectl(config_file=kubeconfig_path) + class ClusterLcm(GitOpsLcm): db_collection = "clusters" @@ -287,6 +301,9 @@ class ClusterLcm(GitOpsLcm): "update_cluster": { "check_resource_function": self.check_update_cluster, }, + "delete_cluster": { + "check_resource_function": self.check_delete_cluster, + }, } self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config) @@ -470,93 +487,6 @@ class ClusterLcm(GitOpsLcm): op_id, checkings_list, "clusters", db_cluster ) - async def check_register_cluster(self, op_id, op_params, content): - self.logger.info( - f"check_register_cluster Operation {op_id}. Params: {op_params}." - ) - # self.logger.debug(f"Content: {content}") - db_cluster = content["cluster"] - cluster_name = db_cluster["git_name"].lower() - cluster_kustomization_name = cluster_name - bootstrap = op_params.get("bootstrap", True) - checkings_list = [ - { - "item": "kustomization", - "name": f"{cluster_kustomization_name}-bstrp-fluxctrl", - "namespace": "managed-resources", - "flag": "Ready", - "timeout": self._checkloop_kustomization_timeout, - "enable": bootstrap, - "resourceState": "IN_PROGRESS.BOOTSTRAP_OK", - }, - ] - return await self.common_check_list( - op_id, checkings_list, "clusters", db_cluster - ) - - async def check_update_cluster(self, op_id, op_params, content): - self.logger.info( - f"check_create_cluster Operation {op_id}. Params: {op_params}." 
- ) - # self.logger.debug(f"Content: {content}") - db_cluster = content["cluster"] - cluster_name = db_cluster["git_name"].lower() - cluster_kustomization_name = cluster_name - db_vim_account = content["vim_account"] - cloud_type = db_vim_account["vim_type"] - nodepool_name = "" - if cloud_type == "aws": - nodepool_name = f"{cluster_name}-nodegroup" - cluster_name = f"{cluster_name}-cluster" - elif cloud_type == "gcp": - nodepool_name = f"nodepool-{cluster_name}" - if cloud_type in ("azure", "gcp", "aws"): - checkings_list = [ - { - "item": "kustomization", - "name": cluster_kustomization_name, - "namespace": "managed-resources", - "flag": "Ready", - "timeout": self._checkloop_kustomization_timeout, - "enable": True, - "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY", - }, - { - "item": f"cluster_{cloud_type}", - "name": cluster_name, - "namespace": "", - "flag": "Synced", - "timeout": self._checkloop_resource_timeout, - "enable": True, - "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER", - }, - { - "item": f"cluster_{cloud_type}", - "name": cluster_name, - "namespace": "", - "flag": "Ready", - "timeout": self._checkloop_resource_timeout, - "enable": True, - "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER", - }, - ] - else: - return False, "Not suitable VIM account to check cluster status" - if nodepool_name: - nodepool_check = { - "item": f"nodepool_{cloud_type}", - "name": nodepool_name, - "namespace": "", - "flag": "Ready", - "timeout": self._checkloop_resource_timeout, - "enable": True, - "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL", - } - checkings_list.append(nodepool_check) - return await self.common_check_list( - op_id, checkings_list, "clusters", db_cluster - ) - def update_profile_state(self, db_cluster, workflow_status, resource_status): profiles = [ "infra_controller_profiles", @@ -610,6 +540,10 @@ class ClusterLcm(GitOpsLcm): "cluster": self.decrypted_copy(db_cluster), } + # To get the vim account details + db_vim = 
self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]}) + workflow_content["vim_account"] = db_vim + # TODO: workaround until NBI rejects cluster deletion requests for registered clusters # This if clause will be removed if db_cluster["created"] == "false": @@ -678,6 +612,45 @@ class ClusterLcm(GitOpsLcm): return + async def check_delete_cluster(self, op_id, op_params, content): + self.logger.info( + f"check_delete_cluster Operation {op_id}. Params: {op_params}." + ) + self.logger.debug(f"Content: {content}") + db_cluster = content["cluster"] + cluster_name = db_cluster["git_name"].lower() + cluster_kustomization_name = cluster_name + db_vim_account = content["vim_account"] + cloud_type = db_vim_account["vim_type"] + if cloud_type == "aws": + cluster_name = f"{cluster_name}-cluster" + if cloud_type in ("azure", "gcp", "aws"): + checkings_list = [ + { + "item": "kustomization", + "name": cluster_kustomization_name, + "namespace": "managed-resources", + "deleted": True, + "timeout": self._checkloop_kustomization_timeout, + "enable": True, + "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED", + }, + { + "item": f"cluster_{cloud_type}", + "name": cluster_name, + "namespace": "", + "deleted": True, + "timeout": self._checkloop_resource_timeout, + "enable": True, + "resourceState": "IN_PROGRESS.RESOURCE_DELETED.CLUSTER", + }, + ] + else: + return False, "Not suitable VIM account to check cluster status" + return await self.common_check_list( + op_id, checkings_list, "clusters", db_cluster + ) + def delete_cluster(self, db_cluster): # Actually, item_content is equal to db_cluster # item_content = self.db.get_one("clusters", {"_id": db_cluster["_id"]}) @@ -974,6 +947,30 @@ class ClusterLcm(GitOpsLcm): return + async def check_register_cluster(self, op_id, op_params, content): + self.logger.info( + f"check_register_cluster Operation {op_id}. Params: {op_params}." 
+ ) + # self.logger.debug(f"Content: {content}") + db_cluster = content["cluster"] + cluster_name = db_cluster["git_name"].lower() + cluster_kustomization_name = cluster_name + bootstrap = op_params.get("bootstrap", True) + checkings_list = [ + { + "item": "kustomization", + "name": f"{cluster_kustomization_name}-bstrp-fluxctrl", + "namespace": "managed-resources", + "flag": "Ready", + "timeout": self._checkloop_kustomization_timeout, + "enable": bootstrap, + "resourceState": "IN_PROGRESS.BOOTSTRAP_OK", + }, + ] + return await self.common_check_list( + op_id, checkings_list, "clusters", db_cluster + ) + async def deregister(self, params, order_id): self.logger.info("cluster deregister enter") @@ -1155,6 +1152,32 @@ class ClusterLcm(GitOpsLcm): self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) return + async def check_update_cluster(self, op_id, op_params, content): + self.logger.info( + f"check_update_cluster Operation {op_id}. Params: {op_params}." + ) + self.logger.debug(f"Content: {content}") + return await self.check_dummy_operation(op_id, op_params, content) + # db_cluster = content["cluster"] + # cluster_id = db_cluster["_id"] + # cluster_kubectl = self.cluster_kubectl(db_cluster) + # cluster_name = db_cluster["git_name"].lower() + # cluster_kustomization_name = cluster_name + # checkings_list = [ + # { + # "item": "kustomization", + # "name": cluster_kustomization_name, + # "namespace": "managed-resources", + # "flag": "Ready", + # "timeout": self._checkloop_kustomization_timeout, + # "enable": True, + # "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY", + # }, + # ] + # return await self.common_check_list( + # op_id, checkings_list, "clusters", db_cluster, cluster_kubectl + # ) + class CloudCredentialsLcm(GitOpsLcm): db_collection = "vim_accounts" @@ -1707,6 +1730,26 @@ class KsuLcm(GitOpsLcm): :return: None """ super().__init__(msg, lcm_tasks, config) + self._workflows = { + "create_ksus": { + "check_resource_function": 
self.check_create_ksus, + }, + "delete_ksus": { + "check_resource_function": self.check_delete_ksus, + }, + } + + def get_dbclusters_from_profile(self, profile_id, profile_type): + cluster_list = [] + db_clusters = self.db.get_list("clusters") + self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}") + for db_cluster in db_clusters: + if profile_id in db_cluster.get(profile_type, []): + self.logger.info( + f"Profile {profile_id} found in cluster {db_cluster['name']}" + ) + cluster_list.append(db_cluster) + return cluster_list async def create(self, params, order_id): self.logger.info("ksu Create Enter") @@ -1743,19 +1786,28 @@ class KsuLcm(GitOpsLcm): ] = f"{oka_type}/{db_oka['git_name'].lower()}/templates" op_params.append(ksu_params) + # A single workflow is launched for all KSUs _, workflow_name = await self.odu.launch_workflow( "create_ksus", op_id, op_params, db_content ) + # Update workflow status in all KSUs + wf_status_list = [] for db_ksu, ksu_params in zip(db_content, op_params): workflow_status = await self.check_workflow_and_update_db( op_id, workflow_name, db_ksu ) - - if workflow_status: + wf_status_list.append(workflow_status) + # Update resource status in all KSUs + # TODO: Is an operation correct if n KSUs are right and 1 is not OK? 
+ res_status_list = [] + for db_ksu, ksu_params, wf_status in zip(db_content, op_params, wf_status_list): + if wf_status: resource_status, db_ksu = await self.check_resource_and_update_db( "create_ksus", op_id, ksu_params, db_ksu ) - + else: + resource_status = False + res_status_list.append(resource_status) self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) # Clean items used in the workflow, no matter if the workflow succeeded @@ -1765,7 +1817,7 @@ class KsuLcm(GitOpsLcm): self.logger.info( f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" ) - self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}") + self.logger.info(f"KSU Create EXIT with Resource Status {res_status_list}") return async def edit(self, params, order_id): @@ -1923,3 +1975,103 @@ class KsuLcm(GitOpsLcm): self.logger.info(f"KSU Move Exit with resource status: {resource_status}") return + + async def check_create_ksus(self, op_id, op_params, content): + self.logger.info(f"check_create_ksus Operation {op_id}. Params: {op_params}.") + self.logger.debug(f"Content: {content}") + db_ksu = content + kustomization_name = db_ksu["git_name"].lower() + oka_list = op_params["oka"] + oka_item = oka_list[0] + oka_params = oka_item.get("transformation", {}) + target_ns = oka_params.get("namespace", "default") + profile_id = op_params.get("profile", {}).get("_id") + profile_type = op_params.get("profile", {}).get("profile_type") + self.logger.info( + f"Checking status of KSU {db_ksu['name']} for profile {profile_id}." + ) + dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type) + if not dbcluster_list: + self.logger.info(f"No clusters found for profile {profile_id}.") + for db_cluster in dbcluster_list: + try: + self.logger.info( + f"Checking status of KSU in cluster {db_cluster['name']}." 
+ ) + cluster_kubectl = self.cluster_kubectl(db_cluster) + checkings_list = [ + { + "item": "kustomization", + "name": kustomization_name, + "namespace": target_ns, + "flag": "Ready", + "timeout": self._checkloop_kustomization_timeout, + "enable": True, + "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY", + }, + ] + self.logger.info( + f"Checking status of KSU {db_ksu['name']} for profile {profile_id}." + ) + result, message = await self.common_check_list( + op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl + ) + if not result: + return False, message + except Exception as e: + self.logger.error( + f"Error checking KSU in cluster {db_cluster['name']}." + ) + self.logger.error(e) + return False, f"Error checking KSU in cluster {db_cluster['name']}." + return True, "OK" + + async def check_delete_ksus(self, op_id, op_params, content): + self.logger.info(f"check_delete_ksus Operation {op_id}. Params: {op_params}.") + self.logger.debug(f"Content: {content}") + db_ksu = content + kustomization_name = db_ksu["git_name"].lower() + oka_list = db_ksu["oka"] + oka_item = oka_list[0] + oka_params = oka_item.get("transformation", {}) + target_ns = oka_params.get("namespace", "default") + profile_id = op_params.get("profile", {}).get("_id") + profile_type = op_params.get("profile", {}).get("profile_type") + self.logger.info( + f"Checking status of KSU {db_ksu['name']} for profile {profile_id}." + ) + dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type) + if not dbcluster_list: + self.logger.info(f"No clusters found for profile {profile_id}.") + for db_cluster in dbcluster_list: + try: + self.logger.info( + f"Checking status of KSU in cluster {db_cluster['name']}." 
+                )
+                cluster_kubectl = self.cluster_kubectl(db_cluster)
+                checkings_list = [
+                    {
+                        "item": "kustomization",
+                        "name": kustomization_name,
+                        "namespace": target_ns,
+                        "deleted": True,
+                        "timeout": self._checkloop_kustomization_timeout,
+                        "enable": True,
+                        "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+                    },
+                ]
+                self.logger.info(
+                    f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+                )
+                result, message = await self.common_check_list(
+                    op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+                )
+                if not result:
+                    return False, message
+            except Exception as e:
+                self.logger.error(
+                    f"Error checking KSU in cluster {db_cluster['name']}."
+                )
+                self.logger.error(e)
+                return False, f"Error checking KSU in cluster {db_cluster['name']}."
+        return True, "OK"
diff --git a/osm_lcm/n2vc/kubectl.py b/osm_lcm/n2vc/kubectl.py
index 4190740b..eee63dcc 100644
--- a/osm_lcm/n2vc/kubectl.py
+++ b/osm_lcm/n2vc/kubectl.py
@@ -802,11 +802,12 @@ class Kubectl:
         except ApiException as e:
             self.logger.debug(f"Exception: {e}")
             info = json.loads(e.body)
             if info.get("reason").lower() == "notfound":
                 self.logger.warning("Cannot get custom object: {}".format(e))
                 return None
             else:
-                raise e
+                self.logger.error(f"Api Exception: {e}. Reason: {info.get('reason')}")
+                raise
 
     async def list_generic_object(
         self,
diff --git a/osm_lcm/odu_libs/workflows.py b/osm_lcm/odu_libs/workflows.py
index 80527eda..d1b95826 100644
--- a/osm_lcm/odu_libs/workflows.py
+++ b/osm_lcm/odu_libs/workflows.py
@@ -30,16 +30,21 @@ async def check_workflow_status(self, workflow_name):
             name=workflow_name,
             namespace="osm-workflows",
             flag="Completed",
+            deleted=False,
             timeout=300,
         )
     except Exception as e:
         return False, f"Unexpected exception: {e}"
 
 
-async def readiness_loop(self, item, name, namespace, flag, timeout):
+async def readiness_loop(
+    self, item, name, namespace, flag, deleted, timeout, kubectl=None
+):
+    if kubectl is None:
+        kubectl = self._kubectl
     self.logger.info("readiness_loop Enter")
     self.logger.info(
-        f"{item} {name}. Namespace: '{namespace}'. Flag: {flag}. Timeout: {timeout}"
+        f"{item} {name}. Namespace: '{namespace}'. Flag: {flag}. Deleted: {deleted}. Timeout: {timeout}"
     )
     item_api_map = {
         "workflow": {
@@ -86,30 +91,52 @@ async def readiness_loop(self, item, name, namespace, flag, timeout):
     api_version = item_api_map[item]["api_version"]
 
     while counter <= max_iterations:
-        generic_object = await self._kubectl.get_generic_object(
-            api_group=api_group,
-            api_plural=api_plural,
-            api_version=api_version,
-            namespace=namespace,
-            name=name,
-        )
-        if generic_object:
-            # self.logger.debug(f"{yaml.safe_dump(generic_object)}")
-            conditions = generic_object.get("status", {}).get("conditions", [])
-        else:
-            self.logger.info(
-                f"Could not find {api_plural}. Name: {name}. Namespace: '{namespace}'. API: {api_group}/{api_version}"
-            )
-            conditions = []
-        self.logger.info(
-            f"Iteration {counter}/{max_iterations}. 
{item} status conditions: {conditions}" - ) - result = next((item for item in conditions if item["type"] == flag), {}) - if result.get("status", "False") == "True": - self.logger.info( - f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)" + try: + self.logger.info(f"Iteration {counter}/{max_iterations}") + generic_object = await kubectl.get_generic_object( + api_group=api_group, + api_plural=api_plural, + api_version=api_version, + namespace=namespace, + name=name, ) - return True, "COMPLETED" + if deleted: + if generic_object: + self.logger.info( + f"Found {api_plural}. Name: {name}. Namespace: '{namespace}'. API: {api_group}/{api_version}" + ) + else: + self.logger.info( + f"{item} {name} deleted after {counter} iterations (aprox {counter*retry_time} seconds)" + ) + return True, "COMPLETED" + else: + if not flag: + return True, "Nothing to check" + if generic_object: + # self.logger.debug(f"{yaml.safe_dump(generic_object)}") + conditions = generic_object.get("status", {}).get("conditions", []) + self.logger.info(f"{item} status conditions: {conditions}") + else: + self.logger.info( + f"Could not find {api_plural}. Name: {name}. Namespace: '{namespace}'. 
API: {api_group}/{api_version}" + ) + conditions = [] + result = next((item for item in conditions if item["type"] == flag), {}) + if result.get("status", "False") == "True": + self.logger.info( + f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)" + ) + return True, "COMPLETED" + # TODO: Implement generic condition with jsonpath filter + # jsonpath_expr = parse(condition["jsonpath_filter"]) + # match = jsonpath_expr.find(generic_object) + # if match: + # value = match[0].value + # if condition["function"](value, condition["value"]): + # return True, "COMPLETED" + except Exception as e: + self.logger.error(f"Exception: {e}") await asyncio.sleep(retry_time) counter += 1 return ( diff --git a/osm_lcm/odu_workflows.py b/osm_lcm/odu_workflows.py index 6e9061ec..9c8e6e04 100644 --- a/osm_lcm/odu_workflows.py +++ b/osm_lcm/odu_workflows.py @@ -16,7 +16,6 @@ import logging from osm_lcm.lcm_utils import LcmBase - from osm_lcm.n2vc import kubectl -- 2.25.1