class GitOpsLcm(LcmBase):
db_collection = "gitops"
+ workflow_status = None
+ resource_status = None
+
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
profile_collection_mapping = {
"infra_controller_profiles": "k8sinfra_controller",
db_item["current_operation"] = operation["op_id"]
self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+ def get_operation_params(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationParams", {})
+
+ def get_operation_type(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationType", {})
+
def update_operation_history(
self, content, op_id, workflow_status=None, resource_status=None
):
return content
+ async def check_workflow(self, op_id, workflow_name, db_content):
+ workflow_status, workflow_msg = await self.odu.check_workflow_status(
+ workflow_name
+ )
+ self.logger.info(
+ "Workflow Status: {} Workflow Message: {}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ operation_type = self.get_operation_type(db_content, op_id)
+ if operation_type == "create" and workflow_status:
+ db_content["state"] = "CREATED"
+ elif operation_type == "create" and not workflow_status:
+ db_content["state"] = "FAILED_CREATION"
+ elif operation_type == "delete" and workflow_status:
+ db_content["state"] = "DELETED"
+ elif operation_type == "delete" and not workflow_status:
+ db_content["state"] = "FAILED_DELETION"
+
+ if workflow_status:
+ db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ db_content["resourceState"] = "ERROR"
+
+ db_content = self.update_operation_history(
+ db_content, op_id, workflow_status, None
+ )
+ self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ return workflow_status
+
+ async def check_resource(self, resource_name, op_id, op_params, db_content):
+ workflow_status = True
+
+ resource_status, resource_msg = await self.check_resource_status(
+ resource_name, op_id, op_params, db_content
+ )
+ self.logger.info(
+ "Resource Status: {} Resource Message: {}".format(
+ resource_status, resource_msg
+ )
+ )
+
+ if resource_status:
+ db_content["resourceState"] = "READY"
+ else:
+ db_content["resourceState"] = "ERROR"
+
+ db_content = self.update_operation_history(
+ db_content, op_id, workflow_status, resource_status
+ )
+ db_content["operatingState"] = "IDLE"
+ db_content["current_operation"] = None
+ return resource_status, db_content
+
async def common_check_list(self, op_id, checkings_list, db_collection, db_item):
try:
for checking in checkings_list:
}
return db_cluster_copy
- def get_operation_params(self, item, operation_id):
- operation_history = item.get("operationHistory", [])
- operation = find_in_list(
- operation_history, lambda op: op["op_id"] == operation_id
- )
- return operation.get("operationParams", {})
-
class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
"""
super().__init__(msg, lcm_tasks, config)
- async def add(self, op_id, op_params, content):
+ async def add(self, params, order_id):
self.logger.info("Cloud Credentials create")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one(self.db_collection, {"_id": vim_id})
+ vim_config = db_content.get("config", {})
+ self.db.encrypt_decrypt_fields(
+ vim_config.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=db_content["schema_version"],
+ salt=vim_id,
+ )
+
_, workflow_name = await self.odu.launch_workflow(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "create_cloud_credentials", op_id, op_params, content
+ "create_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
)
)
- content["_admin"]["operationalState"] = "ENABLED"
- for operation in content["_admin"]["operations"]:
+ db_content["_admin"]["operationalState"] = "ENABLED"
+ for operation in db_content["_admin"]["operations"]:
if operation["lcmOperationType"] == "create":
operation["operationState"] = "ENABLED"
- self.logger.info("Content : {}".format(content))
- self.db.set_one("vim_accounts", {"_id": content["_id"]}, content)
-
+ self.logger.info("Content : {}".format(db_content))
+ self.db.set_one("vim_accounts", {"_id": db_content["_id"]}, db_content)
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
+ self.logger.info("Cloud Credentials Update")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+ vim_config = db_content.get("config", {})
+ self.db.encrypt_decrypt_fields(
+ vim_config.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=db_content["schema_version"],
+ salt=vim_id,
+ )
+
_, workflow_name = await self.odu.launch_workflow(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "update_cloud_credentials", op_id, op_params, content
+ "update_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
)
return
- async def remove(self, op_id, op_params, content):
- self.logger.info("Cloud Credentials delete")
+ async def remove(self, params, order_id):
+ self.logger.info("Cloud Credentials remove")
+ vim_id = params["_id"]
+ op_id = vim_id
+ op_params = params
+ db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+
_, workflow_name = await self.odu.launch_workflow(
- "delete_cloud_credentials", op_id, op_params, content
+ "delete_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "delete_cloud_credentials", op_id, op_params, content
+ "delete_cloud_credentials", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
- self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
return
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
- )
- content["current_operation"] = None
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
-
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(f"App Create Exit with resource status: {resource_status}")
return
async def delete(self, params, order_id):
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
- )
- content["current_operation"] = None
- self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sapp", {"_id": content["_id"]})
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(f"App Delete Exit with resource status: {resource_status}")
return
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Resource Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Resource Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
-
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sresource", {"_id": content["_id"]})
return
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Infra Controller Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"delete_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Infra Controller Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
-
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
return
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "CREATED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_CREATION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, content = await self.check_resource(
"create_profile", op_id, op_params, content
)
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
-
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.logger.info(
+ f"Infra Config Create Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
-
return
async def delete(self, params, order_id):
)
self.logger.info("workflow_name is :{}".format(workflow_name))
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "workflow_status is :{} and workflow_msg is :{}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- content["state"] = "DELETED"
- content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- content["state"] = "FAILED_DELETION"
- content["resourceState"] = "ERROR"
- # has to call update_operation_history return content
- content = self.update_operation_history(content, op_id, workflow_status, None)
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, content)
- resource_status, resource_msg = await self.check_resource_status(
- "delete_profile", op_id, op_params, content
- )
- self.logger.info(
- "resource_status is :{} and resource_msg is :{}".format(
- resource_status, resource_msg
+ if workflow_status:
+ resource_status, content = await self.check_resource(
+ "delete_profile", op_id, op_params, content
)
- )
- if resource_status:
- content["resourceState"] = "READY"
- else:
- content["resourceState"] = "ERROR"
- content["operatingState"] = "IDLE"
- content = self.update_operation_history(
- content, op_id, workflow_status, resource_status
+ if resource_status:
+ content["state"] = "DELETED"
+ self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
+ self.db.del_one(self.db_collection, {"_id": content["_id"]})
+ self.logger.info(
+ f"Infra Config Delete Exit with resource status: {resource_status}"
)
- content["current_operation"] = None
- self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
- # To delete it from DB
- if content["state"] == "DELETED":
- self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
return
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("OKA Create Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"create_oka", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- db_content["state"] = "CREATED"
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["state"] = "FAILED_CREATION"
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"create_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- # self.logger.info("Db content: {}".format(db_content))
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
+ self.logger.info(f"OKA Create Exit with resource status: {resource_status}")
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
self.logger.info("OKA Edit Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
- "update_oka", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
+ "update_oka", op_id, op_params, db_content
)
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- # self.logger.info("Db content: {}".format(db_content))
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"update_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ self.logger.info(f"OKA Update Exit with resource status: {resource_status}")
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("OKA delete Enter")
- db_content = content
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ self.initialize_operation(oka_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
- "delete_oka", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
-
- if workflow_status:
- db_content["state"] = "DELETED"
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["state"] = "FAILED_DELETION"
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
+ "delete_oka", op_id, op_params, db_content
)
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"delete_oka", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-
- if db_content["state"] == "DELETED":
+ if resource_status:
+ db_content["state"] == "DELETED"
+ self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
+ self.logger.info(f"OKA Delete Exit with resource status: {resource_status}")
return
class KsuLcm(GitOpsLcm):
db_collection = "ksus"
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
def __init__(self, msg, lcm_tasks, config):
"""
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("ksu Create Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.logger.info("Ksu ID: {}".format(ksu_id))
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one(self.db_collection, {"_id": ksu_id})
+ self.logger.info("Db KSU: {}".format(db_ksu))
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params = self.get_operation_params(db_ksu, op_id)
+ self.logger.info("Operation Params: {}".format(ksu_params))
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ # Update ksu_params["oka"] with sw_catalog_path (when missing)
+ for oka in ksu_params["oka"]:
+ if "sw_catalog_path" not in oka:
+ oka_id = oka["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}"
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
- "create_ksus", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
+ "create_ksus", op_id, op_params, db_content
)
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
- for db_ksu in content:
if workflow_status:
- db_ksu["state"] = "CREATED"
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["state"] = "FAILED_CREATION"
- db_ksu["resourceState"] = "ERROR"
+ resource_status, db_ksu = await self.check_resource(
+ "create_ksus", op_id, ksu_params, db_ksu
+ )
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_ksus", op_id, op_params, content
+ "create_ksus", op_id, op_params, db_content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "create_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
- )
-
- for db_ksu in content:
- db_ksu["operatingState"] = "IDLE"
- db_ksu["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
+ self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}")
return
- async def edit(self, op_id, op_params, content):
+ async def edit(self, params, order_id):
self.logger.info("ksu edit Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params = self.get_operation_params(db_ksu, op_id)
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ # Update ksu_params["oka"] with sw_catalog_path (when missing)
+ for oka in ksu_params["oka"]:
+ if "sw_catalog_path" not in oka:
+ oka_id = oka["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka["sw_catalog_path"] = f"infra-controllers/{db_oka['git_name']}"
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
- "update_ksus", op_id, op_params, content
+ "update_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
-
- for db_ksu in content:
- if workflow_status:
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["resourceState"] = "ERROR"
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
- # Clean items used in the workflow, no matter if the workflow succeeded
- clean_status, clean_msg = await self.odu.clean_items_workflow(
- "create_ksus", op_id, op_params, content
- )
- self.logger.info(
- f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
- )
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "update_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
- )
-
- for db_ksu, ksu_params in zip(content, op_params):
- db_ksu["operatingState"] = "IDLE"
if workflow_status:
+ resource_status, db_ksu = await self.check_resource(
+ "update_ksus", op_id, ksu_params, db_ksu
+ )
db_ksu["name"] = ksu_params["name"]
db_ksu["description"] = ksu_params["description"]
db_ksu["profile"]["profile_type"] = ksu_params["profile"][
]
db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
db_ksu["oka"] = ksu_params["oka"]
- db_ksu["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
+ # Clean items used in the workflow, no matter if the workflow succeeded
+ clean_status, clean_msg = await self.odu.clean_items_workflow(
+ "create_ksus", op_id, op_params, db_content
+ )
+ self.logger.info(
+ f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
+ )
+ self.logger.info(f"KSU Update EXIT with Resource Status {resource_status}")
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("ksu delete Enter")
+ db_content = []
+ op_params = []
+ op_id = params["operation_id"]
+ for ksu_id in params["ksus_list"]:
+ self.initialize_operation(ksu_id, op_id)
+ db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+ db_content.append(db_ksu)
+ ksu_params = {}
+ ksu_params["profile"] = {}
+ ksu_params["profile"]["profile_type"] = db_ksu["profile"]["profile_type"]
+ ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
+ op_params.append(ksu_params)
_, workflow_name = await self.odu.launch_workflow(
- "delete_ksus", op_id, op_params, content
- )
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
+ "delete_ksus", op_id, op_params, db_content
)
- for db_ksu in content:
- if workflow_status:
- db_ksu["state"] = "DELETED"
- db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_ksu["state"] = "FAILED_DELETION"
- db_ksu["resourceState"] = "ERROR"
+ for db_ksu, ksu_params in zip(db_content, op_params):
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_ksu)
- db_ksu = self.update_operation_history(db_ksu, op_id, workflow_status, None)
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
- if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
- "delete_ksus", op_id, op_params, content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- for db_ksu in content:
- if resource_status:
- db_ksu["resourceState"] = "READY"
- else:
- db_ksu["resourceState"] = "ERROR"
-
- db_ksu = self.update_operation_history(
- db_ksu, op_id, workflow_status, resource_status
+ if workflow_status:
+ resource_status, db_ksu = await self.check_resource(
+ "delete_ksus", op_id, ksu_params, db_ksu
)
- for db_ksu in content:
- db_ksu["operatingState"] = "IDLE"
- db_ksu["current_operation"] = None
- self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
-
- if db_ksu["state"] == "DELETED":
+ if resource_status:
+ db_ksu["state"] == "DELETED"
+ self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
+
+ self.logger.info(f"KSU Delete Exit with resource status: {resource_status}")
return
- async def clone(self, op_id, op_params, db_content):
+ async def clone(self, params, order_id):
self.logger.info("ksu clone Enter")
-
+ op_id = params["operation_id"]
+ ksus_id = params["ksus_list"][0]
+ self.initialize_operation(ksus_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"clone_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
- if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"clone_ksus", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+
+ self.logger.info(f"KSU Clone Exit with resource status: {resource_status}")
return
- async def move(self, op_id, op_params, db_content):
+ async def move(self, params, order_id):
self.logger.info("ksu move Enter")
-
+ op_id = params["operation_id"]
+ ksus_id = params["ksus_list"][0]
+ self.initialize_operation(ksus_id, op_id)
+ db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
+ op_params = self.get_operation_params(db_content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"move_ksus", op_id, op_params, db_content
)
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
-
- if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+ workflow_status = await self.check_workflow(op_id, workflow_name, db_content)
if workflow_status:
- resource_status, resource_msg = await self.check_resource_status(
+ resource_status, db_content = await self.check_resource(
"move_ksus", op_id, op_params, db_content
)
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
-
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+
+ self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
return