From: yshah Date: Fri, 29 Nov 2024 07:33:32 +0000 (+0000) Subject: Refactoring LCM code for vim_account, ksu and oka X-Git-Tag: v17.0.0~19 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=564ec9c17c6bd005db243c65259f008df63db1ca;p=osm%2FLCM.git Refactoring LCM code for vim_account, ksu and oka Change-Id: I48215a8dc7933d9cc710acb01ed6706bacfbd5a9 Signed-off-by: yshah --- diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py index 892f2203..37e638da 100644 --- a/osm_lcm/k8s.py +++ b/osm_lcm/k8s.py @@ -31,6 +31,15 @@ from osm_lcm.data_utils.list_utils import find_in_list class GitOpsLcm(LcmBase): db_collection = "gitops" + workflow_status = None + resource_status = None + + profile_collection_mapping = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "resource_profiles": "k8sresource", + "app_profiles": "k8sapp", + } profile_collection_mapping = { "infra_controller_profiles": "k8sinfra_controller", @@ -65,6 +74,20 @@ class GitOpsLcm(LcmBase): db_item["current_operation"] = operation["op_id"] self.db.set_one(self.db_collection, {"_id": item_id}, db_item) + def get_operation_params(self, item, operation_id): + operation_history = item.get("operationHistory", []) + operation = find_in_list( + operation_history, lambda op: op["op_id"] == operation_id + ) + return operation.get("operationParams", {}) + + def get_operation_type(self, item, operation_id): + operation_history = item.get("operationHistory", []) + operation = find_in_list( + operation_history, lambda op: op["op_id"] == operation_id + ) + return operation.get("operationType", {}) + def update_operation_history( self, content, op_id, workflow_status=None, resource_status=None ): @@ -103,6 +126,60 @@ class GitOpsLcm(LcmBase): return content + async def check_workflow(self, op_id, workflow_name, db_content): + workflow_status, workflow_msg = await self.odu.check_workflow_status( + workflow_name + ) + self.logger.info( + "Workflow Status: {} Workflow Message: {}".format( + workflow_status, workflow_msg + ) + ) + operation_type = self.get_operation_type(db_content, op_id) + if operation_type == "create" and workflow_status: + db_content["state"] = "CREATED" + elif operation_type == "create" and not workflow_status: + db_content["state"] = "FAILED_CREATION" + elif operation_type == "delete" and workflow_status: + db_content["state"] = "DELETED" + elif operation_type == "delete" and not workflow_status: + db_content["state"] = "FAILED_DELETION" + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, op_id, workflow_status, None + ) + self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + return workflow_status + + async def check_resource(self, resource_name, op_id, op_params, db_content): + workflow_status = True + + resource_status, resource_msg = await self.check_resource_status( + resource_name, op_id, op_params, db_content + ) + self.logger.info( + "Resource Status: {} Resource Message: {}".format( + resource_status, resource_msg + ) + ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, op_id, workflow_status, resource_status + ) + db_content["operatingState"] = "IDLE" + db_content["current_operation"] = None + return resource_status, db_content + async def common_check_list(self, op_id, checkings_list, db_collection, db_item): try: for checking in checkings_list: @@ -158,13 +235,6 @@ class GitOpsLcm(LcmBase): } return db_cluster_copy - def get_operation_params(self, item, operation_id): - operation_history = item.get("operationHistory", []) - operation = find_in_list( - operation_history, lambda op: op["op_id"] == operation_id - ) - return operation.get("operationParams", {}) - class ClusterLcm(GitOpsLcm): db_collection = "clusters" @@ -1039,10 +1109,23 @@ class CloudCredentialsLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def add(self, op_id, op_params, content): + async def add(self, params, order_id): self.logger.info("Cloud Credentials create") + vim_id = params["_id"] + op_id = vim_id + op_params = params + db_content = self.db.get_one(self.db_collection, {"_id": vim_id}) + vim_config = db_content.get("config", {}) + self.db.encrypt_decrypt_fields( + vim_config.get("credentials"), + "decrypt", + ["password", "secret"], + schema_version=db_content["schema_version"], + salt=vim_id, + ) + _, workflow_name = await self.odu.launch_workflow( - "create_cloud_credentials", op_id, op_params, content + "create_cloud_credentials", op_id, op_params, db_content ) workflow_status, workflow_msg = await self.odu.check_workflow_status( @@ -1055,7 +1138,7 @@ class CloudCredentialsLcm(GitOpsLcm): # Clean items used in the workflow, no matter if the workflow succeeded clean_status, clean_msg = await self.odu.clean_items_workflow( - "create_cloud_credentials", op_id, op_params, content + "create_cloud_credentials", op_id, op_params, db_content ) self.logger.info( f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" @@ -1063,7 +1146,7 @@ class CloudCredentialsLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "create_cloud_credentials", op_id, op_params, content + "create_cloud_credentials", op_id, op_params, db_content ) self.logger.info( "Resource Status: {} Resource Message: {}".format( @@ -1071,18 +1154,31 @@ class CloudCredentialsLcm(GitOpsLcm): ) ) - content["_admin"]["operationalState"] = "ENABLED" - for operation in content["_admin"]["operations"]: + db_content["_admin"]["operationalState"] = "ENABLED" + for operation in db_content["_admin"]["operations"]: if operation["lcmOperationType"] == "create": operation["operationState"] = "ENABLED" - self.logger.info("Content : {}".format(content)) - self.db.set_one("vim_accounts", {"_id": content["_id"]}, content) - + self.logger.info("Content : {}".format(db_content)) + self.db.set_one("vim_accounts", {"_id": db_content["_id"]}, db_content) return - async def edit(self, op_id, op_params, content): + async def edit(self, params, order_id): + self.logger.info("Cloud Credentials Update") + vim_id = params["_id"] + op_id = vim_id + op_params = params + db_content = self.db.get_one("vim_accounts", {"_id": vim_id}) + vim_config = db_content.get("config", {}) + self.db.encrypt_decrypt_fields( + vim_config.get("credentials"), + "decrypt", + ["password", "secret"], + schema_version=db_content["schema_version"], + salt=vim_id, + ) + _, workflow_name = await self.odu.launch_workflow( - "update_cloud_credentials", op_id, op_params, content + "update_cloud_credentials", op_id, op_params, db_content ) workflow_status, workflow_msg = await self.odu.check_workflow_status( workflow_name @@ -1093,7 +1189,7 @@ class CloudCredentialsLcm(GitOpsLcm): # Clean items used in the workflow, no matter if the workflow succeeded clean_status, clean_msg = await self.odu.clean_items_workflow( - "update_cloud_credentials", op_id, op_params, content + "update_cloud_credentials", op_id, op_params, db_content ) self.logger.info( f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" @@ -1101,7 +1197,7 @@ class CloudCredentialsLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "update_cloud_credentials", op_id, op_params, content + "update_cloud_credentials", op_id, op_params, db_content ) self.logger.info( "Resource Status: {} Resource Message: {}".format( @@ -1110,10 +1206,15 @@ class CloudCredentialsLcm(GitOpsLcm): ) return - async def remove(self, op_id, op_params, content): - self.logger.info("Cloud Credentials delete") + async def remove(self, params, order_id): + self.logger.info("Cloud Credentials remove") + vim_id = params["_id"] + op_id = vim_id + op_params = params + db_content = self.db.get_one("vim_accounts", {"_id": vim_id}) + _, workflow_name = await self.odu.launch_workflow( - "delete_cloud_credentials", op_id, op_params, content + "delete_cloud_credentials", op_id, op_params, db_content ) workflow_status, workflow_msg = await self.odu.check_workflow_status( workflow_name @@ -1124,14 +1225,14 @@ class CloudCredentialsLcm(GitOpsLcm): if workflow_status: resource_status, resource_msg = await self.check_resource_status( - "delete_cloud_credentials", op_id, op_params, content + "delete_cloud_credentials", op_id, op_params, db_content ) self.logger.info( "Resource Status: {} Resource Message: {}".format( resource_status, resource_msg ) ) - self.db.del_one(self.db_collection, {"_id": content["_id"]}) + self.db.del_one(self.db_collection, {"_id": db_content["_id"]}) return @@ -1165,45 +1266,14 @@ class K8sAppLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "CREATED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_CREATION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "create_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status - ) - content["current_operation"] = None - self.db.set_one("k8sapp", {"_id": content["_id"]}, content) - + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.logger.info(f"App Create Exit with resource status: {resource_status}") return async def delete(self, params, order_id): @@ -1223,48 +1293,18 @@ class K8sAppLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "DELETED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_DELETION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "delete_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status - ) - content["current_operation"] = None - self.db.set_one("k8sapp", {"_id": content["_id"]}, content) - # To delete it from DB - if content["state"] == "DELETED": - self.db.del_one("k8sapp", {"_id": content["_id"]}) + if resource_status: + content["state"] = "DELETED" + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.db.del_one(self.db_collection, {"_id": content["_id"]}) + self.logger.info(f"App Delete Exit with resource status: {resource_status}") return @@ -1298,45 +1338,16 @@ class K8sResourceLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "CREATED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_CREATION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sresource", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "create_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.logger.info( + f"Resource Create Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sresource", {"_id": content["_id"]}, content) - return async def delete(self, params, order_id): @@ -1356,48 +1367,20 @@ class K8sResourceLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "DELETED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_DELETION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sresource", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "delete_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + if resource_status: + content["state"] = "DELETED" + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.db.del_one(self.db_collection, {"_id": content["_id"]}) + self.logger.info( + f"Resource Delete Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sresource", {"_id": content["_id"]}, content) - - # To delete it from DB - if content["state"] == "DELETED": - self.db.del_one("k8sresource", {"_id": content["_id"]}) return @@ -1431,45 +1414,16 @@ class K8sInfraControllerLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "CREATED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_CREATION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "create_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.logger.info( + f"Infra Controller Create Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) - return async def delete(self, params, order_id): @@ -1489,48 +1443,20 @@ class K8sInfraControllerLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "DELETED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_DELETION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "delete_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + if resource_status: + content["state"] = "DELETED" + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.db.del_one(self.db_collection, {"_id": content["_id"]}) + self.logger.info( + f"Infra Controller Delete Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) - - # To delete it from DB - if content["state"] == "DELETED": - self.db.del_one("k8sinfra_controller", {"_id": content["_id"]}) return @@ -1564,45 +1490,16 @@ class K8sInfraConfigLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "CREATED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_CREATION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, content = await self.check_resource( "create_profile", op_id, op_params, content ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg - ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.logger.info( + f"Infra Config Create Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) - return async def delete(self, params, order_id): @@ -1622,47 +1519,21 @@ class K8sInfraConfigLcm(GitOpsLcm): ) self.logger.info("workflow_name is :{}".format(workflow_name)) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "workflow_status is :{} and workflow_msg is :{}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - content["state"] = "DELETED" - content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - content["state"] = "FAILED_DELETION" - content["resourceState"] = "ERROR" - # has to call update_operation_history return content - content = self.update_operation_history(content, op_id, workflow_status, None) - self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) + workflow_status = await self.check_workflow(op_id, workflow_name, content) - resource_status, resource_msg = await self.check_resource_status( - "delete_profile", op_id, op_params, content - ) - self.logger.info( - "resource_status is :{} and resource_msg is :{}".format( - resource_status, resource_msg + if workflow_status: + resource_status, content = await self.check_resource( + "delete_profile", op_id, op_params, content ) - ) - if resource_status: - content["resourceState"] = "READY" - else: - content["resourceState"] = "ERROR" - content["operatingState"] = "IDLE" - content = self.update_operation_history( - content, op_id, workflow_status, resource_status + if resource_status: + content["state"] = "DELETED" + self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) + self.db.del_one(self.db_collection, {"_id": content["_id"]}) + self.logger.info( + f"Infra Config Delete Exit with resource status: {resource_status}" ) - content["current_operation"] = None - self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) - # To delete it from DB - if content["state"] == "DELETED": - self.db.del_one("k8sinfra_config", {"_id": content["_id"]}) return @@ -1677,169 +1548,83 @@ class OkaLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("OKA Create Enter") - db_content = content + op_id = params["operation_id"] + oka_id = params["oka_id"] + self.initialize_operation(oka_id, op_id) + db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) + op_params = self.get_operation_params(db_content, op_id) _, workflow_name = await self.odu.launch_workflow( "create_oka", op_id, op_params, db_content ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - db_content["state"] = "CREATED" - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["state"] = "FAILED_CREATION" - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None - ) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + workflow_status = await self.check_workflow(op_id, workflow_name, db_content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, db_content = await self.check_resource( "create_oka", op_id, op_params, db_content ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - # self.logger.info("Db content: {}".format(db_content)) - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) - + self.logger.info(f"OKA Create Exit with resource status: {resource_status}") return - async def edit(self, op_id, op_params, content): + async def edit(self, params, order_id): self.logger.info("OKA Edit Enter") - db_content = content + op_id = params["operation_id"] + oka_id = params["oka_id"] + self.initialize_operation(oka_id, op_id) + db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) + op_params = self.get_operation_params(db_content, op_id) _, workflow_name = await self.odu.launch_workflow( - "update_oka", op_id, op_params, content - ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) + "update_oka", op_id, op_params, db_content ) + workflow_status = await self.check_workflow(op_id, workflow_name, db_content) if workflow_status: - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None - ) - # self.logger.info("Db content: {}".format(db_content)) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) - - if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, db_content = await self.check_resource( "update_oka", op_id, op_params, db_content ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + self.logger.info(f"OKA Update Exit with resource status: {resource_status}") return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("OKA delete Enter") - db_content = content + op_id = params["operation_id"] + oka_id = params["oka_id"] + self.initialize_operation(oka_id, op_id) + db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) + op_params = self.get_operation_params(db_content, op_id) _, workflow_name = await self.odu.launch_workflow( - "delete_oka", op_id, op_params, content - ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - - if workflow_status: - db_content["state"] = "DELETED" - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["state"] = "FAILED_DELETION" - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None + "delete_oka", op_id, op_params, db_content ) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + workflow_status = await self.check_workflow(op_id, workflow_name, db_content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, db_content = await self.check_resource( "delete_oka", op_id, op_params, db_content ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) - - if db_content["state"] == "DELETED": + if resource_status: + db_content["state"] == "DELETED" + self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) self.db.del_one(self.db_collection, {"_id": db_content["_id"]}) + self.logger.info(f"OKA Delete Exit with resource status: {resource_status}") return class KsuLcm(GitOpsLcm): db_collection = "ksus" + profile_collection_mapping = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "resource_profiles": "k8sresource", + "app_profiles": "k8sapp", + } def __init__(self, msg, lcm_tasks, config): """ @@ -1849,121 +1634,95 @@ class KsuLcm(GitOpsLcm): """ super().__init__(msg, lcm_tasks, config) - async def create(self, op_id, op_params, content): + async def create(self, params, order_id): self.logger.info("ksu Create Enter") + db_content = [] + op_params = [] + op_id = params["operation_id"] + for ksu_id in params["ksus_list"]: + self.logger.info("Ksu ID: {}".format(ksu_id)) + self.initialize_operation(ksu_id, op_id) + db_ksu = self.db.get_one(self.db_collection, {"_id": ksu_id}) + self.logger.info("Db KSU: {}".format(db_ksu)) + db_content.append(db_ksu) + ksu_params = {} + ksu_params = self.get_operation_params(db_ksu, op_id) + self.logger.info("Operation Params: {}".format(ksu_params)) + # Update ksu_params["profile"] with profile name and age-pubkey + profile_type = ksu_params["profile"]["profile_type"] + profile_id = ksu_params["profile"]["_id"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + ksu_params["profile"]["name"] = db_profile["name"] + ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "") + # Update ksu_params["oka"] with sw_catalog_path (when missing) + for oka in ksu_params["oka"]: + if "sw_catalog_path" not in oka: + oka_id = oka["_id"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}" + op_params.append(ksu_params) _, workflow_name = await self.odu.launch_workflow( - "create_ksus", op_id, op_params, content - ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) + "create_ksus", op_id, op_params, db_content ) + for db_ksu, ksu_params in zip(db_content, op_params): + workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu) - for db_ksu in content: if workflow_status: - db_ksu["state"] = "CREATED" - db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_ksu["state"] = "FAILED_CREATION" - db_ksu["resourceState"] = "ERROR" + resource_status, db_ksu = await self.check_resource( + "create_ksus", op_id, ksu_params, db_ksu + ) - db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None) self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) # Clean items used in the workflow, no matter if the workflow succeeded clean_status, clean_msg = await self.odu.clean_items_workflow( - "create_ksus", op_id, op_params, content + "create_ksus", op_id, op_params, db_content ) self.logger.info( f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" ) - - if workflow_status: - resource_status, resource_msg = await self.check_resource_status( - "create_ksus", op_id, op_params, content - ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - for db_ksu in content: - if resource_status: - db_ksu["resourceState"] = "READY" - else: - db_ksu["resourceState"] = "ERROR" - - db_ksu = self.update_operation_history( - db_ksu, op_id, workflow_status, resource_status - ) - - for db_ksu in content: - db_ksu["operatingState"] = "IDLE" - db_ksu["current_operation"] = None - self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) - + self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}") return - async def edit(self, op_id, op_params, content): + async def edit(self, params, order_id): self.logger.info("ksu edit Enter") + db_content = [] + op_params = [] + op_id = params["operation_id"] + for ksu_id in params["ksus_list"]: + self.initialize_operation(ksu_id, op_id) + db_ksu = self.db.get_one("ksus", {"_id": ksu_id}) + db_content.append(db_ksu) + ksu_params = {} + ksu_params = self.get_operation_params(db_ksu, op_id) + # Update ksu_params["profile"] with profile name and age-pubkey + profile_type = ksu_params["profile"]["profile_type"] + profile_id = ksu_params["profile"]["_id"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + ksu_params["profile"]["name"] = db_profile["name"] + ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "") + # Update ksu_params["oka"] with sw_catalog_path (when missing) + for oka in ksu_params["oka"]: + if "sw_catalog_path" not in oka: + oka_id = oka["_id"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}" + op_params.append(ksu_params) _, workflow_name = await self.odu.launch_workflow( - "update_ksus", op_id, op_params, content + "update_ksus", op_id, op_params, db_content ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - - for db_ksu in content: - if workflow_status: - db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_ksu["resourceState"] = "ERROR" - db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None) - self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) + for db_ksu, ksu_params in zip(db_content, op_params): + workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu) - # Clean items used in the workflow, no matter if the workflow succeeded - clean_status, clean_msg = await self.odu.clean_items_workflow( - "create_ksus", op_id, op_params, content - ) - self.logger.info( - f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" - ) - if workflow_status: - resource_status, resource_msg = await self.check_resource_status( - "update_ksus", op_id, op_params, content - ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - for db_ksu in content: - if resource_status: - db_ksu["resourceState"] = "READY" - else: - db_ksu["resourceState"] = "ERROR" - - db_ksu = self.update_operation_history( - db_ksu, op_id, workflow_status, resource_status - ) - - for db_ksu, ksu_params in zip(content, op_params): - db_ksu["operatingState"] = "IDLE" if workflow_status: + resource_status, db_ksu = await self.check_resource( + "update_ksus", op_id, ksu_params, db_ksu + ) db_ksu["name"] = ksu_params["name"] db_ksu["description"] = ksu_params["description"] db_ksu["profile"]["profile_type"] = ksu_params["profile"][ @@ -1971,159 +1730,100 @@ class KsuLcm(GitOpsLcm): ] db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"] db_ksu["oka"] = ksu_params["oka"] - db_ksu["current_operation"] = None self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) + # Clean items used in the workflow, no matter if the workflow succeeded + clean_status, clean_msg = await self.odu.clean_items_workflow( + "create_ksus", op_id, op_params, db_content + ) + self.logger.info( + f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" + ) + self.logger.info(f"KSU Update EXIT with Resource Status {resource_status}") return - async def delete(self, op_id, op_params, content): + async def delete(self, params, order_id): self.logger.info("ksu delete Enter") + db_content = [] + op_params = [] + op_id = params["operation_id"] + for ksu_id in params["ksus_list"]: + self.initialize_operation(ksu_id, op_id) + db_ksu = self.db.get_one("ksus", {"_id": ksu_id}) + db_content.append(db_ksu) + ksu_params = {} + ksu_params["profile"] = {} + ksu_params["profile"]["profile_type"] = db_ksu["profile"]["profile_type"] + ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"] + # Update ksu_params["profile"] with profile name and age-pubkey + profile_type = ksu_params["profile"]["profile_type"] + profile_id = ksu_params["profile"]["_id"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + ksu_params["profile"]["name"] = db_profile["name"] + ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "") + op_params.append(ksu_params) _, workflow_name = await self.odu.launch_workflow( - "delete_ksus", op_id, op_params, content - ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) + "delete_ksus", op_id, op_params, db_content ) - for db_ksu in content: - if workflow_status: - db_ksu["state"] = "DELETED" - db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_ksu["state"] = "FAILED_DELETION" - db_ksu["resourceState"] = "ERROR" + for db_ksu, ksu_params in zip(db_content, op_params): + workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu) - db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None) - self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) - - if workflow_status: - resource_status, resource_msg = await self.check_resource_status( - "delete_ksus", op_id, op_params, content - ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - for db_ksu in content: - if resource_status: - db_ksu["resourceState"] = "READY" - else: - db_ksu["resourceState"] = "ERROR" - - db_ksu = self.update_operation_history( - db_ksu, op_id, workflow_status, resource_status + if workflow_status: + resource_status, db_ksu = await self.check_resource( + "delete_ksus", op_id, ksu_params, db_ksu ) - for db_ksu in content: - db_ksu["operatingState"] = "IDLE" - db_ksu["current_operation"] = None - self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) - - if db_ksu["state"] == "DELETED": + if resource_status: + db_ksu["state"] == "DELETED" + self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu) self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]}) + + self.logger.info(f"KSU Delete Exit with resource status: {resource_status}") return - async def clone(self, op_id, op_params, db_content): + async def clone(self, params, order_id): self.logger.info("ksu clone Enter") - + op_id = params["operation_id"] + ksus_id = params["ksus_list"][0] + self.initialize_operation(ksus_id, op_id) + db_content = self.db.get_one(self.db_collection, {"_id": ksus_id}) + op_params = self.get_operation_params(db_content, op_id) _, workflow_name = await self.odu.launch_workflow( "clone_ksus", op_id, op_params, db_content ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - if workflow_status: - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None - ) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + workflow_status = await self.check_workflow(op_id, workflow_name, db_content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, db_content = await self.check_resource( "clone_ksus", op_id, op_params, db_content ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + + self.logger.info(f"KSU Clone Exit with resource status: {resource_status}") return - async def move(self, op_id, op_params, db_content): + async def move(self, params, order_id): self.logger.info("ksu move Enter") - + op_id = params["operation_id"] + ksus_id = params["ksus_list"][0] + self.initialize_operation(ksus_id, op_id) + db_content = self.db.get_one(self.db_collection, {"_id": ksus_id}) + op_params = self.get_operation_params(db_content, op_id) _, workflow_name = await self.odu.launch_workflow( "move_ksus", op_id, op_params, db_content ) - workflow_status, workflow_msg = await self.odu.check_workflow_status( - workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - - if workflow_status: - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["resourceState"] = "ERROR" - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None - ) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + workflow_status = await self.check_workflow(op_id, workflow_name, db_content) if workflow_status: - resource_status, resource_msg = await self.check_resource_status( + resource_status, db_content = await self.check_resource( "move_ksus", op_id, op_params, db_content ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + + self.logger.info(f"KSU Move Exit with resource status: {resource_status}") return diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 54f99542..7d12e37d 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -644,24 +644,14 @@ class Lcm: elif topic == "vim_account": vim_id = params["_id"] op_id = vim_id - op_params = params db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) vim_config = db_vim.get("config", {}) - if command in ("create", "created", "edit", "edited"): - self.db.encrypt_decrypt_fields( - vim_config.get("credentials"), - "decrypt", - ["password", "secret"], - schema_version=db_vim["schema_version"], - salt=vim_id, - ) - self.logger.debug("Db Vim: {}".format(db_vim)) if command in ("create", "created"): self.logger.debug("Main config: {}".format(self.main_config.to_dict())) if "credentials" or "credentials_base64" in vim_config: self.logger.info("Vim add cloud credentials") task = asyncio.ensure_future( - self.cloud_credentials.add(op_id, op_params, db_vim) + self.cloud_credentials.add(params, order_id) ) self.lcm_tasks.register( "vim_account", vim_id, op_id, "cloud_credentials_add", task @@ -678,7 +668,7 @@ class Lcm: if "credentials" in vim_config: self.logger.info("Vim remove cloud credentials") task = asyncio.ensure_future( - self.cloud_credentials.remove(op_id, op_params, db_vim) + self.cloud_credentials.remove(params, order_id) ) self.lcm_tasks.register( "vim_account", vim_id, op_id, "cloud_credentials_remove", task @@ -696,7 +686,7 @@ class Lcm: if "credentials" in vim_config: self.logger.info("Vim update cloud credentials") task = asyncio.ensure_future( - self.cloud_credentials.edit(op_id, op_params, db_vim) + self.cloud_credentials.edit(params, order_id) ) self.lcm_tasks.register( "vim_account", vim_id, op_id, "cloud_credentials_update", task @@ -931,98 +921,39 @@ class Lcm: elif topic == "oka": op_id = params["operation_id"] oka_id = params["oka_id"] - self.oka.initialize_operation(oka_id, op_id) - db_oka = self.db.get_one("okas", {"_id": oka_id}) - op_params = self.get_operation_params(db_oka, op_id) if command == "create": - task = asyncio.ensure_future(self.oka.create(op_id, op_params, db_oka)) + task = asyncio.ensure_future(self.oka.create(params, order_id)) self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task) return elif command == "edit": - task = asyncio.ensure_future(self.oka.edit(op_id, op_params, db_oka)) + task = asyncio.ensure_future(self.oka.edit(params, order_id)) self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task) return elif command == "delete": - task = asyncio.ensure_future(self.oka.delete(op_id, op_params, db_oka)) + task = asyncio.ensure_future(self.oka.delete(params, order_id)) self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task) return elif topic == "ksu": op_id = params["operation_id"] - op_params = None - db_content = None - if not (command == "clone" or command == "move"): - # op_params is a list - # db_content is a list of KSU - db_content = [] - op_params = [] - for ksu_id in params["ksus_list"]: - self.ksu.initialize_operation(ksu_id, op_id) - db_ksu = self.db.get_one("ksus", {"_id": ksu_id}) - db_content.append(db_ksu) - ksu_params = {} - if command == "delete": - ksu_params["profile"] = {} - ksu_params["profile"]["profile_type"] = db_ksu["profile"][ - "profile_type" - ] - ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"] - else: - ksu_params = self.get_operation_params(db_ksu, op_id) - # Update ksu_params["profile"] with profile name and age-pubkey - profile_type = ksu_params["profile"]["profile_type"] - profile_id = ksu_params["profile"]["_id"] - profile_collection = self.profile_collection_mapping[profile_type] - db_profile = self.db.get_one( - profile_collection, {"_id": profile_id} - ) - ksu_params["profile"]["name"] = db_profile["name"] - ksu_params["profile"]["age_pubkey"] = db_profile.get( - "age_pubkey", "" - ) - if command == "create" or command == "edit" or command == "edited": - # Update ksu_params["oka"] with sw_catalog_path (when missing) - for oka in ksu_params["oka"]: - if "sw_catalog_path" not in oka: - oka_id = oka["_id"] - db_oka = self.db.get_one("okas", {"_id": oka_id}) - oka[ - "sw_catalog_path" - ] = f"infra-controllers/{db_oka['git_name']}" - op_params.append(ksu_params) - else: - # db_content and op_params are single items - self.ksu.initialize_operation(params["_id"], op_id) - db_content = self.db.get_one("ksus", {"_id": params["_id"]}) - db_content = db_ksu - op_params = self.get_operation_params(db_ksu, op_id) + ksu_id = op_id if command == "create": - task = asyncio.ensure_future( - self.ksu.create(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.ksu.create(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task) return elif command == "edit" or command == "edited": - task = asyncio.ensure_future( - self.ksu.edit(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.ksu.edit(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task) return elif command == "delete": - task = asyncio.ensure_future( - self.ksu.delete(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.ksu.delete(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task) return elif command == "clone": - task = asyncio.ensure_future( - self.ksu.clone(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.ksu.clone(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task) return elif command == "move": - task = asyncio.ensure_future( - self.ksu.move(op_id, op_params, db_content) - ) + task = asyncio.ensure_future(self.ksu.move(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task) return