From 948f7de7daf1313c054a5f6a5e76c8d5beacdaae Mon Sep 17 00:00:00 2001 From: rshri Date: Mon, 2 Dec 2024 03:42:35 +0000 Subject: [PATCH] Cluster Integration and Refactoring the LCM code Change-Id: I5c3a2acdb4dc3cc264293abd76a78dca3f77f323 Signed-off-by: rshri --- osm_lcm/k8s.py | 373 +++++++++++++++++++++++++++++++++++++++---------- osm_lcm/lcm.py | 120 +++------------- 2 files changed, 317 insertions(+), 176 deletions(-) diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py index 17afe85c..892f2203 100644 --- a/osm_lcm/k8s.py +++ b/osm_lcm/k8s.py @@ -26,11 +26,19 @@ from osm_lcm.lcm_utils import LcmBase from copy import deepcopy from osm_lcm import odu_workflows from osm_lcm import vim_sdn +from osm_lcm.data_utils.list_utils import find_in_list class GitOpsLcm(LcmBase): db_collection = "gitops" + profile_collection_mapping = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "resource_profiles": "k8sresource", + "app_profiles": "k8sapp", + } + def __init__(self, msg, lcm_tasks, config): self.logger = logging.getLogger("lcm.gitops") self.lcm_tasks = lcm_tasks @@ -150,6 +158,13 @@ class GitOpsLcm(LcmBase): } return db_cluster_copy + def get_operation_params(self, item, operation_id): + operation_history = item.get("operationHistory", []) + operation = find_in_list( + operation_history, lambda op: op["op_id"] == operation_id + ) + return operation.get("operationParams", {}) + class ClusterLcm(GitOpsLcm): db_collection = "clusters" @@ -174,13 +189,24 @@ class ClusterLcm(GitOpsLcm): } self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("cluster Create Enter") - db_cluster = content["cluster"] + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows db_cluster_copy = self.decrypting_key(db_cluster) - # vim account details + # To get the vim account details db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]}) db_cluster_copy["vim_account"] = db_vim @@ -238,6 +264,23 @@ class ClusterLcm(GitOpsLcm): db_cluster["current_operation"] = None self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) self.update_profile_state(db_cluster, workflow_status, resource_status) + + db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]}) + + if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED": + result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster) + # To call the lcm.py for registering the cluster in k8scluster lcm. + db_register["credentials"] = cluster_creds + self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register) + register = await self.regist.create(db_register, order_id) + self.logger.debug(f"Register is : {register}") + else: + db_register["_admin"]["operationalState"] = "ERROR" + result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster) + # To call the lcm.py for registering the cluster in k8scluster lcm. + db_register["credentials"] = cluster_creds + self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register) + return async def check_create_cluster(self, op_id, op_params, content): @@ -407,20 +450,20 @@ class ClusterLcm(GitOpsLcm): "app_profiles", "resource_profiles", ] + """ profiles_collection = { "infra_controller_profiles": "k8sinfra_controller", "infra_config_profiles": "k8sinfra_config", "app_profiles": "k8sapp", "resource_profiles": "k8sresource", } + """ self.logger.info("the db_cluster is :{}".format(db_cluster)) for profile_type in profiles: profile_id = db_cluster[profile_type] - self.logger.info("profile id is : {}".format(profile_id)) - db_collection = profiles_collection[profile_type] - self.logger.info("the db_collection is :{}".format(db_collection)) + # db_collection = profiles_collection[profile_type] + db_collection = self.profile_collection_mapping[profile_type] db_profile = self.db.get_one(db_collection, {"_id": profile_id}) - self.logger.info("the db_profile is :{}".format(db_profile)) op_id = db_profile["operationHistory"][-1].get("op_id") db_profile["state"] = db_cluster["state"] db_profile["resourceState"] = db_cluster["resourceState"] @@ -430,17 +473,30 @@ class ClusterLcm(GitOpsLcm): db_profile = self.update_operation_history( db_profile, op_id, workflow_status, resource_status ) - self.logger.info("the db_profile is :{}".format(db_profile)) self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile) - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("cluster delete Enter") - db_cluster = content["cluster"] + + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows + db_cluster_copy = self.decrypting_key(db_cluster) + if db_cluster["created"] == "false": - return await self.deregister(op_id, op_params, content) + return await self.deregister(params, order_id) _, workflow_name = await self.odu.launch_workflow( - "delete_cluster", op_id, op_params, content + "delete_cluster", op_id, op_params, db_cluster_copy ) self.logger.info("workflow_name is :{}".format(workflow_name)) @@ -466,7 +522,7 @@ class ClusterLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "delete_cluster", op_id, op_params, content + "delete_cluster", op_id, op_params, db_cluster_copy ) self.logger.info( "resource_status is :{} and resource_msg is :{}".format( @@ -488,6 +544,10 @@ class ClusterLcm(GitOpsLcm): # To delete it from DB if db_cluster["state"] == "DELETED": self.delete_cluster(db_cluster) + + # To delete it from k8scluster collection + self.db.del_one("k8sclusters", {"name": db_cluster["name"]}) + return def delete_cluster(self, db_cluster): @@ -503,25 +563,21 @@ class ClusterLcm(GitOpsLcm): "app_profiles", "resource_profiles", ] + """ profiles_collection = { "infra_controller_profiles": "k8sinfra_controller", "infra_config_profiles": "k8sinfra_config", "app_profiles": "k8sapp", "resource_profiles": "k8sresource", } + """ for profile_type in profiles_to_detach: if db_cluster.get(profile_type): - self.logger.debug("the profile_type is :{}".format(profile_type)) profile_ids = db_cluster[profile_type] - self.logger.debug("the profile_ids is :{}".format(profile_ids)) profile_ids_copy = deepcopy(profile_ids) - self.logger.debug( - "the profile_ids_copy is :{}".format(profile_ids_copy) - ) for profile_id in profile_ids_copy: - self.logger.debug("the profile_id is :{}".format(profile_id)) - db_collection = profiles_collection[profile_type] - self.logger.debug("the db_collection is :{}".format(db_collection)) + # db_collection = profiles_collection[profile_type] + db_collection = self.profile_collection_mapping[profile_type] db_profile = self.db.get_one(db_collection, {"_id": profile_id}) self.logger.debug("the db_profile is :{}".format(db_profile)) self.logger.debug( @@ -531,30 +587,46 @@ class ClusterLcm(GitOpsLcm): "the db_profile name is :{}".format(db_profile["name"]) ) if db_cluster["name"] == db_profile["name"]: - self.logger.debug("it is getting into if default") self.db.del_one(db_collection, {"_id": profile_id}) else: - self.logger.debug("it is getting into else non default") profile_ids.remove(profile_id) update_dict = {profile_type: profile_ids} - self.logger.debug(f"the update dict is :{update_dict}") self.db.set_one( "clusters", {"_id": db_cluster["_id"]}, update_dict ) self.db.del_one("clusters", {"_id": db_cluster["_id"]}) - self.logger.debug("the id is :{}".format(db_cluster["_id"])) - async def attach_profile(self, op_id, op_params, content): + async def attach_profile(self, params, order_id): self.logger.info("profile attach Enter") - db_cluster = content["cluster"] - db_profile = content["profile"] - profile_type = db_profile["profile_type"] - profile_id = db_profile["_id"] - self.logger.info("profile type is : {}".format(profile_type)) - self.logger.info("profile id is : {}".format(profile_id)) + + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + # content = { + # "cluster": db_cluster, + # } + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows + db_cluster_copy = self.decrypting_key(db_cluster) + + # To get the profile details + profile_id = params["profile_id"] + profile_type = params["profile_type"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + db_profile["profile_type"] = profile_type + # content["profile"] = db_profile + db_cluster_copy["profile"] = db_profile _, workflow_name = await self.odu.launch_workflow( - "attach_profile_to_cluster", op_id, op_params, content + "attach_profile_to_cluster", op_id, op_params, db_cluster_copy ) self.logger.info("workflow_name is :{}".format(workflow_name)) @@ -578,7 +650,7 @@ class ClusterLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "attach_profile_to_cluster", op_id, op_params, content + "attach_profile_to_cluster", op_id, op_params, db_cluster_copy ) self.logger.info( "resource_status is :{} and resource_msg is :{}".format( @@ -595,32 +667,45 @@ class ClusterLcm(GitOpsLcm): db_cluster, op_id, workflow_status, resource_status ) profile_list = db_cluster[profile_type] - self.logger.info("profile list is : {}".format(profile_list)) if resource_status: - self.logger.info("it is getting into resource status true") profile_list.append(profile_id) - self.logger.info("profile list is : {}".format(profile_list)) db_cluster[profile_type] = profile_list - self.logger.info("db cluster is : {}".format(db_cluster)) - # update_dict = {item: profile_list} - # self.logger.info("the update_dict is :{}".format(update_dict)) - # self.db.set_one(self.topic, filter_q, update_dict) db_cluster["current_operation"] = None self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) return - async def detach_profile(self, op_id, op_params, content): + async def detach_profile(self, params, order_id): self.logger.info("profile dettach Enter") - db_cluster = content["cluster"] - db_profile = content["profile"] - profile_type = db_profile["profile_type"] - profile_id = db_profile["_id"] - self.logger.info("profile type is : {}".format(profile_type)) - self.logger.info("profile id is : {}".format(profile_id)) + + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + # content = { + # "cluster": db_cluster, + # } + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows + db_cluster_copy = self.decrypting_key(db_cluster) + + # To get the profile details + profile_id = params["profile_id"] + profile_type = params["profile_type"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + db_profile["profile_type"] = profile_type + # content["profile"] = db_profile + db_cluster_copy["profile"] = db_profile _, workflow_name = await self.odu.launch_workflow( - "detach_profile_from_cluster", op_id, op_params, content + "detach_profile_from_cluster", op_id, op_params, db_cluster_copy ) self.logger.info("workflow_name is :{}".format(workflow_name)) @@ -644,7 +729,7 @@ class ClusterLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "detach_profile_from_cluster", op_id, op_params, content + "detach_profile_from_cluster", op_id, op_params, db_cluster_copy ) self.logger.info( "resource_status is :{} and resource_msg is :{}".format( @@ -663,23 +748,31 @@ class ClusterLcm(GitOpsLcm): profile_list = db_cluster[profile_type] self.logger.info("profile list is : {}".format(profile_list)) if resource_status: - self.logger.info("it is getting into resource status true") profile_list.remove(profile_id) - self.logger.info("profile list is : {}".format(profile_list)) db_cluster[profile_type] = profile_list - self.logger.info("db cluster is : {}".format(db_cluster)) - # update_dict = {item: profile_list} - # self.logger.info("the update_dict is :{}".format(update_dict)) - # self.db.set_one(self.topic, filter_q, update_dict) db_cluster["current_operation"] = None self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) return - async def register(self, op_id, op_params, content): + async def register(self, params, order_id): self.logger.info("cluster register enter") - db_cluster = content["cluster"] + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + # content = { + # "cluster": db_cluster, + # } + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows db_cluster_copy = self.decrypting_key(db_cluster) _, workflow_name = await self.odu.launch_workflow( @@ -735,16 +828,43 @@ class ClusterLcm(GitOpsLcm): ) db_cluster["current_operation"] = None self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + + db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]}) + db_register["credentials"] = db_cluster["credentials"] + self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register) + + if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED": + # To call the lcm.py for registering the cluster in k8scluster lcm. + register = await self.regist.create(db_register, order_id) + self.logger.debug(f"Register is : {register}") + else: + db_register["_admin"]["operationalState"] = "ERROR" + self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register) + return - async def deregister(self, op_id, op_params, content): + async def deregister(self, params, order_id): self.logger.info("cluster deregister enter") - db_cluster = content["cluster"] - self.logger.info("db_cluster is : {}".format(db_cluster)) + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + # content = { + # "cluster": db_cluster, + # } + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) + + # To initialize the operation states + self.initialize_operation(cluster_id, op_id) + + # To copy the cluster content and decrypting the key to use in workflows + db_cluster_copy = self.decrypting_key(db_cluster) _, workflow_name = await self.odu.launch_workflow( - "deregister_cluster", op_id, op_params, content + "deregister_cluster", op_id, op_params, db_cluster_copy ) self.logger.info("workflow_name is :{}".format(workflow_name)) @@ -770,7 +890,7 @@ class ClusterLcm(GitOpsLcm): # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded clean_status, clean_msg = await self.odu.clean_items_workflow( - "deregister_cluster", op_id, op_params, content + "deregister_cluster", op_id, op_params, db_cluster_copy ) self.logger.info( f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" @@ -778,7 +898,7 @@ class ClusterLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "deregister_cluster", op_id, op_params, content + "deregister_cluster", op_id, op_params, db_cluster_copy ) self.logger.info( "resource_status is :{} and resource_msg is :{}".format( @@ -800,10 +920,17 @@ class ClusterLcm(GitOpsLcm): # To delete it from DB if db_cluster["state"] == "DELETED": self.db.del_one("clusters", {"_id": db_cluster["_id"]}) + + # To delete it from k8scluster collection + self.db.del_one("k8sclusters", {"name": db_cluster["name"]}) + return - async def get_creds(self, op_id, db_cluster): + async def get_creds(self, params, order_id): self.logger.info("Cluster get creds Enter") + cluster_id = params["cluster_id"] + op_id = params["operation_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster) if result: db_cluster["credentials"] = cluster_creds @@ -815,11 +942,18 @@ class ClusterLcm(GitOpsLcm): op_len += 1 db_cluster["current_operation"] = None self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + self.logger.info("Cluster Get Creds Exit") return - async def update(self, op_id, op_params, content): + async def update(self, params, order_id): self.logger.info("Cluster update Enter") - db_cluster = content["cluster"] + # To get the cluster details + cluster_id = params["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + + # To get the operation params details + op_id = params["operation_id"] + op_params = self.get_operation_params(db_cluster, op_id) db_cluster_copy = self.decrypting_key(db_cluster) @@ -1002,6 +1136,8 @@ class CloudCredentialsLcm(GitOpsLcm): class K8sAppLcm(GitOpsLcm): + db_collection = "k8sapp" + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging @@ -1010,9 +1146,20 @@ class K8sAppLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("App Create Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sapp", {"_id": profile_id}) + content["profile_type"] = "applications" + op_params = self.get_operation_params(content, op_id) + self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + _, workflow_name = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) @@ -1059,9 +1206,18 @@ class K8sAppLcm(GitOpsLcm): return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("App delete Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sapp", {"_id": profile_id}) + op_params = self.get_operation_params(content, op_id) + _, workflow_name = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) @@ -1113,6 +1269,8 @@ class K8sAppLcm(GitOpsLcm): class K8sResourceLcm(GitOpsLcm): + db_collection = "k8sresource" + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging @@ -1121,9 +1279,20 @@ class K8sResourceLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("Resource Create Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sresource", {"_id": profile_id}) + content["profile_type"] = "managed-resources" + op_params = self.get_operation_params(content, op_id) + self.db.set_one("k8sresource", {"_id": content["_id"]}, content) + _, workflow_name = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) @@ -1170,9 +1339,17 @@ class K8sResourceLcm(GitOpsLcm): return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("Resource delete Enter") - content = self.db.get_one("k8sresource", {"_id": content["_id"]}) + + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sresource", {"_id": profile_id}) + op_params = self.get_operation_params(content, op_id) _, workflow_name = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content @@ -1225,6 +1402,8 @@ class K8sResourceLcm(GitOpsLcm): class K8sInfraControllerLcm(GitOpsLcm): + db_collection = "k8sinfra_controller" + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging @@ -1233,9 +1412,20 @@ class K8sInfraControllerLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("Infra controller Create Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sinfra_controller", {"_id": profile_id}) + content["profile_type"] = "infra-controllers" + op_params = self.get_operation_params(content, op_id) + self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) + _, workflow_name = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) @@ -1282,9 +1472,18 @@ class K8sInfraControllerLcm(GitOpsLcm): return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("Infra controller delete Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sinfra_controller", {"_id": profile_id}) + op_params = self.get_operation_params(content, op_id) + _, workflow_name = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) @@ -1336,6 +1535,8 @@ class K8sInfraControllerLcm(GitOpsLcm): class K8sInfraConfigLcm(GitOpsLcm): + db_collection = "k8sinfra_config" + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging @@ -1344,9 +1545,20 @@ class K8sInfraConfigLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("Infra config Create Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sinfra_config", {"_id": profile_id}) + content["profile_type"] = "infra-configs" + op_params = self.get_operation_params(content, op_id) + self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) + _, workflow_name = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) @@ -1393,9 +1605,18 @@ class K8sInfraConfigLcm(GitOpsLcm): return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("Infra config delete Enter") + op_id = params["operation_id"] + profile_id = params["profile_id"] + + # To initialize the operation states + self.initialize_operation(profile_id, op_id) + + content = self.db.get_one("k8sinfra_config", {"_id": profile_id}) + op_params = self.get_operation_params(content, op_id) + _, workflow_name = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 2e67db69..54f99542 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -758,112 +758,58 @@ class Lcm: elif command == "deleted": return # TODO cleaning of task just in case should be done elif topic == "cluster": - if command != "get_creds": - op_id = params["operation_id"] - cluster_id = params["cluster_id"] - self.cluster.initialize_operation(cluster_id, op_id) - db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) - """ - if command in ( - "create", - "created", - "register", - "registered", - "upgrade", - "scale", - ): - self.db.encrypt_decrypt_fields( - db_cluster, - "decrypt", - ["age_pubkey", "age_privkey"], - schema_version="1.11", - salt=cluster_id, - ) - """ - op_params = self.get_operation_params(db_cluster, op_id) - db_content = { - "cluster": db_cluster, - } + cluster_id = params["cluster_id"] + op_id = params["operation_id"] if command == "create" or command == "created": self.logger.debug("cluster_id = {}".format(cluster_id)) - # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]}) - """ - db_vim = self.db.get_one( - "vim_accounts", {"name": db_cluster["vim_account"]} - ) - db_content["vim_account"] = db_vim - """ - task = asyncio.ensure_future( - self.cluster.create(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.cluster.create(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, op_id, "cluster_create", task ) return elif command == "delete" or command == "deleted": - task = asyncio.ensure_future( - self.cluster.delete(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.cluster.delete(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, op_id, "cluster_delete", task ) return elif command == "add" or command == "added": - profile_type = params["profile_type"] - profile_collection = self.profile_collection_mapping[profile_type] - db_profile = self.db.get_one( - profile_collection, {"_id": params["profile_id"]} - ) - db_profile["profile_type"] = profile_type - db_content["profile"] = db_profile task = asyncio.ensure_future( - self.cluster.attach_profile(op_id, op_params, db_content) + self.cluster.attach_profile(params, order_id) ) self.lcm_tasks.register( "cluster", cluster_id, op_id, "profile_add", task ) return elif command == "remove" or command == "removed": - profile_type = params["profile_type"] - profile_collection = self.profile_collection_mapping[profile_type] - db_profile = self.db.get_one( - profile_collection, {"_id": params["profile_id"]} - ) - db_profile["profile_type"] = profile_type - db_content["profile"] = db_profile task = asyncio.ensure_future( - self.cluster.detach_profile(op_id, op_params, db_content) + self.cluster.detach_profile(params, order_id) ) self.lcm_tasks.register( "cluster", cluster_id, op_id, "profile_remove", task ) return elif command == "register" or command == "registered": - task = asyncio.ensure_future( - self.cluster.register(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.cluster.register(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, op_id, "cluster_register", task ) return elif command == "deregister" or command == "deregistered": - task = asyncio.ensure_future( - self.cluster.deregister(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.cluster.deregister(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, op_id, "cluster_deregister", task ) return elif command == "get_creds": - cluster_id = params["cluster_id"] - op_id = params["operation_id"] - db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) - task = asyncio.ensure_future(self.cluster.get_creds(op_id, db_cluster)) + task = asyncio.ensure_future(self.cluster.get_creds(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, cluster_id, "cluster_get_credentials", task ) return elif command == "upgrade" or command == "scale" or command == "update": + cluster_id = params["cluster_id"] + op_id = params["operation_id"] # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]}) """ db_vim = self.db.get_one( @@ -871,9 +817,7 @@ class Lcm: ) db_content["vim_account"] = db_vim """ - task = asyncio.ensure_future( - self.cluster.update(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.cluster.update(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, op_id, "cluster_update", task ) @@ -881,24 +825,16 @@ class Lcm: elif topic == "k8s_app": op_id = params["operation_id"] profile_id = params["profile_id"] - self.k8s_app.initialize_operation(profile_id, op_id) - db_profile = self.db.get_one("k8sapp", {"_id": profile_id}) - db_profile["profile_type"] = "applications" - op_params = self.get_operation_params(db_profile, op_id) if command == "profile_create" or command == "profile_created": self.logger.debug("Create k8s_app_id = {}".format(profile_id)) - task = asyncio.ensure_future( - self.k8s_app.create(op_id, op_params, db_profile) - ) + task = asyncio.ensure_future(self.k8s_app.create(params, order_id)) self.lcm_tasks.register( "k8s_app", profile_id, op_id, "k8s_app_create", task ) return elif command == "delete" or command == "deleted": self.logger.debug("Delete k8s_app_id = {}".format(profile_id)) - task = asyncio.ensure_future( - self.k8s_app.delete(op_id, op_params, db_profile) - ) + task = asyncio.ensure_future(self.k8s_app.delete(params, order_id)) self.lcm_tasks.register( "k8s_app", profile_id, op_id, "k8s_app_delete", task ) @@ -906,15 +842,9 @@ class Lcm: elif topic == "k8s_resource": op_id = params["operation_id"] profile_id = params["profile_id"] - self.k8s_resource.initialize_operation(profile_id, op_id) - db_profile = self.db.get_one("k8sresource", {"_id": profile_id}) - db_profile["profile_type"] = "managed-resources" - op_params = self.get_operation_params(db_profile, op_id) if command == "profile_create" or command == "profile_created": self.logger.debug("Create k8s_resource_id = {}".format(profile_id)) - task = asyncio.ensure_future( - self.k8s_resource.create(op_id, op_params, db_profile) - ) + task = asyncio.ensure_future(self.k8s_resource.create(params, order_id)) self.lcm_tasks.register( "k8s_resource", profile_id, @@ -925,9 +855,7 @@ class Lcm: return elif command == "delete" or command == "deleted": self.logger.debug("Delete k8s_resource_id = {}".format(profile_id)) - task = asyncio.ensure_future( - self.k8s_resource.delete(op_id, op_params, db_profile) - ) + task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id)) self.lcm_tasks.register( "k8s_resource", profile_id, @@ -940,16 +868,12 @@ class Lcm: elif topic == "k8s_infra_controller": op_id = params["operation_id"] profile_id = params["profile_id"] - self.k8s_infra_controller.initialize_operation(profile_id, op_id) - db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id}) - db_profile["profile_type"] = "infra-controllers" - op_params = self.get_operation_params(db_profile, op_id) if command == "profile_create" or command == "profile_created": self.logger.debug( "Create k8s_infra_controller_id = {}".format(profile_id) ) task = asyncio.ensure_future( - self.k8s_infra_controller.create(op_id, op_params, db_profile) + self.k8s_infra_controller.create(params, order_id) ) self.lcm_tasks.register( "k8s_infra_controller", @@ -964,7 +888,7 @@ class Lcm: "Delete k8s_infra_controller_id = {}".format(profile_id) ) task = asyncio.ensure_future( - self.k8s_infra_controller.delete(op_id, op_params, db_profile) + self.k8s_infra_controller.delete(params, order_id) ) self.lcm_tasks.register( "k8s_infra_controller", @@ -978,14 +902,10 @@ class Lcm: elif topic == "k8s_infra_config": op_id = params["operation_id"] profile_id = params["profile_id"] - self.k8s_infra_config.initialize_operation(profile_id, op_id) - db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id}) - db_profile["profile_type"] = "infra-configs" - op_params = self.get_operation_params(db_profile, op_id) if command == "profile_create" or command == "profile_created": self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id)) task = asyncio.ensure_future( - self.k8s_infra_config.create(op_id, op_params, db_profile) + self.k8s_infra_config.create(params, order_id) ) self.lcm_tasks.register( "k8s_infra_config", @@ -998,7 +918,7 @@ class Lcm: elif command == "delete" or command == "deleted": self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id)) task = asyncio.ensure_future( - self.k8s_infra_config.delete(op_id, op_params, db_profile) + self.k8s_infra_config.delete(params, order_id) ) self.lcm_tasks.register( "k8s_infra_config", -- 2.25.1