Feature 11073: Enhanced OSM declarative modelling for applications. App as first class citizen
Change-Id: I6b750f4d862692ab885e98afe3771ba817dd6535
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
diff --git a/osm_lcm/gitops.py b/osm_lcm/gitops.py
new file mode 100644
index 0000000..22d2717
--- /dev/null
+++ b/osm_lcm/gitops.py
@@ -0,0 +1,445 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import logging
+import tempfile
+from time import time
+import traceback
+from git import Repo
+from osm_lcm.lcm_utils import LcmBase
+from osm_lcm import odu_workflows
+from osm_lcm.data_utils.list_utils import find_in_list
+from osm_lcm.n2vc.kubectl import Kubectl
+import yaml
+from urllib.parse import quote
+
+
+class GitOpsLcm(LcmBase):
+    """
+    Common helpers for GitOps LCM operations: operation-history bookkeeping
+    in the DB, ODU workflow/resource checks and local Git repository handling.
+    """
+
+    # DB collection read and written by the operation helpers of this class
+    db_collection = "gitops"
+    # Class-level placeholders; the methods of this class only use local
+    # variables with these names and never assign these attributes
+    workflow_status = None
+    resource_status = None
+
+    # Profile field name -> name used for that kind of profile
+    # (presumably a DB collection name, going by the attribute name - confirm)
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
+    # Hyphenated profile type -> profile field name (keys of the mapping above)
+    profile_type_mapping = {
+        "infra-controllers": "infra_controller_profiles",
+        "infra-configs": "infra_config_profiles",
+        "managed-resources": "resource_profiles",
+        "applications": "app_profiles",
+    }
+
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Set up the logger, the ODU workflow client and the Git repository settings.
+
+        :param msg: passed through to OduWorkflow and to LcmBase.__init__
+        :param lcm_tasks: stored in self.lcm_tasks and passed to OduWorkflow
+        :param config: configuration mapping; its "gitops" section is read here
+            (keys used: "git_base_url", "user", "sw_catalogs_repo_url", "password")
+        :raises KeyError: if config has no "gitops" section
+        """
+        self.logger = logging.getLogger("lcm.gitops")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+        # Timeouts for the readiness checks (presumably seconds - confirm)
+        self._checkloop_kustomization_timeout = 900
+        self._checkloop_resource_timeout = 900
+        self._workflows = {}
+        self.gitops_config = config["gitops"]
+        # The password is a secret: mask it before the config reaches the log
+        loggable_config = {
+            key: ("******" if key == "password" else value)
+            for key, value in self.gitops_config.items()
+        }
+        self.logger.debug(f"GitOps config: {loggable_config}")
+        self._repo_base_url = self.gitops_config.get("git_base_url")
+        self._repo_user = self.gitops_config.get("user")
+        self._repo_sw_catalogs_url = self.gitops_config.get(
+            "sw_catalogs_repo_url",
+            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
+        )
+        # No built-in fallback credential: without a configured password the
+        # repository URL is used as-is (see build_git_url_with_credentials)
+        self._repo_password = self.gitops_config.get("password")
+        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
+            self._repo_sw_catalogs_url
+        )
+        super().__init__(msg, self.logger)
+
+    def build_git_url_with_credentials(self, repo_url):
+        """
+        Return repo_url with the configured user and password embedded.
+
+        Example: https://username:password@github.com/org/repo.git
+
+        The URL is returned unchanged when no password is configured or when
+        it does not start with "http://" or "https://". As in the previous
+        implementation, an "http://" URL is rewritten to "https://".
+
+        :param repo_url: repository URL without credentials
+        :return: URL to be used for authenticated Git operations
+        """
+        if not self._repo_password:
+            return repo_url
+
+        # safe="" also percent-encodes "/" (kept by quote() by default), which
+        # would otherwise end the credentials part of the URL too early
+        safe_user = quote(self._repo_user or "", safe="")
+        safe_pass = quote(self._repo_password, safe="")
+
+        for scheme in ("https://", "http://"):
+            if repo_url.startswith(scheme):
+                remainder = repo_url[len(scheme):]
+                return f"https://{safe_user}:{safe_pass}@{remainder}"
+        return repo_url
+
+    async def check_dummy_operation(self, op_id, op_params, content):
+        """
+        Placeholder check: log the operation data and always report success.
+
+        :return: the tuple (True, "OK")
+        """
+        summary = f"Operation {op_id}. Params: {op_params}. Content: {content}"
+        self.logger.info(summary)
+        outcome = (True, "OK")
+        return outcome
+
+    def initialize_operation(self, item_id, op_id):
+        """
+        Mark operation op_id of item item_id as started and store the item.
+
+        The operation entry gets workflowState "PROCESSING", resourceState
+        "NOT_READY", operationState "IN_PROGRESS" and an empty
+        gitOperationInfo; the item's "current_operation" is set to op_id.
+
+        :param item_id: "_id" of the item in self.db_collection
+        :param op_id: "op_id" of the entry in the item's "operationHistory"
+        :raises ValueError: if the item has no operation with that op_id
+        """
+        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
+        # "or []" also covers an operationHistory that is present but None
+        operation = next(
+            (
+                op
+                for op in db_item.get("operationHistory") or []
+                if op.get("op_id") == op_id
+            ),
+            None,
+        )
+        if operation is None:
+            # Fail with an explicit message instead of a TypeError on None
+            raise ValueError(
+                f"Operation {op_id} not found in operationHistory of item {item_id}"
+            )
+        operation["workflowState"] = "PROCESSING"
+        operation["resourceState"] = "NOT_READY"
+        operation["operationState"] = "IN_PROGRESS"
+        operation["gitOperationInfo"] = None
+        db_item["current_operation"] = operation["op_id"]
+        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+
+    def get_operation_params(self, item, operation_id):
+        """
+        Return the "operationParams" of operation operation_id of item.
+
+        :param item: dict whose "operationHistory" list is searched
+        :param operation_id: "op_id" of the wanted history entry
+        :return: the entry's "operationParams", or {} when the entry has none
+        """
+
+        def is_wanted(entry):
+            return entry["op_id"] == operation_id
+
+        history = item.get("operationHistory", [])
+        found = find_in_list(history, is_wanted)
+        return found.get("operationParams", {})
+
+    def get_operation_type(self, item, operation_id):
+        """
+        Return the "operationType" of operation operation_id of item.
+
+        :param item: dict whose "operationHistory" list is searched
+        :param operation_id: "op_id" of the wanted history entry
+        :return: the entry's "operationType", or {} when the entry has none
+        """
+
+        def is_wanted(entry):
+            return entry["op_id"] == operation_id
+
+        history = item.get("operationHistory", [])
+        found = find_in_list(history, is_wanted)
+        return found.get("operationType", {})
+
+    def update_state_operation_history(
+        self, content, op_id, workflow_state=None, resource_state=None
+    ):
+        """
+        Set workflowState and/or resourceState on one operation-history entry.
+
+        Only the first entry of content["operationHistory"] whose "op_id"
+        equals op_id is changed; a state passed as None is left untouched.
+        Nothing is written to the DB here.
+
+        :return: the same content object, modified in place
+        """
+        self.logger.info(
+            f"Update state of operation {op_id} in Operation History in DB"
+        )
+        self.logger.info(
+            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        for position, entry in enumerate(content["operationHistory"]):
+            self.logger.debug("Operations: {}".format(entry))
+            if entry["op_id"] != op_id:
+                continue
+            self.logger.debug("Found operation number: {}".format(position))
+            if workflow_state is not None:
+                entry["workflowState"] = workflow_state
+            if resource_state is not None:
+                entry["resourceState"] = resource_state
+            break
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    def update_operation_history(
+        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
+    ):
+        """
+        Record the outcome of a workflow and/or resource check on one operation.
+
+        Only the first entry of content["operationHistory"] whose "op_id"
+        equals op_id is changed. A status passed as None is skipped; a truthy
+        status records success and a falsy one records failure. The resource
+        status is applied after the workflow status, so it wins on the fields
+        both of them set. Nothing is written to the DB here.
+
+        :param op_end: when true, "endDate" is set to the current time()
+        :return: the same content object, modified in place
+        """
+        self.logger.info(
+            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        for position, entry in enumerate(content["operationHistory"]):
+            self.logger.debug("Operations: {}".format(entry))
+            if entry["op_id"] != op_id:
+                continue
+            self.logger.debug("Found operation number: {}".format(position))
+
+            if workflow_status is not None:
+                if workflow_status:
+                    entry.update(workflowState="COMPLETED", result=True)
+                else:
+                    entry.update(
+                        workflowState="ERROR", operationState="FAILED", result=False
+                    )
+
+            if resource_status is not None:
+                if resource_status:
+                    entry.update(
+                        resourceState="READY", operationState="COMPLETED", result=True
+                    )
+                else:
+                    entry.update(
+                        resourceState="NOT_READY",
+                        operationState="FAILED",
+                        result=False,
+                    )
+
+            if op_end:
+                entry["endDate"] = time()
+            break
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
+        """
+        Wait for the ODU workflow result and store it in the item.
+
+        Sets "state" for "create" and "delete" operations, sets
+        "resourceState", updates the operation-history entry and writes
+        db_content to self.db_collection.
+
+        :param op_id: operation identifier
+        :param workflow_name: name handed to odu.check_workflow_status
+        :param db_content: item dict; modified in place and stored
+        :return: the workflow status reported by the ODU
+        """
+        workflow_status, workflow_msg = await self.odu.check_workflow_status(
+            op_id, workflow_name
+        )
+        self.logger.info(
+            "Workflow Status: {} Workflow Message: {}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        succeeded = bool(workflow_status)
+        operation_type = self.get_operation_type(db_content, op_id)
+        # Other operation types leave "state" untouched
+        if operation_type == "create":
+            db_content["state"] = "CREATED" if succeeded else "FAILED_CREATION"
+        elif operation_type == "delete":
+            db_content["state"] = "DELETED" if succeeded else "FAILED_DELETION"
+
+        db_content["resourceState"] = (
+            "IN_PROGRESS.GIT_SYNCED" if succeeded else "ERROR"
+        )
+
+        db_content = self.update_operation_history(
+            db_content, op_id, workflow_status, None
+        )
+        item_filter = {"_id": db_content["_id"]}
+        self.db.set_one(self.db_collection, item_filter, db_content)
+        return workflow_status
+
+    async def check_resource_and_update_db(
+        self, resource_name, op_id, op_params, db_content
+    ):
+        """
+        Run the resource check for resource_name and record the outcome.
+
+        Updates "resourceState" and the operation-history entry, then sets
+        the item to "IDLE" with no current operation. The item is NOT
+        written to the DB here; the updated dict is returned.
+
+        :return: tuple (resource_status, db_content)
+        """
+        resource_status, resource_msg = await self.check_resource_status(
+            resource_name, op_id, op_params, db_content
+        )
+        self.logger.info(
+            "Resource Status: {} Resource Message: {}".format(
+                resource_status, resource_msg
+            )
+        )
+
+        db_content["resourceState"] = "READY" if resource_status else "ERROR"
+
+        # The workflow is always recorded as successful at this point
+        db_content = self.update_operation_history(
+            db_content, op_id, True, resource_status
+        )
+        db_content.update(operatingState="IDLE", current_operation=None)
+        return resource_status, db_content
+
+    async def common_check_list(
+        self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
+    ):
+        """
+        Run the enabled readiness checks in order, storing progress after each.
+
+        After every successful check the item's "resourceState" (and the one
+        of the operation-history entry) is set to the check's "resourceState"
+        and the item is written to db_collection. The first failing check
+        stops the sequence.
+
+        :param checkings_list: list of dicts with keys "enable", "item",
+            "name", "namespace", "timeout", "resourceState" and optionally
+            "condition", "deleted", "error_message"
+        :return: (True, "OK") if all enabled checks passed, otherwise
+            (status, message); any exception is reported as (False, message)
+        """
+        try:
+            for checking in checkings_list:
+                if not checking["enable"]:
+                    continue
+                status, message = await self.odu.readiness_loop(
+                    op_id=op_id,
+                    item=checking["item"],
+                    name=checking["name"],
+                    namespace=checking["namespace"],
+                    condition=checking.get("condition"),
+                    deleted=checking.get("deleted", False),
+                    timeout=checking["timeout"],
+                    kubectl_obj=kubectl_obj,
+                )
+                if not status:
+                    error_message = "Resources not ready: " + checking.get(
+                        "error_message", ""
+                    )
+                    return status, f"{error_message}: {message}"
+                reached_state = checking["resourceState"]
+                db_item["resourceState"] = reached_state
+                db_item = self.update_state_operation_history(
+                    db_item, op_id, None, reached_state
+                )
+                self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
+        except Exception as e:
+            self.logger.debug(traceback.format_exc())
+            self.logger.debug(f"Exception: {e}", exc_info=True)
+            return False, f"Unexpected exception: {e}"
+        return True, "OK"
+
+    async def check_resource_status(self, key, op_id, op_params, content):
+        """
+        Call the resource-check function registered for key in self._workflows.
+
+        Falls back to check_dummy_operation when key has no
+        "check_resource_function" registered.
+
+        :return: whatever the selected check coroutine returns
+        """
+        self.logger.info(
+            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Check resource status. Content: {content}")
+        workflow_entry = self._workflows.get(key, {})
+        checker = workflow_entry.get("check_resource_function")
+        self.logger.info("check_resource function : {}".format(checker))
+        if not checker:
+            return await self.check_dummy_operation(op_id, op_params, content)
+        return await checker(op_id, op_params, content)
+
+    def check_force_delete_and_delete_from_db(
+        self, _id, workflow_status, resource_status, force
+    ):
+        """
+        Delete the item from self.db_collection when a forced delete failed.
+
+        The item is removed only if force is set and the workflow or the
+        resource check was unsuccessful.
+
+        :return: True if the item was deleted from the DB, False otherwise
+        """
+        self.logger.info(
+            f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
+        )
+        everything_ok = workflow_status and resource_status
+        if not force or everything_ok:
+            return False
+        self.db.del_one(self.db_collection, {"_id": _id})
+        return True
+
+    def decrypt_age_keys(self, content, fields=None):
+        """
+        Decrypt the given fields of content in place, salted with its "_id".
+
+        :param content: item dict; must contain "_id"
+        :param fields: names of the fields to decrypt; defaults to
+            ["age_pubkey", "age_privkey"]
+        """
+        # A fresh list per call: a list literal as default value would be
+        # shared between calls and is handed to code outside this class
+        if fields is None:
+            fields = ["age_pubkey", "age_privkey"]
+        self.db.encrypt_decrypt_fields(
+            content,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def encrypt_age_keys(self, content, fields=None):
+        """
+        Encrypt the given fields of content in place, salted with its "_id".
+
+        :param content: item dict; must contain "_id"
+        :param fields: names of the fields to encrypt; defaults to
+            ["age_pubkey", "age_privkey"]
+        """
+        # A fresh list per call: a list literal as default value would be
+        # shared between calls and is handed to code outside this class
+        if fields is None:
+            fields = ["age_pubkey", "age_privkey"]
+        self.db.encrypt_decrypt_fields(
+            content,
+            "encrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def decrypted_copy(self, content, fields=None):
+        """
+        Return a deep copy of content with the given fields decrypted.
+
+        The copy is intended to be passed to ODU workflows; content itself
+        is left untouched.
+
+        :param content: item dict; must contain "_id" (used as salt)
+        :param fields: names of the fields to decrypt; defaults to
+            ["age_pubkey", "age_privkey"]
+        :return: the decrypted deep copy
+        """
+        # A fresh list per call: a list literal as default value would be
+        # shared between calls and is handed to code outside this class
+        if fields is None:
+            fields = ["age_pubkey", "age_privkey"]
+        content_copy = copy.deepcopy(content)
+
+        # decrypting the keys of the copy only
+        self.db.encrypt_decrypt_fields(
+            content_copy,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content_copy["_id"],
+        )
+        return content_copy
+
+    def delete_ksu_dependency(self, _id, data):
+        """
+        Flag as "NOT_IN_USE" the OKAs referenced only by the KSU being removed.
+
+        :param _id: "_id" of the KSU being removed
+        :param data: KSU dict; its "oka" list holds the referenced OKAs
+        """
+        used_oka = [oka["_id"] for oka in data["oka"] if oka.get("_id")]
+
+        # OKA ids referenced by any other KSU, without duplicates
+        existing_oka = []
+        for other_ksu in self.db.get_list("ksus", {}):
+            if other_ksu["_id"] == _id:
+                continue
+            for oka in other_ksu["oka"]:
+                oka_id = oka.get("_id")
+                if oka_id and oka_id not in existing_oka:
+                    existing_oka.append(oka_id)
+
+        self.logger.info(f"Used OKA: {used_oka}")
+        self.logger.info(f"Existing OKA: {existing_oka}")
+
+        no_longer_used = [oka_id for oka_id in used_oka if oka_id not in existing_oka]
+        for oka_id in no_longer_used:
+            self.db.set_one(
+                "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
+            )
+
+    def delete_profile_ksu(self, _id, profile_type):
+        """
+        Delete every KSU that belongs to the given profile.
+
+        The OKA usage of each KSU is released first (delete_ksu_dependency),
+        then all matching KSUs are removed from the "ksus" collection.
+
+        :param _id: "_id" of the profile
+        :param profile_type: profile type stored in the KSU "profile" field
+        """
+        ksu_filter = {"profile": {"_id": _id, "profile_type": profile_type}}
+        matching_ksus = self.db.get_list("ksus", ksu_filter)
+        for ksu in matching_ksus:
+            self.delete_ksu_dependency(ksu["_id"], ksu)
+
+        if not matching_ksus:
+            return
+        self.db.del_list("ksus", ksu_filter)
+
+    def cluster_kubectl(self, db_cluster):
+        """
+        Write the cluster kubeconfig to a file and return a Kubectl bound to it.
+
+        :param db_cluster: cluster dict; "credentials" holds the kubeconfig
+            content and "_id" is used to build the file name
+        :return: Kubectl object using /tmp/<_id>_kubeconfig.yaml
+        """
+        import os  # local import: "os" is not imported at module level
+
+        cluster_kubeconfig = db_cluster["credentials"]
+        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+        # The kubeconfig holds cluster credentials: create the file readable
+        # by its owner only, instead of relying on the process umask
+        kubeconfig_fd = os.open(
+            kubeconfig_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
+        )
+        with os.fdopen(kubeconfig_fd, "w") as kubeconfig_file:
+            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+        # os.open() only applies the mode when it creates the file; tighten a
+        # file left behind by an earlier call as well
+        os.chmod(kubeconfig_path, 0o600)
+        return Kubectl(config_file=kubeconfig_path)
+
+    def cloneGitRepo(self, repo_url, branch):
+        """
+        Clone repo_url into a new temporary folder and check out a new branch.
+
+        :param repo_url: URL of the repository; may embed credentials
+        :param branch: name of the local branch to create and check out
+        :return: path of the temporary folder holding the clone; the caller
+            is responsible for removing it
+        :raises Exception: any error raised while cloning or checking out;
+            the temporary folder is removed before the error is re-raised
+        """
+        import shutil  # local import: "shutil" is not imported at module level
+
+        # repo_url may carry "user:password@": keep the secret out of the log
+        scheme, separator, remainder = repo_url.partition("://")
+        authority, slash, path = remainder.partition("/")
+        if separator and "@" in authority:
+            host = authority.rpartition("@")[2]
+            logged_url = f"{scheme}://***@{host}{slash}{path}"
+        else:
+            logged_url = repo_url
+
+        self.logger.debug(f"Cloning repo {logged_url}, branch {branch}")
+        tmpdir = tempfile.mkdtemp()
+        self.logger.debug(f"Created temp folder {tmpdir}")
+        try:
+            # NOTE(review): TLS certificate verification is disabled for the
+            # clone (http.sslVerify=false) - confirm this is still required
+            cloned_repo = Repo.clone_from(
+                repo_url,
+                tmpdir,
+                allow_unsafe_options=True,
+                multi_options=["-c", "http.sslVerify=false"],
+            )
+            self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+            new_branch = cloned_repo.create_head(branch)  # create a new branch
+            # The checkout used to sit inside an "assert", which is removed
+            # under "python -O"; it must always run
+            new_branch.checkout()
+            self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        except Exception:
+            # Do not leave a half-populated folder behind on failure
+            shutil.rmtree(tmpdir, ignore_errors=True)
+            raise
+        self.logger.info(f"Repo {logged_url} cloned in {tmpdir}. New branch: {branch}")
+        return tmpdir
+
+    def createCommit(self, repo_dir, commit_msg):
+        """
+        Stage every change in the working tree of repo_dir and commit it.
+
+        :param repo_dir: path of a local Git repository
+        :param commit_msg: commit message
+        :return: the active branch of the repository after the commit
+        """
+        git_repo = Repo(repo_dir)
+        start_msg = f"Creating commit '{commit_msg}' in branch '{git_repo.active_branch}'"
+        self.logger.info(start_msg)
+        self.logger.debug(f"Current active branch: {git_repo.active_branch}")
+        # Stage all changes and commit them
+        git_repo.git.add(all=True)
+        git_repo.index.commit(commit_msg)
+        done_msg = f"Commit '{commit_msg}' created in branch '{git_repo.active_branch}'"
+        self.logger.info(done_msg)
+        self.logger.debug(f"Current active branch: {git_repo.active_branch}")
+        return git_repo.active_branch
+
+    def mergeGit(self, repo_dir, git_branch):
+        """
+        Merge the local branch git_branch into "main" and check out "main".
+
+        :param repo_dir: path of a local Git repository
+        :param git_branch: branch to merge; its ".commit" attribute is read,
+            so a branch object (as returned by createCommit) is expected
+        :return: True if the merge commit was created and "main" checked out,
+            False if that step failed (the error is logged)
+        :raises Exception: errors raised while preparing the merge (looking up
+            "main", computing the merge base, merging the trees) are not caught
+        """
+        repo = Repo(repo_dir)
+        self.logger.info(f"Merging local branch '{git_branch}' into main")
+        # prepare a merge
+        main = repo.heads.main  # right-hand side is ahead of us, in the future
+        merge_base = repo.merge_base(git_branch, main)  # three-way merge
+        repo.index.merge_tree(main, base=merge_base)  # write the merge into index
+        try:
+            # The merge is done in the branch
+            repo.index.commit(
+                f"Merged {git_branch} and main",
+                parent_commits=(git_branch.commit, main.commit),
+            )
+            # Now git_branch is ahead of main: let main point to the new
+            # commit through a temporary head
+            aux_head = repo.create_head("aux")
+            main.commit = aux_head.commit
+            repo.delete_head(aux_head)
+            # The checkout used to sit inside an "assert", which is removed
+            # under "python -O"; it must always run
+            main.checkout()
+            return True
+        except Exception as e:
+            self.logger.error(e)
+            return False
+
+    def pushToRemote(self, repo_dir):
+        """
+        Push the repository in repo_dir to its "origin" remote.
+
+        :param repo_dir: path of a local Git repository
+        :return: True if no pushed reference reported an error, else False
+        """
+        repo = Repo(repo_dir)
+        self.logger.info("Pushing the change to remote")
+        push_results = repo.remotes.origin.push()
+        # push() reports rejected references through the flags of the
+        # returned PushInfo objects, so they have to be inspected
+        failed = [info for info in push_results if info.flags & info.ERROR]
+        if failed:
+            for info in failed:
+                self.logger.error(
+                    f"Push of {info.local_ref} failed: {info.summary.strip()}"
+                )
+            return False
+        self.logger.info("Push done")
+        return True