else:
return await self.check_dummy_operation(op_id, op_params, content)
+ def check_force_delete_and_delete_from_db(
+ self, _id, workflow_status, resource_status, force
+ ):
+ self.logger.info(
+ f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
+ )
+ if force and (not workflow_status or not resource_status):
+ self.db.del_one(self.db_collection, {"_id": _id})
+ return True
+ return False
+
def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
# This deep copy is intended to be passed to ODU workflows.
content_copy = copy.deepcopy(content)
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ cluster_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
# To delete it from DB
if db_cluster["state"] == "DELETED":
self.delete_cluster(db_cluster)
"delete_profile", op_id, op_params, content
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ profile_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
+ self.logger.info(f"Resource status: {resource_status}")
if resource_status:
content["state"] = "DELETED"
profile_type = self.profile_type_mapping[content["profile_type"]]
"delete_profile", op_id, op_params, content
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ profile_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
if resource_status:
content["state"] = "DELETED"
profile_type = self.profile_type_mapping[content["profile_type"]]
"delete_profile", op_id, op_params, content
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ profile_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
if resource_status:
content["state"] = "DELETED"
profile_type = self.profile_type_mapping[content["profile_type"]]
"delete_profile", op_id, op_params, content
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ profile_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
if resource_status:
content["state"] = "DELETED"
profile_type = self.profile_type_mapping[content["profile_type"]]
"delete_oka", op_id, op_params, db_content
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ oka_id, workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
if resource_status:
db_content["state"] == "DELETED"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
"delete_ksus", op_id, ksu_params, db_ksu
)
+ force = params.get("force", False)
+ if force:
+ force_delete_status = self.check_force_delete_and_delete_from_db(
+ db_ksu["_id"], workflow_status, resource_status, force
+ )
+ if force_delete_status:
+ return
+
if resource_status:
db_ksu["state"] == "DELETED"
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)