Cluster Integration and Refactoring the LCM code 85/14785/3
authorrshri <shrinithi.r@tataelxsi.co.in>
Mon, 2 Dec 2024 03:42:35 +0000 (03:42 +0000)
committerrshri <shrinithi.r@tataelxsi.co.in>
Mon, 9 Dec 2024 08:55:21 +0000 (08:55 +0000)
Change-Id: I5c3a2acdb4dc3cc264293abd76a78dca3f77f323
Signed-off-by: rshri <shrinithi.r@tataelxsi.co.in>
osm_lcm/k8s.py
osm_lcm/lcm.py

index 17afe85..892f220 100644 (file)
@@ -26,11 +26,19 @@ from osm_lcm.lcm_utils import LcmBase
 from copy import deepcopy
 from osm_lcm import odu_workflows
 from osm_lcm import vim_sdn
+from osm_lcm.data_utils.list_utils import find_in_list
 
 
 class GitOpsLcm(LcmBase):
     db_collection = "gitops"
 
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
     def __init__(self, msg, lcm_tasks, config):
         self.logger = logging.getLogger("lcm.gitops")
         self.lcm_tasks = lcm_tasks
@@ -150,6 +158,13 @@ class GitOpsLcm(LcmBase):
         }
         return db_cluster_copy
 
+    def get_operation_params(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationParams", {})
+
 
 class ClusterLcm(GitOpsLcm):
     db_collection = "clusters"
@@ -174,13 +189,24 @@ class ClusterLcm(GitOpsLcm):
         }
         self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
 
-    async def create(self, op_id, op_params, content):
+    async def create(self, params, order_id):
         self.logger.info("cluster Create Enter")
-        db_cluster = content["cluster"]
 
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
         db_cluster_copy = self.decrypting_key(db_cluster)
 
-        # vim account details
+        # To get the vim account details
         db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
         db_cluster_copy["vim_account"] = db_vim
 
@@ -238,6 +264,23 @@ class ClusterLcm(GitOpsLcm):
         db_cluster["current_operation"] = None
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
         self.update_profile_state(db_cluster, workflow_status, resource_status)
+
+        db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
+
+        if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
+            result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
+            # To call the lcm.py for registering the cluster in k8scluster lcm.
+            db_register["credentials"] = cluster_creds
+            self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+            register = await self.regist.create(db_register, order_id)
+            self.logger.debug(f"Register is : {register}")
+        else:
+            db_register["_admin"]["operationalState"] = "ERROR"
+            result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
+            # To call the lcm.py for registering the cluster in k8scluster lcm.
+            db_register["credentials"] = cluster_creds
+            self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
         return
 
     async def check_create_cluster(self, op_id, op_params, content):
@@ -407,20 +450,20 @@ class ClusterLcm(GitOpsLcm):
             "app_profiles",
             "resource_profiles",
         ]
+        """
         profiles_collection = {
             "infra_controller_profiles": "k8sinfra_controller",
             "infra_config_profiles": "k8sinfra_config",
             "app_profiles": "k8sapp",
             "resource_profiles": "k8sresource",
         }
+        """
         self.logger.info("the db_cluster is :{}".format(db_cluster))
         for profile_type in profiles:
             profile_id = db_cluster[profile_type]
-            self.logger.info("profile id is : {}".format(profile_id))
-            db_collection = profiles_collection[profile_type]
-            self.logger.info("the db_collection is :{}".format(db_collection))
+            # db_collection = profiles_collection[profile_type]
+            db_collection = self.profile_collection_mapping[profile_type]
             db_profile = self.db.get_one(db_collection, {"_id": profile_id})
-            self.logger.info("the db_profile is :{}".format(db_profile))
             op_id = db_profile["operationHistory"][-1].get("op_id")
             db_profile["state"] = db_cluster["state"]
             db_profile["resourceState"] = db_cluster["resourceState"]
@@ -430,17 +473,30 @@ class ClusterLcm(GitOpsLcm):
             db_profile = self.update_operation_history(
                 db_profile, op_id, workflow_status, resource_status
             )
-            self.logger.info("the db_profile is :{}".format(db_profile))
             self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
 
-    async def delete(self, op_id, op_params, content):
+    async def delete(self, params, order_id):
         self.logger.info("cluster delete Enter")
-        db_cluster = content["cluster"]
+
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
+        db_cluster_copy = self.decrypting_key(db_cluster)
+
         if db_cluster["created"] == "false":
-            return await self.deregister(op_id, op_params, content)
+            return await self.deregister(params, order_id)
 
         _, workflow_name = await self.odu.launch_workflow(
-            "delete_cluster", op_id, op_params, content
+            "delete_cluster", op_id, op_params, db_cluster_copy
         )
         self.logger.info("workflow_name is :{}".format(workflow_name))
 
@@ -466,7 +522,7 @@ class ClusterLcm(GitOpsLcm):
 
         if workflow_status:
             resource_status, resource_msg = await self.check_resource_status(
-                "delete_cluster", op_id, op_params, content
+                "delete_cluster", op_id, op_params, db_cluster_copy
             )
             self.logger.info(
                 "resource_status is :{} and resource_msg is :{}".format(
@@ -488,6 +544,10 @@ class ClusterLcm(GitOpsLcm):
         # To delete it from DB
         if db_cluster["state"] == "DELETED":
             self.delete_cluster(db_cluster)
+
+        # To delete it from k8scluster collection
+        self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
+
         return
 
     def delete_cluster(self, db_cluster):
@@ -503,25 +563,21 @@ class ClusterLcm(GitOpsLcm):
             "app_profiles",
             "resource_profiles",
         ]
+        """
         profiles_collection = {
             "infra_controller_profiles": "k8sinfra_controller",
             "infra_config_profiles": "k8sinfra_config",
             "app_profiles": "k8sapp",
             "resource_profiles": "k8sresource",
         }
+        """
         for profile_type in profiles_to_detach:
             if db_cluster.get(profile_type):
-                self.logger.debug("the profile_type is :{}".format(profile_type))
                 profile_ids = db_cluster[profile_type]
-                self.logger.debug("the profile_ids is :{}".format(profile_ids))
                 profile_ids_copy = deepcopy(profile_ids)
-                self.logger.debug(
-                    "the profile_ids_copy is :{}".format(profile_ids_copy)
-                )
                 for profile_id in profile_ids_copy:
-                    self.logger.debug("the profile_id is :{}".format(profile_id))
-                    db_collection = profiles_collection[profile_type]
-                    self.logger.debug("the db_collection is :{}".format(db_collection))
+                    # db_collection = profiles_collection[profile_type]
+                    db_collection = self.profile_collection_mapping[profile_type]
                     db_profile = self.db.get_one(db_collection, {"_id": profile_id})
                     self.logger.debug("the db_profile is :{}".format(db_profile))
                     self.logger.debug(
@@ -531,30 +587,46 @@ class ClusterLcm(GitOpsLcm):
                         "the db_profile name is :{}".format(db_profile["name"])
                     )
                     if db_cluster["name"] == db_profile["name"]:
-                        self.logger.debug("it is getting into if default")
                         self.db.del_one(db_collection, {"_id": profile_id})
                     else:
-                        self.logger.debug("it is getting into else non default")
                         profile_ids.remove(profile_id)
                         update_dict = {profile_type: profile_ids}
-                        self.logger.debug(f"the update dict is :{update_dict}")
                         self.db.set_one(
                             "clusters", {"_id": db_cluster["_id"]}, update_dict
                         )
         self.db.del_one("clusters", {"_id": db_cluster["_id"]})
-        self.logger.debug("the id is :{}".format(db_cluster["_id"]))
 
-    async def attach_profile(self, op_id, op_params, content):
+    async def attach_profile(self, params, order_id):
         self.logger.info("profile attach Enter")
-        db_cluster = content["cluster"]
-        db_profile = content["profile"]
-        profile_type = db_profile["profile_type"]
-        profile_id = db_profile["_id"]
-        self.logger.info("profile type is : {}".format(profile_type))
-        self.logger.info("profile id is : {}".format(profile_id))
+
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+        # content = {
+        #     "cluster": db_cluster,
+        # }
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
+        db_cluster_copy = self.decrypting_key(db_cluster)
+
+        # To get the profile details
+        profile_id = params["profile_id"]
+        profile_type = params["profile_type"]
+        profile_collection = self.profile_collection_mapping[profile_type]
+        db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+        db_profile["profile_type"] = profile_type
+        # content["profile"] = db_profile
+        db_cluster_copy["profile"] = db_profile
 
         _, workflow_name = await self.odu.launch_workflow(
-            "attach_profile_to_cluster", op_id, op_params, content
+            "attach_profile_to_cluster", op_id, op_params, db_cluster_copy
         )
         self.logger.info("workflow_name is :{}".format(workflow_name))
 
@@ -578,7 +650,7 @@ class ClusterLcm(GitOpsLcm):
 
         if workflow_status:
             resource_status, resource_msg = await self.check_resource_status(
-                "attach_profile_to_cluster", op_id, op_params, content
+                "attach_profile_to_cluster", op_id, op_params, db_cluster_copy
             )
             self.logger.info(
                 "resource_status is :{} and resource_msg is :{}".format(
@@ -595,32 +667,45 @@ class ClusterLcm(GitOpsLcm):
             db_cluster, op_id, workflow_status, resource_status
         )
         profile_list = db_cluster[profile_type]
-        self.logger.info("profile list is : {}".format(profile_list))
         if resource_status:
-            self.logger.info("it is getting into resource status true")
             profile_list.append(profile_id)
-            self.logger.info("profile list is : {}".format(profile_list))
             db_cluster[profile_type] = profile_list
-            self.logger.info("db cluster is : {}".format(db_cluster))
-        # update_dict = {item: profile_list}
-        # self.logger.info("the update_dict is :{}".format(update_dict))
-        # self.db.set_one(self.topic, filter_q, update_dict)
         db_cluster["current_operation"] = None
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
 
         return
 
-    async def detach_profile(self, op_id, op_params, content):
+    async def detach_profile(self, params, order_id):
         self.logger.info("profile dettach Enter")
-        db_cluster = content["cluster"]
-        db_profile = content["profile"]
-        profile_type = db_profile["profile_type"]
-        profile_id = db_profile["_id"]
-        self.logger.info("profile type is : {}".format(profile_type))
-        self.logger.info("profile id is : {}".format(profile_id))
+
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+        # content = {
+        #     "cluster": db_cluster,
+        # }
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
+        db_cluster_copy = self.decrypting_key(db_cluster)
+
+        # To get the profile details
+        profile_id = params["profile_id"]
+        profile_type = params["profile_type"]
+        profile_collection = self.profile_collection_mapping[profile_type]
+        db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+        db_profile["profile_type"] = profile_type
+        # content["profile"] = db_profile
+        db_cluster_copy["profile"] = db_profile
 
         _, workflow_name = await self.odu.launch_workflow(
-            "detach_profile_from_cluster", op_id, op_params, content
+            "detach_profile_from_cluster", op_id, op_params, db_cluster_copy
         )
         self.logger.info("workflow_name is :{}".format(workflow_name))
 
@@ -644,7 +729,7 @@ class ClusterLcm(GitOpsLcm):
 
         if workflow_status:
             resource_status, resource_msg = await self.check_resource_status(
-                "detach_profile_from_cluster", op_id, op_params, content
+                "detach_profile_from_cluster", op_id, op_params, db_cluster_copy
             )
             self.logger.info(
                 "resource_status is :{} and resource_msg is :{}".format(
@@ -663,23 +748,31 @@ class ClusterLcm(GitOpsLcm):
         profile_list = db_cluster[profile_type]
         self.logger.info("profile list is : {}".format(profile_list))
         if resource_status:
-            self.logger.info("it is getting into resource status true")
             profile_list.remove(profile_id)
-            self.logger.info("profile list is : {}".format(profile_list))
             db_cluster[profile_type] = profile_list
-            self.logger.info("db cluster is : {}".format(db_cluster))
-        # update_dict = {item: profile_list}
-        # self.logger.info("the update_dict is :{}".format(update_dict))
-        # self.db.set_one(self.topic, filter_q, update_dict)
         db_cluster["current_operation"] = None
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
 
         return
 
-    async def register(self, op_id, op_params, content):
+    async def register(self, params, order_id):
         self.logger.info("cluster register enter")
-        db_cluster = content["cluster"]
 
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+        # content = {
+        #     "cluster": db_cluster,
+        # }
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
         db_cluster_copy = self.decrypting_key(db_cluster)
 
         _, workflow_name = await self.odu.launch_workflow(
@@ -735,16 +828,43 @@ class ClusterLcm(GitOpsLcm):
         )
         db_cluster["current_operation"] = None
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+        db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
+        db_register["credentials"] = db_cluster["credentials"]
+        self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
+        if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
+            # To call the lcm.py for registering the cluster in k8scluster lcm.
+            register = await self.regist.create(db_register, order_id)
+            self.logger.debug(f"Register is : {register}")
+        else:
+            db_register["_admin"]["operationalState"] = "ERROR"
+            self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
         return
 
-    async def deregister(self, op_id, op_params, content):
+    async def deregister(self, params, order_id):
         self.logger.info("cluster deregister enter")
-        db_cluster = content["cluster"]
 
-        self.logger.info("db_cluster is : {}".format(db_cluster))
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+        # content = {
+        #     "cluster": db_cluster,
+        # }
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
+
+        # To initialize the operation states
+        self.initialize_operation(cluster_id, op_id)
+
+        # To copy the cluster content and decrypting the key to use in workflows
+        db_cluster_copy = self.decrypting_key(db_cluster)
 
         _, workflow_name = await self.odu.launch_workflow(
-            "deregister_cluster", op_id, op_params, content
+            "deregister_cluster", op_id, op_params, db_cluster_copy
         )
         self.logger.info("workflow_name is :{}".format(workflow_name))
 
@@ -770,7 +890,7 @@ class ClusterLcm(GitOpsLcm):
 
         # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
         clean_status, clean_msg = await self.odu.clean_items_workflow(
-            "deregister_cluster", op_id, op_params, content
+            "deregister_cluster", op_id, op_params, db_cluster_copy
         )
         self.logger.info(
             f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
@@ -778,7 +898,7 @@ class ClusterLcm(GitOpsLcm):
 
         if workflow_status:
             resource_status, resource_msg = await self.check_resource_status(
-                "deregister_cluster", op_id, op_params, content
+                "deregister_cluster", op_id, op_params, db_cluster_copy
             )
             self.logger.info(
                 "resource_status is :{} and resource_msg is :{}".format(
@@ -800,10 +920,17 @@ class ClusterLcm(GitOpsLcm):
         # To delete it from DB
         if db_cluster["state"] == "DELETED":
             self.db.del_one("clusters", {"_id": db_cluster["_id"]})
+
+        # To delete it from k8scluster collection
+        self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
+
         return
 
-    async def get_creds(self, op_id, db_cluster):
+    async def get_creds(self, params, order_id):
         self.logger.info("Cluster get creds Enter")
+        cluster_id = params["cluster_id"]
+        op_id = params["operation_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
         result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
         if result:
             db_cluster["credentials"] = cluster_creds
@@ -815,11 +942,18 @@ class ClusterLcm(GitOpsLcm):
             op_len += 1
         db_cluster["current_operation"] = None
         self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+        self.logger.info("Cluster Get Creds Exit")
         return
 
-    async def update(self, op_id, op_params, content):
+    async def update(self, params, order_id):
         self.logger.info("Cluster update Enter")
-        db_cluster = content["cluster"]
+        # To get the cluster details
+        cluster_id = params["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+        # To get the operation params details
+        op_id = params["operation_id"]
+        op_params = self.get_operation_params(db_cluster, op_id)
 
         db_cluster_copy = self.decrypting_key(db_cluster)
 
@@ -1002,6 +1136,8 @@ class CloudCredentialsLcm(GitOpsLcm):
 
 
 class K8sAppLcm(GitOpsLcm):
+    db_collection = "k8sapp"
+
     def __init__(self, msg, lcm_tasks, config):
         """
         Init, Connect to database, filesystem storage, and messaging
@@ -1010,9 +1146,20 @@ class K8sAppLcm(GitOpsLcm):
         """
         super().__init__(msg, lcm_tasks, config)
 
-    async def create(self, op_id, op_params, content):
+    async def create(self, params, order_id):
         self.logger.info("App Create Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sapp", {"_id": profile_id})
+        content["profile_type"] = "applications"
+        op_params = self.get_operation_params(content, op_id)
+        self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
         _, workflow_name = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
@@ -1059,9 +1206,18 @@ class K8sAppLcm(GitOpsLcm):
 
         return
 
-    async def delete(self, op_id, op_params, content):
+    async def delete(self, params, order_id):
         self.logger.info("App delete Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sapp", {"_id": profile_id})
+        op_params = self.get_operation_params(content, op_id)
+
         _, workflow_name = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
@@ -1113,6 +1269,8 @@ class K8sAppLcm(GitOpsLcm):
 
 
 class K8sResourceLcm(GitOpsLcm):
+    db_collection = "k8sresource"
+
     def __init__(self, msg, lcm_tasks, config):
         """
         Init, Connect to database, filesystem storage, and messaging
@@ -1121,9 +1279,20 @@ class K8sResourceLcm(GitOpsLcm):
         """
         super().__init__(msg, lcm_tasks, config)
 
-    async def create(self, op_id, op_params, content):
+    async def create(self, params, order_id):
         self.logger.info("Resource Create Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sresource", {"_id": profile_id})
+        content["profile_type"] = "managed-resources"
+        op_params = self.get_operation_params(content, op_id)
+        self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
         _, workflow_name = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
@@ -1170,9 +1339,17 @@ class K8sResourceLcm(GitOpsLcm):
 
         return
 
-    async def delete(self, op_id, op_params, content):
+    async def delete(self, params, order_id):
         self.logger.info("Resource delete Enter")
-        content = self.db.get_one("k8sresource", {"_id": content["_id"]})
+
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sresource", {"_id": profile_id})
+        op_params = self.get_operation_params(content, op_id)
 
         _, workflow_name = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
@@ -1225,6 +1402,8 @@ class K8sResourceLcm(GitOpsLcm):
 
 
 class K8sInfraControllerLcm(GitOpsLcm):
+    db_collection = "k8sinfra_controller"
+
     def __init__(self, msg, lcm_tasks, config):
         """
         Init, Connect to database, filesystem storage, and messaging
@@ -1233,9 +1412,20 @@ class K8sInfraControllerLcm(GitOpsLcm):
         """
         super().__init__(msg, lcm_tasks, config)
 
-    async def create(self, op_id, op_params, content):
+    async def create(self, params, order_id):
         self.logger.info("Infra controller Create Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+        content["profile_type"] = "infra-controllers"
+        op_params = self.get_operation_params(content, op_id)
+        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
         _, workflow_name = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
@@ -1282,9 +1472,18 @@ class K8sInfraControllerLcm(GitOpsLcm):
 
         return
 
-    async def delete(self, op_id, op_params, content):
+    async def delete(self, params, order_id):
         self.logger.info("Infra controller delete Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+        op_params = self.get_operation_params(content, op_id)
+
         _, workflow_name = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
@@ -1336,6 +1535,8 @@ class K8sInfraControllerLcm(GitOpsLcm):
 
 
 class K8sInfraConfigLcm(GitOpsLcm):
+    db_collection = "k8sinfra_config"
+
     def __init__(self, msg, lcm_tasks, config):
         """
         Init, Connect to database, filesystem storage, and messaging
@@ -1344,9 +1545,20 @@ class K8sInfraConfigLcm(GitOpsLcm):
         """
         super().__init__(msg, lcm_tasks, config)
 
-    async def create(self, op_id, op_params, content):
+    async def create(self, params, order_id):
         self.logger.info("Infra config Create Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+        content["profile_type"] = "infra-configs"
+        op_params = self.get_operation_params(content, op_id)
+        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
         _, workflow_name = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
@@ -1393,9 +1605,18 @@ class K8sInfraConfigLcm(GitOpsLcm):
 
         return
 
-    async def delete(self, op_id, op_params, content):
+    async def delete(self, params, order_id):
         self.logger.info("Infra config delete Enter")
 
+        op_id = params["operation_id"]
+        profile_id = params["profile_id"]
+
+        # To initialize the operation states
+        self.initialize_operation(profile_id, op_id)
+
+        content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+        op_params = self.get_operation_params(content, op_id)
+
         _, workflow_name = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
index 2e67db6..54f9954 100644 (file)
@@ -758,112 +758,58 @@ class Lcm:
             elif command == "deleted":
                 return  # TODO cleaning of task just in case should be done
         elif topic == "cluster":
-            if command != "get_creds":
-                op_id = params["operation_id"]
-                cluster_id = params["cluster_id"]
-                self.cluster.initialize_operation(cluster_id, op_id)
-                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
-                """
-                if command in (
-                    "create",
-                    "created",
-                    "register",
-                    "registered",
-                    "upgrade",
-                    "scale",
-                ):
-                    self.db.encrypt_decrypt_fields(
-                        db_cluster,
-                        "decrypt",
-                        ["age_pubkey", "age_privkey"],
-                        schema_version="1.11",
-                        salt=cluster_id,
-                    )
-                """
-                op_params = self.get_operation_params(db_cluster, op_id)
-                db_content = {
-                    "cluster": db_cluster,
-                }
+            cluster_id = params["cluster_id"]
+            op_id = params["operation_id"]
             if command == "create" or command == "created":
                 self.logger.debug("cluster_id = {}".format(cluster_id))
-                # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
-                """
-                db_vim = self.db.get_one(
-                    "vim_accounts", {"name": db_cluster["vim_account"]}
-                )
-                db_content["vim_account"] = db_vim
-                """
-                task = asyncio.ensure_future(
-                    self.cluster.create(op_id, op_params, db_content)
-                )
+                task = asyncio.ensure_future(self.cluster.create(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "cluster_create", task
                 )
                 return
             elif command == "delete" or command == "deleted":
-                task = asyncio.ensure_future(
-                    self.cluster.delete(op_id, op_params, db_content)
-                )
+                task = asyncio.ensure_future(self.cluster.delete(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "cluster_delete", task
                 )
                 return
             elif command == "add" or command == "added":
-                profile_type = params["profile_type"]
-                profile_collection = self.profile_collection_mapping[profile_type]
-                db_profile = self.db.get_one(
-                    profile_collection, {"_id": params["profile_id"]}
-                )
-                db_profile["profile_type"] = profile_type
-                db_content["profile"] = db_profile
                 task = asyncio.ensure_future(
-                    self.cluster.attach_profile(op_id, op_params, db_content)
+                    self.cluster.attach_profile(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "profile_add", task
                 )
                 return
             elif command == "remove" or command == "removed":
-                profile_type = params["profile_type"]
-                profile_collection = self.profile_collection_mapping[profile_type]
-                db_profile = self.db.get_one(
-                    profile_collection, {"_id": params["profile_id"]}
-                )
-                db_profile["profile_type"] = profile_type
-                db_content["profile"] = db_profile
                 task = asyncio.ensure_future(
-                    self.cluster.detach_profile(op_id, op_params, db_content)
+                    self.cluster.detach_profile(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "profile_remove", task
                 )
                 return
             elif command == "register" or command == "registered":
-                task = asyncio.ensure_future(
-                    self.cluster.register(op_id, op_params, db_content)
-                )
+                task = asyncio.ensure_future(self.cluster.register(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "cluster_register", task
                 )
                 return
             elif command == "deregister" or command == "deregistered":
-                task = asyncio.ensure_future(
-                    self.cluster.deregister(op_id, op_params, db_content)
-                )
+                task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "cluster_deregister", task
                 )
                 return
             elif command == "get_creds":
-                cluster_id = params["cluster_id"]
-                op_id = params["operation_id"]
-                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
-                task = asyncio.ensure_future(self.cluster.get_creds(op_id, db_cluster))
+                task = asyncio.ensure_future(self.cluster.get_creds(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, cluster_id, "cluster_get_credentials", task
                 )
                 return
             elif command == "upgrade" or command == "scale" or command == "update":
+                cluster_id = params["cluster_id"]
+                op_id = params["operation_id"]
                 # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
                 """
                 db_vim = self.db.get_one(
@@ -871,9 +817,7 @@ class Lcm:
                 )
                 db_content["vim_account"] = db_vim
                 """
-                task = asyncio.ensure_future(
-                    self.cluster.update(op_id, op_params, db_content)
-                )
+                task = asyncio.ensure_future(self.cluster.update(params, order_id))
                 self.lcm_tasks.register(
                     "cluster", cluster_id, op_id, "cluster_update", task
                 )
@@ -881,24 +825,16 @@ class Lcm:
         elif topic == "k8s_app":
             op_id = params["operation_id"]
             profile_id = params["profile_id"]
-            self.k8s_app.initialize_operation(profile_id, op_id)
-            db_profile = self.db.get_one("k8sapp", {"_id": profile_id})
-            db_profile["profile_type"] = "applications"
-            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
                 self.logger.debug("Create k8s_app_id = {}".format(profile_id))
-                task = asyncio.ensure_future(
-                    self.k8s_app.create(op_id, op_params, db_profile)
-                )
+                task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
                 self.lcm_tasks.register(
                     "k8s_app", profile_id, op_id, "k8s_app_create", task
                 )
                 return
             elif command == "delete" or command == "deleted":
                 self.logger.debug("Delete k8s_app_id = {}".format(profile_id))
-                task = asyncio.ensure_future(
-                    self.k8s_app.delete(op_id, op_params, db_profile)
-                )
+                task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
                 self.lcm_tasks.register(
                     "k8s_app", profile_id, op_id, "k8s_app_delete", task
                 )
@@ -906,15 +842,9 @@ class Lcm:
         elif topic == "k8s_resource":
             op_id = params["operation_id"]
             profile_id = params["profile_id"]
-            self.k8s_resource.initialize_operation(profile_id, op_id)
-            db_profile = self.db.get_one("k8sresource", {"_id": profile_id})
-            db_profile["profile_type"] = "managed-resources"
-            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
                 self.logger.debug("Create k8s_resource_id = {}".format(profile_id))
-                task = asyncio.ensure_future(
-                    self.k8s_resource.create(op_id, op_params, db_profile)
-                )
+                task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
                 self.lcm_tasks.register(
                     "k8s_resource",
                     profile_id,
@@ -925,9 +855,7 @@ class Lcm:
                 return
             elif command == "delete" or command == "deleted":
                 self.logger.debug("Delete k8s_resource_id = {}".format(profile_id))
-                task = asyncio.ensure_future(
-                    self.k8s_resource.delete(op_id, op_params, db_profile)
-                )
+                task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
                 self.lcm_tasks.register(
                     "k8s_resource",
                     profile_id,
@@ -940,16 +868,12 @@ class Lcm:
         elif topic == "k8s_infra_controller":
             op_id = params["operation_id"]
             profile_id = params["profile_id"]
-            self.k8s_infra_controller.initialize_operation(profile_id, op_id)
-            db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
-            db_profile["profile_type"] = "infra-controllers"
-            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
                 self.logger.debug(
                     "Create k8s_infra_controller_id = {}".format(profile_id)
                 )
                 task = asyncio.ensure_future(
-                    self.k8s_infra_controller.create(op_id, op_params, db_profile)
+                    self.k8s_infra_controller.create(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_controller",
@@ -964,7 +888,7 @@ class Lcm:
                     "Delete k8s_infra_controller_id = {}".format(profile_id)
                 )
                 task = asyncio.ensure_future(
-                    self.k8s_infra_controller.delete(op_id, op_params, db_profile)
+                    self.k8s_infra_controller.delete(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_controller",
@@ -978,14 +902,10 @@ class Lcm:
         elif topic == "k8s_infra_config":
             op_id = params["operation_id"]
             profile_id = params["profile_id"]
-            self.k8s_infra_config.initialize_operation(profile_id, op_id)
-            db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id})
-            db_profile["profile_type"] = "infra-configs"
-            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
                 self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id))
                 task = asyncio.ensure_future(
-                    self.k8s_infra_config.create(op_id, op_params, db_profile)
+                    self.k8s_infra_config.create(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_config",
@@ -998,7 +918,7 @@ class Lcm:
             elif command == "delete" or command == "deleted":
                 self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id))
                 task = asyncio.ensure_future(
-                    self.k8s_infra_config.delete(op_id, op_params, db_profile)
+                    self.k8s_infra_config.delete(params, order_id)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_config",