Feature 11073: Enhanced OSM declarative modelling for applications. App as a first-class citizen

Change-Id: I6b750f4d862692ab885e98afe3771ba817dd6535
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
diff --git a/osm_lcm/gitops.py b/osm_lcm/gitops.py
new file mode 100644
index 0000000..22d2717
--- /dev/null
+++ b/osm_lcm/gitops.py
@@ -0,0 +1,445 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import logging
+import tempfile
+from time import time
+import traceback
+from git import Repo
+from osm_lcm.lcm_utils import LcmBase
+from osm_lcm import odu_workflows
+from osm_lcm.data_utils.list_utils import find_in_list
+from osm_lcm.n2vc.kubectl import Kubectl
+import yaml
+from urllib.parse import quote
+
+
+class GitOpsLcm(LcmBase):
+    # LCM handler for GitOps-managed items. Persists item/operation state in
+    # the "gitops" DB collection and delegates the actual work to ODU
+    # workflows (see __init__).
+    db_collection = "gitops"
+    # Class-level defaults; presumably overwritten per check by subclasses or
+    # callers — TODO confirm they are not meant to be instance attributes.
+    workflow_status = None
+    resource_status = None
+
+    # Maps profile list fields (as stored on a cluster record) to the DB
+    # collection holding that profile type.
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
+    # Maps external profile-type identifiers to the profile list fields above.
+    profile_type_mapping = {
+        "infra-controllers": "infra_controller_profiles",
+        "infra-configs": "infra_config_profiles",
+        "managed-resources": "resource_profiles",
+        "applications": "app_profiles",
+    }
+
+    def __init__(self, msg, lcm_tasks, config):
+        self.logger = logging.getLogger("lcm.gitops")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+        self._checkloop_kustomization_timeout = 900
+        self._checkloop_resource_timeout = 900
+        self._workflows = {}
+        self.gitops_config = config["gitops"]
+        self.logger.debug(f"GitOps config: {self.gitops_config}")
+        self._repo_base_url = self.gitops_config.get("git_base_url")
+        self._repo_user = self.gitops_config.get("user")
+        self._repo_sw_catalogs_url = self.gitops_config.get(
+            "sw_catalogs_repo_url",
+            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
+        )
+        self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1")
+        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
+            self._repo_sw_catalogs_url
+        )
+        super().__init__(msg, self.logger)
+
+    def build_git_url_with_credentials(self, repo_url):
+        # Build authenticated URL if credentials were provided
+        if self._repo_password:
+            # URL-safe escape password
+            safe_user = quote(self._repo_user)
+            safe_pass = quote(self._repo_password)
+
+            # Insert credentials into the URL
+            # e.g. https://username:password@github.com/org/repo.git
+            auth_url = repo_url.replace("https://", f"https://{safe_user}:{safe_pass}@")
+            auth_url = repo_url.replace("http://", f"https://{safe_user}:{safe_pass}@")
+        else:
+            auth_url = repo_url
+        return auth_url
+
+    async def check_dummy_operation(self, op_id, op_params, content):
+        """Fallback resource check: log the call and report success.
+
+        Used by check_resource_status() when no specific
+        check_resource_function is registered for the item key.
+        """
+        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+        return True, "OK"
+
+    def initialize_operation(self, item_id, op_id):
+        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
+        operation = next(
+            (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
+            None,
+        )
+        operation["workflowState"] = "PROCESSING"
+        operation["resourceState"] = "NOT_READY"
+        operation["operationState"] = "IN_PROGRESS"
+        operation["gitOperationInfo"] = None
+        db_item["current_operation"] = operation["op_id"]
+        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+
+    def get_operation_params(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationParams", {})
+
+    def get_operation_type(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationType", {})
+
+    def update_state_operation_history(
+        self, content, op_id, workflow_state=None, resource_state=None
+    ):
+        self.logger.info(
+            f"Update state of operation {op_id} in Operation History in DB"
+        )
+        self.logger.info(
+            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_state is not None:
+                    operation["workflowState"] = workflow_state
+
+                if resource_state is not None:
+                    operation["resourceState"] = resource_state
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    def update_operation_history(
+        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
+    ):
+        self.logger.info(
+            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_status is not None:
+                    if workflow_status:
+                        operation["workflowState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["workflowState"] = "ERROR"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if resource_status is not None:
+                    if resource_status:
+                        operation["resourceState"] = "READY"
+                        operation["operationState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["resourceState"] = "NOT_READY"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if op_end:
+                    now = time()
+                    operation["endDate"] = now
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
+        workflow_status, workflow_msg = await self.odu.check_workflow_status(
+            op_id, workflow_name
+        )
+        self.logger.info(
+            "Workflow Status: {} Workflow Message: {}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        operation_type = self.get_operation_type(db_content, op_id)
+        if operation_type == "create" and workflow_status:
+            db_content["state"] = "CREATED"
+        elif operation_type == "create" and not workflow_status:
+            db_content["state"] = "FAILED_CREATION"
+        elif operation_type == "delete" and workflow_status:
+            db_content["state"] = "DELETED"
+        elif operation_type == "delete" and not workflow_status:
+            db_content["state"] = "FAILED_DELETION"
+
+        if workflow_status:
+            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            db_content["resourceState"] = "ERROR"
+
+        db_content = self.update_operation_history(
+            db_content, op_id, workflow_status, None
+        )
+        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+        return workflow_status
+
+    async def check_resource_and_update_db(
+        self, resource_name, op_id, op_params, db_content
+    ):
+        workflow_status = True
+
+        resource_status, resource_msg = await self.check_resource_status(
+            resource_name, op_id, op_params, db_content
+        )
+        self.logger.info(
+            "Resource Status: {} Resource Message: {}".format(
+                resource_status, resource_msg
+            )
+        )
+
+        if resource_status:
+            db_content["resourceState"] = "READY"
+        else:
+            db_content["resourceState"] = "ERROR"
+
+        db_content = self.update_operation_history(
+            db_content, op_id, workflow_status, resource_status
+        )
+        db_content["operatingState"] = "IDLE"
+        db_content["current_operation"] = None
+        return resource_status, db_content
+
+    async def common_check_list(
+        self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
+    ):
+        """Run a sequence of readiness checks, recording progress in the DB.
+
+        Each enabled entry of *checkings_list* is awaited through
+        odu.readiness_loop(); on failure a (False, message) tuple is returned
+        immediately, on success the entry's resourceState is written to
+        *db_item* and persisted before moving to the next check. Any
+        exception is caught and reported as (False, message).
+        Returns (True, "OK") when every enabled check passes.
+        """
+        try:
+            for checking in checkings_list:
+                # Entries may be disabled individually via the "enable" flag
+                if checking["enable"]:
+                    status, message = await self.odu.readiness_loop(
+                        op_id=op_id,
+                        item=checking["item"],
+                        name=checking["name"],
+                        namespace=checking["namespace"],
+                        condition=checking.get("condition"),
+                        deleted=checking.get("deleted", False),
+                        timeout=checking["timeout"],
+                        kubectl_obj=kubectl_obj,
+                    )
+                    if not status:
+                        error_message = "Resources not ready: "
+                        error_message += checking.get("error_message", "")
+                        return status, f"{error_message}: {message}"
+                    else:
+                        # Persist intermediate progress so observers see each
+                        # resourceState transition as it happens
+                        db_item["resourceState"] = checking["resourceState"]
+                        db_item = self.update_state_operation_history(
+                            db_item, op_id, None, checking["resourceState"]
+                        )
+                        self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
+        except Exception as e:
+            self.logger.debug(traceback.format_exc())
+            self.logger.debug(f"Exception: {e}", exc_info=True)
+            return False, f"Unexpected exception: {e}"
+        return True, "OK"
+
+    async def check_resource_status(self, key, op_id, op_params, content):
+        self.logger.info(
+            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Check resource status. Content: {content}")
+        check_resource_function = self._workflows.get(key, {}).get(
+            "check_resource_function"
+        )
+        self.logger.info("check_resource function : {}".format(check_resource_function))
+        if check_resource_function:
+            return await check_resource_function(op_id, op_params, content)
+        else:
+            return await self.check_dummy_operation(op_id, op_params, content)
+
+    def check_force_delete_and_delete_from_db(
+        self, _id, workflow_status, resource_status, force
+    ):
+        self.logger.info(
+            f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
+        )
+        if force and (not workflow_status or not resource_status):
+            self.db.del_one(self.db_collection, {"_id": _id})
+            return True
+        return False
+
+    def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
+        self.db.encrypt_decrypt_fields(
+            content,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
+        self.db.encrypt_decrypt_fields(
+            content,
+            "encrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
+        # This deep copy is intended to be passed to ODU workflows.
+        content_copy = copy.deepcopy(content)
+
+        # decrypting the key
+        self.db.encrypt_decrypt_fields(
+            content_copy,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content_copy["_id"],
+        )
+        return content_copy
+
+    def delete_ksu_dependency(self, _id, data):
+        used_oka = []
+        existing_oka = []
+
+        for oka_data in data["oka"]:
+            if oka_data.get("_id"):
+                used_oka.append(oka_data["_id"])
+
+        all_ksu_data = self.db.get_list("ksus", {})
+        for ksu_data in all_ksu_data:
+            if ksu_data["_id"] != _id:
+                for oka_data in ksu_data["oka"]:
+                    if oka_data.get("_id"):
+                        if oka_data["_id"] not in existing_oka:
+                            existing_oka.append(oka_data["_id"])
+
+        self.logger.info(f"Used OKA: {used_oka}")
+        self.logger.info(f"Existing OKA: {existing_oka}")
+
+        for oka_id in used_oka:
+            if oka_id not in existing_oka:
+                self.db.set_one(
+                    "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
+                )
+
+        return
+
+    def delete_profile_ksu(self, _id, profile_type):
+        filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
+        ksu_list = self.db.get_list("ksus", filter_q)
+        for ksu_data in ksu_list:
+            self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
+
+        if ksu_list:
+            self.db.del_list("ksus", filter_q)
+        return
+
+    def cluster_kubectl(self, db_cluster):
+        cluster_kubeconfig = db_cluster["credentials"]
+        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+        with open(kubeconfig_path, "w") as kubeconfig_file:
+            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+        return Kubectl(config_file=kubeconfig_path)
+
+    def cloneGitRepo(self, repo_url, branch):
+        """Clone *repo_url* into a fresh temp dir and create branch *branch*.
+
+        Returns the path of the temporary working directory; the caller is
+        responsible for cleaning it up. SSL verification is disabled for
+        the clone — presumably to tolerate self-signed server certificates;
+        confirm before hardening.
+        """
+        self.logger.debug(f"Cloning repo {repo_url}, branch {branch}")
+        tmpdir = tempfile.mkdtemp()
+        self.logger.debug(f"Created temp folder {tmpdir}")
+        # allow_unsafe_options is required by GitPython to pass -c options
+        cloned_repo = Repo.clone_from(
+            repo_url,
+            tmpdir,
+            allow_unsafe_options=True,
+            multi_options=["-c", "http.sslVerify=false"],
+        )
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        assert cloned_repo
+        # Create the work branch and make it the active (checked-out) branch
+        new_branch = cloned_repo.create_head(branch)  # create a new branch
+        assert new_branch.checkout() == cloned_repo.active_branch
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}")
+        return tmpdir
+
+    def createCommit(self, repo_dir, commit_msg):
+        """Stage every change in *repo_dir* and commit it with *commit_msg*.
+
+        Returns the branch the commit was created on (the active branch).
+        """
+        repo = Repo(repo_dir)
+        self.logger.info(
+            f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        # git add -A: stages modifications, deletions and untracked files
+        repo.git.add(all=True)
+        repo.index.commit(commit_msg)
+        self.logger.info(
+            f"Commit '{commit_msg}' created in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        return repo.active_branch
+
+    def mergeGit(self, repo_dir, git_branch):
+        """Merge local branch *git_branch* into main and check out main.
+
+        Returns True on success, False when the merge commit fails.
+        NOTE(review): *git_branch* is both interpolated into messages and
+        dereferenced as a branch object (git_branch.commit) — callers appear
+        to pass a Head object; confirm.
+        """
+        repo = Repo(repo_dir)
+        self.logger.info(f"Merging local branch '{git_branch}' into main")
+        # Dead branch: with_git is hard-coded False, so the CLI-style merge
+        # below never runs (and repo.git("checkout main") is not valid
+        # GitPython usage — it would need repo.git.checkout("main")).
+        with_git = False
+        if with_git:
+            try:
+                repo.git("checkout main")
+                repo.git(f"merge {git_branch}")
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+        else:
+            # prepare a merge
+            main = repo.heads.main  # right-hand side is ahead of us, in the future
+            merge_base = repo.merge_base(git_branch, main)  # three-way merge
+            repo.index.merge_tree(main, base=merge_base)  # write the merge into index
+            try:
+                # Create the merge commit on the current branch with both
+                # parents, so history records the merge
+                repo.index.commit(
+                    f"Merged {git_branch} and main",
+                    parent_commits=(git_branch.commit, main.commit),
+                )
+                # Now git_branch is ahead of main. Move main to the new merge
+                # commit via a throwaway head created at the current HEAD,
+                # then check main out.
+                aux_head = repo.create_head("aux")
+                main.commit = aux_head.commit
+                repo.delete_head(aux_head)
+                assert main.checkout()
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+
+    def pushToRemote(self, repo_dir):
+        repo = Repo(repo_dir)
+        self.logger.info("Pushing the change to remote")
+        # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch))
+        repo.remotes.origin.push()
+        self.logger.info("Push done")
+        return True