class GitOpsLcm(LcmBase):
+ db_collection = "gitops"
+
def __init__(self, msg, lcm_tasks, config):
self.logger = logging.getLogger("lcm.gitops")
self.lcm_tasks = lcm_tasks
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
+ def initialize_operation(self, item_id, op_id):
+ db_item = self.db.get_one(self.db_collection, {"_id": item_id})
+ operation = next(
+ (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
+ None,
+ )
+ operation["workflowState"] = "PROCESSING"
+ operation["resourceState"] = "NOT_READY"
+ operation["operationState"] = "IN_PROGRESS"
+ operation["gitOperationInfo"] = None
+ db_item["current_operation"] = operation["op_id"]
+ self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+
def update_operation_history(
self, content, workflow_status=None, resource_status=None
):
if command != "get_creds":
op_id = params["operation_id"]
cluster_id = params["cluster_id"]
+ self.cluster.initialize_operation(cluster_id, op_id)
db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
"""
if command in (
elif topic == "k8s_app":
op_id = params["operation_id"]
profile_id = params["profile_id"]
+ self.k8s_app.initialize_operation(profile_id, op_id)
db_profile = self.db.get_one("k8sapp", {"_id": profile_id})
db_profile["profile_type"] = "applications"
op_params = self.get_operation_params(db_profile, op_id)
elif topic == "k8s_resource":
op_id = params["operation_id"]
profile_id = params["profile_id"]
+ self.k8s_resource.initialize_operation(profile_id, op_id)
db_profile = self.db.get_one("k8sresource", {"_id": profile_id})
db_profile["profile_type"] = "managed-resources"
op_params = self.get_operation_params(db_profile, op_id)
elif topic == "k8s_infra_controller":
op_id = params["operation_id"]
profile_id = params["profile_id"]
+ self.k8s_infra_controller.initialize_operation(profile_id, op_id)
db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
db_profile["profile_type"] = "infra-controllers"
op_params = self.get_operation_params(db_profile, op_id)
elif topic == "k8s_infra_config":
op_id = params["operation_id"]
profile_id = params["profile_id"]
+ self.k8s_infra_config.initialize_operation(profile_id, op_id)
db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id})
db_profile["profile_type"] = "infra-configs"
op_params = self.get_operation_params(db_profile, op_id)
elif topic == "oka":
op_id = params["operation_id"]
oka_id = params["oka_id"]
+ self.oka.initialize_operation(oka_id, op_id)
db_oka = self.db.get_one("okas", {"_id": oka_id})
op_params = self.get_operation_params(db_oka, op_id)
if command == "create":
db_content = []
op_params = []
for ksu_id in params["ksus_list"]:
+ self.ksu.initialize_operation(ksu_id, op_id)
db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
db_content.append(db_ksu)
ksu_params = {}
op_params.append(ksu_params)
else:
# db_content and op_params are single items
+ self.ksu.initialize_operation(params["_id"], op_id)
db_content = self.db.get_one("ksus", {"_id": params["_id"]})
db_content = db_ksu
op_params = self.get_operation_params(db_ksu, op_id)