Refactoring LCM code for vim_account, ksu and oka
Change-Id: I48215a8dc7933d9cc710acb01ed6706bacfbd5a9
Signed-off-by: yshah <shahithya.y@tataelxsi.co.in>
diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py
index 892f220..37e638d 100644
--- a/osm_lcm/k8s.py
+++ b/osm_lcm/k8s.py
@@ -31,6 +31,15 @@
class GitOpsLcm(LcmBase):
db_collection = "gitops"
+ workflow_status = None
+ resource_status = None
+
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
profile_collection_mapping = {
"infra_controller_profiles": "k8sinfra_controller",
@@ -65,6 +74,20 @@
db_item["current_operation"] = operation["op_id"]
self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+ def get_operation_params(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationParams", {})
+
+ def get_operation_type(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationType", {})
+
def update_operation_history(
self, content, op_id, workflow_status=None, resource_status=None
):
@@ -103,6 +126,60 @@
return content
+ async def check_workflow(self, op_id, workflow_name, db_content):
+ workflow_status, workflow_msg = await self.odu.check_workflow_status(
+ workflow_name
+ )
+ self.logger.info(
+ "Workflow Status: {} Workflow Message: {}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ operation_type = self.get_operation_type(db_content, op_id)
+ if operation_type == "create" and workflow_status:
+ db_content["state"] = "CREATED"
+ elif operation_type == "create" and not workflow_status:
+ db_content["state"] = "FAILED_CREATION"
+ elif operation_type == "delete" and workflow_status:
+ db_content["state"] = "DELETED"
+ elif operation_type == "delete" and not workflow_status:
+ db_content["state"] = "FAILED_DELETION"
+
+ if workflow_status:
+ db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ db_content["resourceState"] = "ERROR"
+
+ db_content = self.update_operation_history(
+ db_content, op_id, workflow_status, None
+ )
+ self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ return workflow_status
+
+ async def check_resource(self, resource_name, op_id, op_params, db_content):
+ workflow_status = True
+
+ resource_status, resource_msg = await self.check_resource_status(
+ resource_name, op_id, op_params, db_content
+ )
+ self.logger.info(
+ "Resource Status: {} Resource Message: {}".format(
+ resource_status, resource_msg
+ )
+ )
+
+ if resource_status:
+ db_content["resourceState"] = "READY"
+ else:
+ db_content["resourceState"] = "ERROR"
+
+ db_content = self.update_operation_history(
+ db_content, op_id, workflow_status, resource_status
+ )
+ db_content["operatingState"] = "IDLE"
+ db_content["current_operation"] = None
+ return resource_status, db_content
+
async def common_check_list(self, op_id, checkings_list, db_collection, db_item):
try:
for checking in checkings_list:
@@ -158,13 +235,6 @@
}
return db_cluster_copy
- def get_operation_params(self, item, operation_id):
- operation_history = item.get("operationHistory", [])
- operation = find_in_list(
- operation_history, lambda op: op["op_id"] == operation_id
- )
- return operation.get("operationParams", {})
-
class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
@@ -1039,10 +1109,23 @@
"""
super().__init__(msg, lcm_tasks, config)
- async def add(self, op_id, op_params, content):
+ async def add(self, params, order_id):
self.logger.info("Cloud Credentials create")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one(self.db_collection, {"_id": vim_id})
+ vim_config = db_content.get("config", {})
+ self.db.encrypt_decrypt_fields(
+ vim_config.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=db_content["schema_version"],
+ salt=vim_id,
+ )
+
_, workflow_name = await self.odu.launch_workflow(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
@@ -1055,7 +1138,7 @@
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
@@ -1063,7 +1146,7 @@
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
@@ -1071,18 +1154,31 @@
)
)
- content["_admin"]["operationalState"] = "ENABLED"
- for operation in content["_admin"]["operations"]:
+ db_content["_admin"]["operationalState"] = "ENABLED"
+ for operation in db_content["_admin"]["operations"]:
if operation["lcmOperationType"] == "create":
operation["operationState"] = "ENABLED"
- self.logger.info("Content : {}".format(content))
- self.db.set_one("vim_accounts", {"_id": content["_id"]}, content)
-
+ self.logger.info("Content : {}".format(db_content))
+ self.db.set_one("vim_accounts", {"_id": db_content["_id"]}, db_content)
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
+ self.logger.info("Cloud Credentials Update")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+ vim_config = db_content.get("config", {})
+ self.db.encrypt_decrypt_fields(
+ vim_config.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=db_content["schema_version"],
+ salt=vim_id,
+ )
+
_, workflow_name = await self.odu.launch_workflow(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
@@ -1093,7 +1189,7 @@
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
@@ -1101,7 +1197,7 @@
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
@@ -1110,10 +1206,15 @@
)
return
- async def remove(self, op_id, op_params, content):
- self.logger.info("Cloud Credentials delete")
+ async def remove(self, params, order_id):
+ self.logger.info("Cloud Credentials remove")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+
_, workflow_name = await self.odu.launch_workflow(
- "delete_cloud_credentials", op_id, op_params, content
+ "delete_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
@@ -1124,14 +1225,14 @@
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "delete_cloud_credentials", op_id, op_params, content
+ "delete_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
- self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
return
@@ -1165,45 +1266,14 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
- )
- content["current_operation"] = None
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
-
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(f"App Create Exit with resource status: {resource_status}")
return
async def delete(self, params, order_id):
@@ -1223,48 +1293,18 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
- )
- content["current_operation"] = None
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
-
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sapp", {"_id": content["_id"]})
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(f"App Delete Exit with resource status: {resource_status}")
return
@@ -1298,45 +1338,16 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Resource Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
@@ -1356,48 +1367,20 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Resource Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
-
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sresource", {"_id": content["_id"]})
return
@@ -1431,45 +1414,16 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Infra Controller Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
@@ -1489,48 +1443,20 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Infra Controller Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
-
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
return
@@ -1564,45 +1490,16 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Infra Config Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
@@ -1622,47 +1519,21 @@
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
+
if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
-
- resource_status, resource_msg = await self.check_resource_status(
- "delete_profile", op_id, op_params, content
- )
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
+ resource_status, content = await self.check_resource(
+ "delete_profile", op_id, op_params, content
)
- )
+
if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Infra Config Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
return
@@ -1677,169 +1548,83 @@
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("OKA Create Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"create_oka", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["state"] = "CREATED"
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["state"] = "FAILED_CREATION"
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"create_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- # self.logger.info("Db content: {}".format(db_content))
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
+ self.logger.info(f"OKA Create Exit with resource status: {resource_status}")
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
self.logger.info("OKA Edit Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
- "update_oka", op_id, op_params, content
+ "update_oka", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- # self.logger.info("Db content: {}".format(db_content))
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"update_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ self.logger.info(f"OKA Update Exit with resource status: {resource_status}")
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("OKA delete Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
- "delete_oka", op_id, op_params, content
+ "delete_oka", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["state"] = "DELETED"
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["state"] = "FAILED_DELETION"
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"delete_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if db_content["state"] == "DELETED":
+ if resource_status:
+ db_content["state"] == "DELETED"
+ self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
+ self.logger.info(f"OKA Delete Exit with resource status: {resource_status}")
return
class KsuLcm(GitOpsLcm):
db_collection = "ksus"
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
def __init__(self, msg, lcm_tasks, config):
"""
@@ -1849,121 +1634,95 @@
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("ksu Create Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.logger.info("Ksu ID: {}".format(ksu_id))
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one(self.db_collection, {"_id": ksu_id})
+ self.logger.info("Db KSU: {}".format(db_ksu))
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params = self.get_operation_params(db_ksu, op_id)
+ self.logger.info("Operation Params: {}".format(ksu_params))
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ # Update ksu_params["oka"] with sw_catalog_path (when missing)
+ for oka in ksu_params["oka"]:
+ if "sw_catalog_path" not in oka:
+ oka_id = oka["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}"
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
- "create_ksus", op_id, op_params, content
+ "create_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
- for db_ksu in content:
if workflow_status:
- db_ksu["state"] = "CREATED"
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["state"] = "FAILED_CREATION"
- db_ksu["resourceState"] = "ERROR"
+ resource_status, db_ksu = await self.check_resource(
+ "create_ksus", op_id, ksu_params, db_ksu
+ )
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_ksus", op_id, op_params, content
+ "create_ksus", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "create_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
- )
-
- for db_ksu in content:
- db_ksu["operatingState"] = "IDLE"
- db_ksu["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
+ self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}")
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
self.logger.info("ksu edit Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params = self.get_operation_params(db_ksu, op_id)
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ # Update ksu_params["oka"] with sw_catalog_path (when missing)
+ for oka in ksu_params["oka"]:
+ if "sw_catalog_path" not in oka:
+ oka_id = oka["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}"
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
- "update_ksus", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
+ "update_ksus", op_id, op_params, db_content
)
- for db_ksu in content:
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
+
if workflow_status:
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
- # Clean items used in the workflow, no matter if the workflow succeeded
- clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_ksus", op_id, op_params, content
- )
- self.logger.info(
- f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
- )
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "update_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
+ resource_status, db_ksu = await self.check_resource(
+ "update_ksus", op_id, ksu_params, db_ksu
)
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
- )
-
- for db_ksu, ksu_params in zip(content, op_params):
- db_ksu["operatingState"] = "IDLE"
- if workflow_status:
db_ksu["name"] = ksu_params["name"]
db_ksu["description"] = ksu_params["description"]
db_ksu["profile"]["profile_type"] = ksu_params["profile"][
@@ -1971,159 +1730,100 @@
]
db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
db_ksu["oka"] = ksu_params["oka"]
- db_ksu["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
- return
-
- async def delete(self, op_id, op_params, content):
- self.logger.info("ksu delete Enter")
-
- _, workflow_name = await self.odu.launch_workflow(
- "delete_ksus", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
+ # Clean items used in the workflow, no matter if the workflow succeeded
+ clean_status, clean_msg = await self.odu.clean_items_workflow(
+ "create_ksus", op_id, op_params, db_content
)
self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
+ f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
-
- for db_ksu in content:
- if workflow_status:
- db_ksu["state"] = "DELETED"
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["state"] = "FAILED_DELETION"
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "delete_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
- )
-
- for db_ksu in content:
- db_ksu["operatingState"] = "IDLE"
- db_ksu["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
- if db_ksu["state"] == "DELETED":
- self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
+ self.logger.info(f"KSU Update EXIT with Resource Status {resource_status}")
return
- async def clone(self, op_id, op_params, db_content):
- self.logger.info("ksu clone Enter")
+ async def delete(self, params, order_id):
+ self.logger.info("ksu delete Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params["profile"] = {}
+ ksu_params["profile"]["profile_type"] = db_ksu["profile"]["profile_type"]
+ ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
+ "delete_ksus", op_id, op_params, db_content
+ )
+
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
+
+ if workflow_status:
+ resource_status, db_ksu = await self.check_resource(
+ "delete_ksus", op_id, ksu_params, db_ksu
+ )
+
+ if resource_status:
+ db_ksu["state"] == "DELETED"
+ self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
+ self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
+
+ self.logger.info(f"KSU Delete Exit with resource status: {resource_status}")
+ return
+
+ async def clone(self, params, order_id):
+ self.logger.info("ksu clone Enter")
+ op_id = params["operation_id"]
+ ksus_id = params["ksus_list"][0]
+ self.initialize_operation(ksus_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
+ op_params = self.get_operation_params(db_content, op_id)
+ _, workflow_name = await self.odu.launch_workflow(
"clone_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"clone_ksus", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+
+ self.logger.info(f"KSU Clone Exit with resource status: {resource_status}")
return
- async def move(self, op_id, op_params, db_content):
+ async def move(self, params, order_id):
self.logger.info("ksu move Enter")
-
+ op_id = params["operation_id"]
+ ksus_id = params["ksus_list"][0]
+ self.initialize_operation(ksus_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"move_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
+
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"move_ksus", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+
+ self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
return
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 54f9954..7d12e37 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -644,24 +644,14 @@
elif topic == "vim_account":
vim_id = params["_id"]
op_id = vim_id
- op_params = params
db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
vim_config = db_vim.get("config", {})
- if command in ("create", "created", "edit", "edited"):
- self.db.encrypt_decrypt_fields(
- vim_config.get("credentials"),
- "decrypt",
- ["password", "secret"],
- schema_version=db_vim["schema_version"],
- salt=vim_id,
- )
- self.logger.debug("Db Vim: {}".format(db_vim))
if command in ("create", "created"):
self.logger.debug("Main config: {}".format(self.main_config.to_dict()))
if "credentials" or "credentials_base64" in vim_config:
self.logger.info("Vim add cloud credentials")
task = asyncio.ensure_future(
- self.cloud_credentials.add(op_id, op_params, db_vim)
+ self.cloud_credentials.add(params, order_id)
)
self.lcm_tasks.register(
"vim_account", vim_id, op_id, "cloud_credentials_add", task
@@ -678,7 +668,7 @@
if "credentials" in vim_config:
self.logger.info("Vim remove cloud credentials")
task = asyncio.ensure_future(
- self.cloud_credentials.remove(op_id, op_params, db_vim)
+ self.cloud_credentials.remove(params, order_id)
)
self.lcm_tasks.register(
"vim_account", vim_id, op_id, "cloud_credentials_remove", task
@@ -696,7 +686,7 @@
if "credentials" in vim_config:
self.logger.info("Vim update cloud credentials")
task = asyncio.ensure_future(
- self.cloud_credentials.edit(op_id, op_params, db_vim)
+ self.cloud_credentials.edit(params, order_id)
)
self.lcm_tasks.register(
"vim_account", vim_id, op_id, "cloud_credentials_update", task
@@ -931,98 +921,39 @@
elif topic == "oka":
op_id = params["operation_id"]
oka_id = params["oka_id"]
- self.oka.initialize_operation(oka_id, op_id)
- db_oka = self.db.get_one("okas", {"_id": oka_id})
- op_params = self.get_operation_params(db_oka, op_id)
if command == "create":
- task = asyncio.ensure_future(self.oka.create(op_id, op_params, db_oka))
+ task = asyncio.ensure_future(self.oka.create(params, order_id))
self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task)
return
elif command == "edit":
- task = asyncio.ensure_future(self.oka.edit(op_id, op_params, db_oka))
+ task = asyncio.ensure_future(self.oka.edit(params, order_id))
self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task)
return
elif command == "delete":
- task = asyncio.ensure_future(self.oka.delete(op_id, op_params, db_oka))
+ task = asyncio.ensure_future(self.oka.delete(params, order_id))
self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task)
return
elif topic == "ksu":
op_id = params["operation_id"]
- op_params = None
- db_content = None
- if not (command == "clone" or command == "move"):
- # op_params is a list
- # db_content is a list of KSU
- db_content = []
- op_params = []
- for ksu_id in params["ksus_list"]:
- self.ksu.initialize_operation(ksu_id, op_id)
- db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
- db_content.append(db_ksu)
- ksu_params = {}
- if command == "delete":
- ksu_params["profile"] = {}
- ksu_params["profile"]["profile_type"] = db_ksu["profile"][
- "profile_type"
- ]
- ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
- else:
- ksu_params = self.get_operation_params(db_ksu, op_id)
- # Update ksu_params["profile"] with profile name and age-pubkey
- profile_type = ksu_params["profile"]["profile_type"]
- profile_id = ksu_params["profile"]["_id"]
- profile_collection = self.profile_collection_mapping[profile_type]
- db_profile = self.db.get_one(
- profile_collection, {"_id": profile_id}
- )
- ksu_params["profile"]["name"] = db_profile["name"]
- ksu_params["profile"]["age_pubkey"] = db_profile.get(
- "age_pubkey", ""
- )
- if command == "create" or command == "edit" or command == "edited":
- # Update ksu_params["oka"] with sw_catalog_path (when missing)
- for oka in ksu_params["oka"]:
- if "sw_catalog_path" not in oka:
- oka_id = oka["_id"]
- db_oka = self.db.get_one("okas", {"_id": oka_id})
- oka[
- "sw_catalog_path"
- ] = f"infra-controllers/{db_oka['git_name']}"
- op_params.append(ksu_params)
- else:
- # db_content and op_params are single items
- self.ksu.initialize_operation(params["_id"], op_id)
- db_content = self.db.get_one("ksus", {"_id": params["_id"]})
- db_content = db_ksu
- op_params = self.get_operation_params(db_ksu, op_id)
+ ksu_id = op_id
if command == "create":
- task = asyncio.ensure_future(
- self.ksu.create(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.ksu.create(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task)
return
elif command == "edit" or command == "edited":
- task = asyncio.ensure_future(
- self.ksu.edit(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.ksu.edit(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task)
return
elif command == "delete":
- task = asyncio.ensure_future(
- self.ksu.delete(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.ksu.delete(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task)
return
elif command == "clone":
- task = asyncio.ensure_future(
- self.ksu.clone(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.ksu.clone(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task)
return
elif command == "move":
- task = asyncio.ensure_future(
- self.ksu.move(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.ksu.move(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
return