Fix checks for KSU create and delete operations and for cluster delete operations 92/14892/16
author garciadeblas <gerardo.garciadeblas@telefonica.com>
Wed, 22 Jan 2025 15:02:18 +0000 (16:02 +0100)
committer garciadeblas <gerardo.garciadeblas@telefonica.com>
Mon, 27 Jan 2025 16:58:50 +0000 (17:58 +0100)
Change-Id: I9fc2a68d636100295409b7d3ad875e54ba368d64
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_lcm/k8s.py
osm_lcm/n2vc/kubectl.py
osm_lcm/odu_libs/workflows.py
osm_lcm/odu_workflows.py

osm_lcm/k8s.py
index b86e133..e860a81 100644 (file)
@@ -27,6 +27,8 @@ from copy import deepcopy
 from osm_lcm import odu_workflows
 from osm_lcm import vim_sdn
 from osm_lcm.data_utils.list_utils import find_in_list
+from osm_lcm.n2vc.kubectl import Kubectl
+import yaml
 
 MAP_PROFILE = {
     "infra_controller_profiles": "infra-controllers",
@@ -212,7 +214,9 @@ class GitOpsLcm(LcmBase):
         db_content["current_operation"] = None
         return resource_status, db_content
 
-    async def common_check_list(self, op_id, checkings_list, db_collection, db_item):
+    async def common_check_list(
+        self, op_id, checkings_list, db_collection, db_item, kubectl=None
+    ):
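+        # "kubectl" optionally targets a workload cluster; when None, the
+        # readiness loop falls back to the LCM's default client (self._kubectl)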
         try:
             for checking in checkings_list:
                 if checking["enable"]:
@@ -220,8 +224,10 @@ class GitOpsLcm(LcmBase):
                         item=checking["item"],
                         name=checking["name"],
                         namespace=checking["namespace"],
-                        flag=checking["flag"],
+                        flag=checking.get("flag"),
+                        deleted=checking.get("deleted", False),
                         timeout=checking["timeout"],
+                        kubectl=kubectl,
                     )
                     if not status:
                         error_message = "Resources not ready: "
@@ -241,8 +247,9 @@ class GitOpsLcm(LcmBase):
 
     async def check_resource_status(self, key, op_id, op_params, content):
         self.logger.info(
-            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
+            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
         )
+        self.logger.debug(f"Check resource status. Content: {content}")
         check_resource_function = self._workflows.get(key, {}).get(
             "check_resource_function"
         )
@@ -266,6 +273,13 @@ class GitOpsLcm(LcmBase):
         )
         return content_copy
 
+    def cluster_kubectl(self, db_cluster):
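+        # Build a Kubectl client for the workload cluster: the kubeconfig stored
+        # in the DB ("credentials") is dumped to a temporary file, since Kubectl
+        # takes a kubeconfig file path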
+        cluster_kubeconfig = db_cluster["credentials"]
+        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+        with open(kubeconfig_path, "w") as kubeconfig_file:
+            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+        return Kubectl(config_file=kubeconfig_path)
+
 
 class ClusterLcm(GitOpsLcm):
     db_collection = "clusters"
@@ -287,6 +301,9 @@ class ClusterLcm(GitOpsLcm):
             "update_cluster": {
                 "check_resource_function": self.check_update_cluster,
             },
+            "delete_cluster": {
+                "check_resource_function": self.check_delete_cluster,
+            },
         }
         self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
 
@@ -470,93 +487,6 @@ class ClusterLcm(GitOpsLcm):
             op_id, checkings_list, "clusters", db_cluster
         )
 
-    async def check_register_cluster(self, op_id, op_params, content):
-        self.logger.info(
-            f"check_register_cluster Operation {op_id}. Params: {op_params}."
-        )
-        # self.logger.debug(f"Content: {content}")
-        db_cluster = content["cluster"]
-        cluster_name = db_cluster["git_name"].lower()
-        cluster_kustomization_name = cluster_name
-        bootstrap = op_params.get("bootstrap", True)
-        checkings_list = [
-            {
-                "item": "kustomization",
-                "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
-                "namespace": "managed-resources",
-                "flag": "Ready",
-                "timeout": self._checkloop_kustomization_timeout,
-                "enable": bootstrap,
-                "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
-            },
-        ]
-        return await self.common_check_list(
-            op_id, checkings_list, "clusters", db_cluster
-        )
-
-    async def check_update_cluster(self, op_id, op_params, content):
-        self.logger.info(
-            f"check_create_cluster Operation {op_id}. Params: {op_params}."
-        )
-        # self.logger.debug(f"Content: {content}")
-        db_cluster = content["cluster"]
-        cluster_name = db_cluster["git_name"].lower()
-        cluster_kustomization_name = cluster_name
-        db_vim_account = content["vim_account"]
-        cloud_type = db_vim_account["vim_type"]
-        nodepool_name = ""
-        if cloud_type == "aws":
-            nodepool_name = f"{cluster_name}-nodegroup"
-            cluster_name = f"{cluster_name}-cluster"
-        elif cloud_type == "gcp":
-            nodepool_name = f"nodepool-{cluster_name}"
-        if cloud_type in ("azure", "gcp", "aws"):
-            checkings_list = [
-                {
-                    "item": "kustomization",
-                    "name": cluster_kustomization_name,
-                    "namespace": "managed-resources",
-                    "flag": "Ready",
-                    "timeout": self._checkloop_kustomization_timeout,
-                    "enable": True,
-                    "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
-                },
-                {
-                    "item": f"cluster_{cloud_type}",
-                    "name": cluster_name,
-                    "namespace": "",
-                    "flag": "Synced",
-                    "timeout": self._checkloop_resource_timeout,
-                    "enable": True,
-                    "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
-                },
-                {
-                    "item": f"cluster_{cloud_type}",
-                    "name": cluster_name,
-                    "namespace": "",
-                    "flag": "Ready",
-                    "timeout": self._checkloop_resource_timeout,
-                    "enable": True,
-                    "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
-                },
-            ]
-        else:
-            return False, "Not suitable VIM account to check cluster status"
-        if nodepool_name:
-            nodepool_check = {
-                "item": f"nodepool_{cloud_type}",
-                "name": nodepool_name,
-                "namespace": "",
-                "flag": "Ready",
-                "timeout": self._checkloop_resource_timeout,
-                "enable": True,
-                "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL",
-            }
-            checkings_list.append(nodepool_check)
-        return await self.common_check_list(
-            op_id, checkings_list, "clusters", db_cluster
-        )
-
     def update_profile_state(self, db_cluster, workflow_status, resource_status):
         profiles = [
             "infra_controller_profiles",
@@ -610,6 +540,10 @@ class ClusterLcm(GitOpsLcm):
             "cluster": self.decrypted_copy(db_cluster),
         }
 
+        # Get the VIM account details for the cluster
+        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
+        workflow_content["vim_account"] = db_vim
+
         # TODO: workaround until NBI rejects cluster deletion requests for registered clusters
         # This if clause will be removed
         if db_cluster["created"] == "false":
@@ -678,6 +612,45 @@ class ClusterLcm(GitOpsLcm):
 
         return
 
+    async def check_delete_cluster(self, op_id, op_params, content):
+        self.logger.info(
+            f"check_delete_cluster Operation {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Content: {content}")
+        db_cluster = content["cluster"]
+        cluster_name = db_cluster["git_name"].lower()
+        cluster_kustomization_name = cluster_name
+        db_vim_account = content["vim_account"]
+        cloud_type = db_vim_account["vim_type"]
+        if cloud_type == "aws":
+            cluster_name = f"{cluster_name}-cluster"
+        if cloud_type in ("azure", "gcp", "aws"):
+            checkings_list = [
+                {
+                    "item": "kustomization",
+                    "name": cluster_kustomization_name,
+                    "namespace": "managed-resources",
+                    "deleted": True,
+                    "timeout": self._checkloop_kustomization_timeout,
+                    "enable": True,
+                    "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+                },
+                {
+                    "item": f"cluster_{cloud_type}",
+                    "name": cluster_name,
+                    "namespace": "",
+                    "deleted": True,
+                    "timeout": self._checkloop_resource_timeout,
+                    "enable": True,
+                    "resourceState": "IN_PROGRESS.RESOURCE_DELETED.CLUSTER",
+                },
+            ]
+        else:
+            return False, "VIM account type not suitable to check cluster status"
+        return await self.common_check_list(
+            op_id, checkings_list, "clusters", db_cluster
+        )
+
     def delete_cluster(self, db_cluster):
         # Actually, item_content is equal to db_cluster
         # item_content = self.db.get_one("clusters", {"_id": db_cluster["_id"]})
@@ -974,6 +947,30 @@ class ClusterLcm(GitOpsLcm):
 
         return
 
+    async def check_register_cluster(self, op_id, op_params, content):
+        self.logger.info(
+            f"check_register_cluster Operation {op_id}. Params: {op_params}."
+        )
+        # self.logger.debug(f"Content: {content}")
+        db_cluster = content["cluster"]
+        cluster_name = db_cluster["git_name"].lower()
+        cluster_kustomization_name = cluster_name
+        bootstrap = op_params.get("bootstrap", True)
+        checkings_list = [
+            {
+                "item": "kustomization",
+                "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
+                "namespace": "managed-resources",
+                "flag": "Ready",
+                "timeout": self._checkloop_kustomization_timeout,
+                "enable": bootstrap,
+                "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
+            },
+        ]
+        return await self.common_check_list(
+            op_id, checkings_list, "clusters", db_cluster
+        )
+
     async def deregister(self, params, order_id):
         self.logger.info("cluster deregister enter")
 
@@ -1155,6 +1152,32 @@ class ClusterLcm(GitOpsLcm):
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
         return
 
+    async def check_update_cluster(self, op_id, op_params, content):
+        self.logger.info(
+            f"check_update_cluster Operation {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Content: {content}")
+        return await self.check_dummy_operation(op_id, op_params, content)
+        # db_cluster = content["cluster"]
+        # cluster_id = db_cluster["_id"]
+        # cluster_kubectl = self.cluster_kubectl(db_cluster)
+        # cluster_name = db_cluster["git_name"].lower()
+        # cluster_kustomization_name = cluster_name
+        # checkings_list = [
+        #     {
+        #         "item": "kustomization",
+        #         "name": cluster_kustomization_name,
+        #         "namespace": "managed-resources",
+        #         "flag": "Ready",
+        #         "timeout": self._checkloop_kustomization_timeout,
+        #         "enable": True,
+        #         "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+        #     },
+        # ]
+        # return await self.common_check_list(
+        #     op_id, checkings_list, "clusters", db_cluster, cluster_kubectl
+        # )
+
 
 class CloudCredentialsLcm(GitOpsLcm):
     db_collection = "vim_accounts"
@@ -1707,6 +1730,26 @@ class KsuLcm(GitOpsLcm):
         :return: None
         """
         super().__init__(msg, lcm_tasks, config)
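+        # Map workflow keys to the functions that check resource status once the
+        # corresponding workflow has completed (looked up by check_resource_status)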
+        self._workflows = {
+            "create_ksus": {
+                "check_resource_function": self.check_create_ksus,
+            },
+            "delete_ksus": {
+                "check_resource_function": self.check_delete_ksus,
+            },
+        }
+
+    def get_dbclusters_from_profile(self, profile_id, profile_type):
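+        """Return the clusters whose <profile_type> list contains profile_id."""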
+        cluster_list = []
+        db_clusters = self.db.get_list("clusters")
+        self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
+        for db_cluster in db_clusters:
+            if profile_id in db_cluster.get(profile_type, []):
+                self.logger.info(
+                    f"Profile {profile_id} found in cluster {db_cluster['name']}"
+                )
+                cluster_list.append(db_cluster)
+        return cluster_list
 
     async def create(self, params, order_id):
         self.logger.info("ksu Create Enter")
@@ -1743,19 +1786,28 @@ class KsuLcm(GitOpsLcm):
                     ] = f"{oka_type}/{db_oka['git_name'].lower()}/templates"
             op_params.append(ksu_params)
 
+        # A single workflow is launched for all KSUs
         _, workflow_name = await self.odu.launch_workflow(
             "create_ksus", op_id, op_params, db_content
         )
+        # Update workflow status in all KSUs
+        wf_status_list = []
         for db_ksu, ksu_params in zip(db_content, op_params):
             workflow_status = await self.check_workflow_and_update_db(
                 op_id, workflow_name, db_ksu
             )
-
-            if workflow_status:
+            wf_status_list.append(workflow_status)
+        # Update resource status in all KSUs
+        # TODO: Should the operation succeed if n KSUs are OK but one is not?
+        res_status_list = []
+        for db_ksu, ksu_params, wf_status in zip(db_content, op_params, wf_status_list):
+            if wf_status:
                 resource_status, db_ksu = await self.check_resource_and_update_db(
                     "create_ksus", op_id, ksu_params, db_ksu
                 )
-
+            else:
+                resource_status = False
+            res_status_list.append(resource_status)
             self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
 
         # Clean items used in the workflow, no matter if the workflow succeeded
@@ -1765,7 +1817,7 @@ class KsuLcm(GitOpsLcm):
         self.logger.info(
             f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
         )
-        self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}")
+        self.logger.info(f"KSU Create EXIT with Resource Status {res_status_list}")
         return
 
     async def edit(self, params, order_id):
@@ -1923,3 +1975,103 @@ class KsuLcm(GitOpsLcm):
 
         self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
         return
+
+    async def check_create_ksus(self, op_id, op_params, content):
+        self.logger.info(f"check_create_ksus Operation {op_id}. Params: {op_params}.")
+        self.logger.debug(f"Content: {content}")
+        db_ksu = content
+        kustomization_name = db_ksu["git_name"].lower()
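+        # The target namespace comes from the first OKA's transformation;
+        # this assumes all OKAs of the KSU share the same namespace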
+        oka_list = op_params["oka"]
+        oka_item = oka_list[0]
+        oka_params = oka_item.get("transformation", {})
+        target_ns = oka_params.get("namespace", "default")
+        profile_id = op_params.get("profile", {}).get("_id")
+        profile_type = op_params.get("profile", {}).get("profile_type")
+        self.logger.info(
+            f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+        )
+        dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+        if not dbcluster_list:
+            self.logger.info(f"No clusters found for profile {profile_id}.")
+        for db_cluster in dbcluster_list:
+            try:
+                self.logger.info(
+                    f"Checking status of KSU in cluster {db_cluster['name']}."
+                )
+                cluster_kubectl = self.cluster_kubectl(db_cluster)
+                checkings_list = [
+                    {
+                        "item": "kustomization",
+                        "name": kustomization_name,
+                        "namespace": target_ns,
+                        "flag": "Ready",
+                        "timeout": self._checkloop_kustomization_timeout,
+                        "enable": True,
+                        "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+                    },
+                ]
+                self.logger.info(
+                    f"Checking kustomization {kustomization_name} in namespace {target_ns}."
+                )
+                result, message = await self.common_check_list(
+                    op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+                )
+                if not result:
+                    return False, message
+            except Exception as e:
+                self.logger.error(
+                    f"Error checking KSU in cluster {db_cluster['name']}."
+                )
+                self.logger.error(e)
+                return False, f"Error checking KSU in cluster {db_cluster['name']}."
+        return True, "OK"
+
+    async def check_delete_ksus(self, op_id, op_params, content):
+        self.logger.info(f"check_delete_ksus Operation {op_id}. Params: {op_params}.")
+        self.logger.debug(f"Content: {content}")
+        db_ksu = content
+        kustomization_name = db_ksu["git_name"].lower()
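+        # The target namespace comes from the first OKA's transformation;
+        # this assumes all OKAs of the KSU share the same namespace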
+        oka_list = db_ksu["oka"]
+        oka_item = oka_list[0]
+        oka_params = oka_item.get("transformation", {})
+        target_ns = oka_params.get("namespace", "default")
+        profile_id = op_params.get("profile", {}).get("_id")
+        profile_type = op_params.get("profile", {}).get("profile_type")
+        self.logger.info(
+            f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+        )
+        dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+        if not dbcluster_list:
+            self.logger.info(f"No clusters found for profile {profile_id}.")
+        for db_cluster in dbcluster_list:
+            try:
+                self.logger.info(
+                    f"Checking status of KSU in cluster {db_cluster['name']}."
+                )
+                cluster_kubectl = self.cluster_kubectl(db_cluster)
+                checkings_list = [
+                    {
+                        "item": "kustomization",
+                        "name": kustomization_name,
+                        "namespace": target_ns,
+                        "deleted": True,
+                        "timeout": self._checkloop_kustomization_timeout,
+                        "enable": True,
+                        "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+                    },
+                ]
+                self.logger.info(
+                    f"Checking deletion of kustomization {kustomization_name} in namespace {target_ns}."
+                )
+                result, message = await self.common_check_list(
+                    op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+                )
+                if not result:
+                    return False, message
+            except Exception as e:
+                self.logger.error(
+                    f"Error checking KSU in cluster {db_cluster['name']}."
+                )
+                self.logger.error(e)
+                return False, f"Error checking KSU in cluster {db_cluster['name']}."
+        return True, "OK"
osm_lcm/n2vc/kubectl.py
index 4190740..eee63dc 100644 (file)
@@ -802,11 +802,7 @@ class Kubectl:
         except ApiException as e:
             self.logger.debug(f"Exception: {e}")
             info = json.loads(e.body)
-            if info.get("reason").lower() == "notfound":
-                self.logger.warning("Cannot get custom object: {}".format(e))
-                return None
-            else:
-                raise e
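+            # Surface the API error to the caller instead of silently returning None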
+            raise Exception(f"Api Exception: {e}. Reason: {info.get('reason')}")
 
     async def list_generic_object(
         self,
osm_lcm/odu_libs/workflows.py
index 80527ed..d1b9582 100644 (file)
@@ -30,16 +30,21 @@ async def check_workflow_status(self, workflow_name):
             name=workflow_name,
             namespace="osm-workflows",
             flag="Completed",
+            deleted=False,
             timeout=300,
         )
     except Exception as e:
         return False, f"Unexpected exception: {e}"
 
 
-async def readiness_loop(self, item, name, namespace, flag, timeout):
+async def readiness_loop(
+    self, item, name, namespace, flag, deleted, timeout, kubectl=None
+):
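+    # An explicit kubectl client (e.g. one built by cluster_kubectl for a
+    # workload cluster) takes precedence; otherwise fall back to the LCM's own client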
+    if kubectl is None:
+        kubectl = self._kubectl
     self.logger.info("readiness_loop Enter")
     self.logger.info(
-        f"{item} {name}. Namespace: '{namespace}'. Flag: {flag}. Timeout: {timeout}"
+        f"{item} {name}. Namespace: '{namespace}'. Flag: {flag}. Deleted: {deleted}. Timeout: {timeout}"
     )
     item_api_map = {
         "workflow": {
@@ -86,30 +91,52 @@ async def readiness_loop(self, item, name, namespace, flag, timeout):
     api_version = item_api_map[item]["api_version"]
 
     while counter <= max_iterations:
-        generic_object = await self._kubectl.get_generic_object(
-            api_group=api_group,
-            api_plural=api_plural,
-            api_version=api_version,
-            namespace=namespace,
-            name=name,
-        )
-        if generic_object:
-            # self.logger.debug(f"{yaml.safe_dump(generic_object)}")
-            conditions = generic_object.get("status", {}).get("conditions", [])
-        else:
-            self.logger.info(
-                f"Could not find {api_plural}. Name: {name}. Namespace: '{namespace}'. API: {api_group}/{api_version}"
-            )
-            conditions = []
-        self.logger.info(
-            f"Iteration {counter}/{max_iterations}. {item} status conditions: {conditions}"
-        )
-        result = next((item for item in conditions if item["type"] == flag), {})
-        if result.get("status", "False") == "True":
-            self.logger.info(
-                f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)"
+        try:
+            self.logger.info(f"Iteration {counter}/{max_iterations}")
+            generic_object = await kubectl.get_generic_object(
+                api_group=api_group,
+                api_plural=api_plural,
+                api_version=api_version,
+                namespace=namespace,
+                name=name,
             )
-            return True, "COMPLETED"
+            if deleted:
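+                # Delete check: succeed as soon as the object can no longer be found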
+                if generic_object:
+                    self.logger.info(
+                        f"Found {api_plural}. Name: {name}. Namespace: '{namespace}'. API: {api_group}/{api_version}"
+                    )
+                else:
+                    self.logger.info(
+                        f"{item} {name} deleted after {counter} iterations (aprox {counter*retry_time} seconds)"
+                    )
+                    return True, "COMPLETED"
+            else:
+                if not flag:
+                    return True, "Nothing to check"
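+                # Readiness check: look for a status condition whose type matches
+                # the requested flag and whose status is "True"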
+                if generic_object:
+                    # self.logger.debug(f"{yaml.safe_dump(generic_object)}")
+                    conditions = generic_object.get("status", {}).get("conditions", [])
+                    self.logger.info(f"{item} status conditions: {conditions}")
+                else:
+                    self.logger.info(
+                        f"Could not find {api_plural}. Name: {name}. Namespace: '{namespace}'. API: {api_group}/{api_version}"
+                    )
+                    conditions = []
+                result = next((cond for cond in conditions if cond["type"] == flag), {})
+                if result.get("status", "False") == "True":
+                    self.logger.info(
+                        f"{item} {name} reached the status {flag} in {counter} iterations (aprox {counter*retry_time} seconds)"
+                    )
+                    return True, "COMPLETED"
+                # TODO: Implement generic condition with jsonpath filter
+                # jsonpath_expr = parse(condition["jsonpath_filter"])
+                # match = jsonpath_expr.find(generic_object)
+                # if match:
+                #     value = match[0].value
+                #     if condition["function"](value, condition["value"]):
+                #         return True, "COMPLETED"
+        except Exception as e:
+            self.logger.error(f"Exception: {e}")
         await asyncio.sleep(retry_time)
         counter += 1
     return (
osm_lcm/odu_workflows.py
index 6e9061e..9c8e6e0 100644 (file)
@@ -16,7 +16,6 @@
 
 import logging
 from osm_lcm.lcm_utils import LcmBase
-
 from osm_lcm.n2vc import kubectl