Feature 11073: Enhanced OSM declarative modelling for applications. App as first... 93/15393/47 v19.0.0
author: garciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 17 Jul 2025 11:04:13 +0000 (13:04 +0200)
committer: garciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 29 Jan 2026 15:50:05 +0000 (16:50 +0100)
Change-Id: I6b750f4d862692ab885e98afe3771ba817dd6535
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
15 files changed:
Dockerfile.production
osm_lcm/gitops.py [new file with mode: 0644]
osm_lcm/k8s.py
osm_lcm/lcm.py
osm_lcm/lcm_utils.py
osm_lcm/n2vc/kubectl.py
osm_lcm/odu_libs/app.py [new file with mode: 0644]
osm_lcm/odu_libs/cluster_mgmt.py
osm_lcm/odu_libs/ksu.py
osm_lcm/odu_libs/nodegroup.py
osm_lcm/odu_libs/oka.py
osm_lcm/odu_libs/templates/launcher-app.j2 [new file with mode: 0644]
osm_lcm/odu_workflows.py
requirements.in
requirements.txt

index 8087ebf..d8139af 100644 (file)
@@ -101,6 +101,7 @@ WORKDIR /app
 RUN apk add --no-cache \
     bash \
     curl \
+    git \
     openssh-client \
     openssh-keygen \
     openssl
diff --git a/osm_lcm/gitops.py b/osm_lcm/gitops.py
new file mode 100644 (file)
index 0000000..22d2717
--- /dev/null
@@ -0,0 +1,445 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import copy
import logging
import os
import shutil
import tempfile
import traceback
from time import time
from urllib.parse import quote

import yaml
from git import Repo

from osm_lcm import odu_workflows
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.lcm_utils import LcmBase
from osm_lcm.n2vc.kubectl import Kubectl
+
+
class GitOpsLcm(LcmBase):
    """Base LCM class for GitOps-managed items.

    Provides the shared operation-history bookkeeping, workflow/resource
    status handling and Git helper methods.  Subclasses (e.g. NodeGroupLcm)
    override ``db_collection`` to point at their own collection.
    """

    # MongoDB collection backing this LCM class; overridden by subclasses.
    db_collection = "gitops"
    workflow_status = None
    resource_status = None

    # Profile field name (as stored in item documents) -> DB collection that
    # holds that kind of profile.
    profile_collection_mapping = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "resource_profiles": "k8sresource",
        "app_profiles": "k8sapp",
    }

    # External profile-type identifier -> profile field name (inverse view of
    # the mapping above, keyed by the public profile-type strings).
    profile_type_mapping = {
        "infra-controllers": "infra_controller_profiles",
        "infra-configs": "infra_config_profiles",
        "managed-resources": "resource_profiles",
        "applications": "app_profiles",
    }
+
+    def __init__(self, msg, lcm_tasks, config):
+        self.logger = logging.getLogger("lcm.gitops")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+        self._checkloop_kustomization_timeout = 900
+        self._checkloop_resource_timeout = 900
+        self._workflows = {}
+        self.gitops_config = config["gitops"]
+        self.logger.debug(f"GitOps config: {self.gitops_config}")
+        self._repo_base_url = self.gitops_config.get("git_base_url")
+        self._repo_user = self.gitops_config.get("user")
+        self._repo_sw_catalogs_url = self.gitops_config.get(
+            "sw_catalogs_repo_url",
+            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
+        )
+        self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1")
+        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
+            self._repo_sw_catalogs_url
+        )
+        super().__init__(msg, self.logger)
+
+    def build_git_url_with_credentials(self, repo_url):
+        # Build authenticated URL if credentials were provided
+        if self._repo_password:
+            # URL-safe escape password
+            safe_user = quote(self._repo_user)
+            safe_pass = quote(self._repo_password)
+
+            # Insert credentials into the URL
+            # e.g. https://username:password@github.com/org/repo.git
+            auth_url = repo_url.replace("https://", f"https://{safe_user}:{safe_pass}@")
+            auth_url = repo_url.replace("http://", f"https://{safe_user}:{safe_pass}@")
+        else:
+            auth_url = repo_url
+        return auth_url
+
+    async def check_dummy_operation(self, op_id, op_params, content):
+        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+        return True, "OK"
+
+    def initialize_operation(self, item_id, op_id):
+        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
+        operation = next(
+            (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
+            None,
+        )
+        operation["workflowState"] = "PROCESSING"
+        operation["resourceState"] = "NOT_READY"
+        operation["operationState"] = "IN_PROGRESS"
+        operation["gitOperationInfo"] = None
+        db_item["current_operation"] = operation["op_id"]
+        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+
+    def get_operation_params(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationParams", {})
+
+    def get_operation_type(self, item, operation_id):
+        operation_history = item.get("operationHistory", [])
+        operation = find_in_list(
+            operation_history, lambda op: op["op_id"] == operation_id
+        )
+        return operation.get("operationType", {})
+
+    def update_state_operation_history(
+        self, content, op_id, workflow_state=None, resource_state=None
+    ):
+        self.logger.info(
+            f"Update state of operation {op_id} in Operation History in DB"
+        )
+        self.logger.info(
+            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_state is not None:
+                    operation["workflowState"] = workflow_state
+
+                if resource_state is not None:
+                    operation["resourceState"] = resource_state
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    def update_operation_history(
+        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
+    ):
+        self.logger.info(
+            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_status is not None:
+                    if workflow_status:
+                        operation["workflowState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["workflowState"] = "ERROR"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if resource_status is not None:
+                    if resource_status:
+                        operation["resourceState"] = "READY"
+                        operation["operationState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["resourceState"] = "NOT_READY"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if op_end:
+                    now = time()
+                    operation["endDate"] = now
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
        """Await the ODU workflow outcome and persist it for *db_content*.

        Maps (operation type, workflow result) to the item's lifecycle
        "state", sets "resourceState", folds the workflow result into the
        operation history and writes the document back to self.db_collection.

        :returns: the boolean workflow status reported by ODU.
        """
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            op_id, workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )
        operation_type = self.get_operation_type(db_content, op_id)
        # Lifecycle state depends on both what was attempted and whether the
        # workflow succeeded; operation types other than create/delete leave
        # "state" untouched.
        if operation_type == "create" and workflow_status:
            db_content["state"] = "CREATED"
        elif operation_type == "create" and not workflow_status:
            db_content["state"] = "FAILED_CREATION"
        elif operation_type == "delete" and workflow_status:
            db_content["state"] = "DELETED"
        elif operation_type == "delete" and not workflow_status:
            db_content["state"] = "FAILED_DELETION"

        if workflow_status:
            # Workflow succeeded: the change is synced to Git; resource
            # readiness is checked separately (see check_resource_and_update_db).
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["resourceState"] = "ERROR"

        # Only the workflow half of the history entry is updated here
        # (resource status passed as None).
        db_content = self.update_operation_history(
            db_content, op_id, workflow_status, None
        )
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return workflow_status
+
    async def check_resource_and_update_db(
        self, resource_name, op_id, op_params, db_content
    ):
        """Check resource readiness and fold the result into *db_content*.

        NOTE(review): workflow_status is hard-wired to True — this method
        appears to be reached only after the workflow already succeeded;
        confirm against callers.  Unlike check_workflow_and_update_db, this
        method does NOT persist db_content; the caller must write it back.

        :returns: tuple (resource_status, updated db_content).
        """
        workflow_status = True

        resource_status, resource_msg = await self.check_resource_status(
            resource_name, op_id, op_params, db_content
        )
        self.logger.info(
            "Resource Status: {} Resource Message: {}".format(
                resource_status, resource_msg
            )
        )

        if resource_status:
            db_content["resourceState"] = "READY"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(
            db_content, op_id, workflow_status, resource_status
        )
        # The operation is finished either way: item goes back to IDLE with
        # no current operation.
        db_content["operatingState"] = "IDLE"
        db_content["current_operation"] = None
        return resource_status, db_content
+
    async def common_check_list(
        self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
    ):
        """Run a list of readiness checks, updating *db_item* as each passes.

        Each entry of *checkings_list* is a dict with keys:
          - enable (bool): skip the check when false
          - item / name / namespace: what to wait for (passed to readiness_loop)
          - condition (optional) and deleted (optional, default False)
          - timeout: readiness-loop timeout
          - resourceState: state stored on db_item once this check passes
          - error_message (optional): prefix used in the failure message

        After each successful check, db_item's resourceState is updated in the
        operation history and persisted to *db_collection*.

        :returns: (True, "OK") when all enabled checks pass; otherwise
            (False, <message>).  Unexpected exceptions are caught, logged and
            reported as a failure instead of being raised.
        """
        try:
            for checking in checkings_list:
                if checking["enable"]:
                    status, message = await self.odu.readiness_loop(
                        op_id=op_id,
                        item=checking["item"],
                        name=checking["name"],
                        namespace=checking["namespace"],
                        condition=checking.get("condition"),
                        deleted=checking.get("deleted", False),
                        timeout=checking["timeout"],
                        kubectl_obj=kubectl_obj,
                    )
                    if not status:
                        error_message = "Resources not ready: "
                        error_message += checking.get("error_message", "")
                        return status, f"{error_message}: {message}"
                    else:
                        db_item["resourceState"] = checking["resourceState"]
                        db_item = self.update_state_operation_history(
                            db_item, op_id, None, checking["resourceState"]
                        )
                        self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
        except Exception as e:
            self.logger.debug(traceback.format_exc())
            self.logger.debug(f"Exception: {e}", exc_info=True)
            return False, f"Unexpected exception: {e}"
        return True, "OK"
+
+    async def check_resource_status(self, key, op_id, op_params, content):
+        self.logger.info(
+            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Check resource status. Content: {content}")
+        check_resource_function = self._workflows.get(key, {}).get(
+            "check_resource_function"
+        )
+        self.logger.info("check_resource function : {}".format(check_resource_function))
+        if check_resource_function:
+            return await check_resource_function(op_id, op_params, content)
+        else:
+            return await self.check_dummy_operation(op_id, op_params, content)
+
+    def check_force_delete_and_delete_from_db(
+        self, _id, workflow_status, resource_status, force
+    ):
+        self.logger.info(
+            f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
+        )
+        if force and (not workflow_status or not resource_status):
+            self.db.del_one(self.db_collection, {"_id": _id})
+            return True
+        return False
+
+    def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
+        self.db.encrypt_decrypt_fields(
+            content,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
+        self.db.encrypt_decrypt_fields(
+            content,
+            "encrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
+        # This deep copy is intended to be passed to ODU workflows.
+        content_copy = copy.deepcopy(content)
+
+        # decrypting the key
+        self.db.encrypt_decrypt_fields(
+            content_copy,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content_copy["_id"],
+        )
+        return content_copy
+
+    def delete_ksu_dependency(self, _id, data):
+        used_oka = []
+        existing_oka = []
+
+        for oka_data in data["oka"]:
+            if oka_data.get("_id"):
+                used_oka.append(oka_data["_id"])
+
+        all_ksu_data = self.db.get_list("ksus", {})
+        for ksu_data in all_ksu_data:
+            if ksu_data["_id"] != _id:
+                for oka_data in ksu_data["oka"]:
+                    if oka_data.get("_id"):
+                        if oka_data["_id"] not in existing_oka:
+                            existing_oka.append(oka_data["_id"])
+
+        self.logger.info(f"Used OKA: {used_oka}")
+        self.logger.info(f"Existing OKA: {existing_oka}")
+
+        for oka_id in used_oka:
+            if oka_id not in existing_oka:
+                self.db.set_one(
+                    "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
+                )
+
+        return
+
+    def delete_profile_ksu(self, _id, profile_type):
+        filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
+        ksu_list = self.db.get_list("ksus", filter_q)
+        for ksu_data in ksu_list:
+            self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
+
+        if ksu_list:
+            self.db.del_list("ksus", filter_q)
+        return
+
+    def cluster_kubectl(self, db_cluster):
+        cluster_kubeconfig = db_cluster["credentials"]
+        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+        with open(kubeconfig_path, "w") as kubeconfig_file:
+            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+        return Kubectl(config_file=kubeconfig_path)
+
+    def cloneGitRepo(self, repo_url, branch):
+        self.logger.debug(f"Cloning repo {repo_url}, branch {branch}")
+        tmpdir = tempfile.mkdtemp()
+        self.logger.debug(f"Created temp folder {tmpdir}")
+        cloned_repo = Repo.clone_from(
+            repo_url,
+            tmpdir,
+            allow_unsafe_options=True,
+            multi_options=["-c", "http.sslVerify=false"],
+        )
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        assert cloned_repo
+        new_branch = cloned_repo.create_head(branch)  # create a new branch
+        assert new_branch.checkout() == cloned_repo.active_branch
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}")
+        return tmpdir
+
+    def createCommit(self, repo_dir, commit_msg):
+        repo = Repo(repo_dir)
+        self.logger.info(
+            f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        # repo.index.add('**')
+        repo.git.add(all=True)
+        repo.index.commit(commit_msg)
+        self.logger.info(
+            f"Commit '{commit_msg}' created in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        return repo.active_branch
+
+    def mergeGit(self, repo_dir, git_branch):
+        repo = Repo(repo_dir)
+        self.logger.info(f"Merging local branch '{git_branch}' into main")
+        with_git = False
+        if with_git:
+            try:
+                repo.git("checkout main")
+                repo.git(f"merge {git_branch}")
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+        else:
+            # prepare a merge
+            main = repo.heads.main  # right-hand side is ahead of us, in the future
+            merge_base = repo.merge_base(git_branch, main)  # three-way merge
+            repo.index.merge_tree(main, base=merge_base)  # write the merge into index
+            try:
+                # The merge is done in the branch
+                repo.index.commit(
+                    f"Merged {git_branch} and main",
+                    parent_commits=(git_branch.commit, main.commit),
+                )
+                # Now, git_branch is ahed of master. Now let master point to the recent commit
+                aux_head = repo.create_head("aux")
+                main.commit = aux_head.commit
+                repo.delete_head(aux_head)
+                assert main.checkout()
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+
+    def pushToRemote(self, repo_dir):
+        repo = Repo(repo_dir)
+        self.logger.info("Pushing the change to remote")
+        # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch))
+        repo.remotes.origin.push()
+        self.logger.info("Push done")
+        return True
index c949793..6b02ec5 100644 (file)
@@ -18,17 +18,14 @@ __author__ = (
     "Shahithya Y <shahithya.y@tataelxsi.co.in>",
 )
 
-import copy
-import logging
+import os
 from time import time
 import traceback
-from osm_lcm.lcm_utils import LcmBase
+import yaml
 from copy import deepcopy
-from osm_lcm import odu_workflows
 from osm_lcm import vim_sdn
-from osm_lcm.data_utils.list_utils import find_in_list
-from osm_lcm.n2vc.kubectl import Kubectl
-import yaml
+from osm_lcm.gitops import GitOpsLcm
+from osm_lcm.lcm_utils import LcmException
 
 MAP_PROFILE = {
     "infra_controller_profiles": "infra-controllers",
@@ -38,323 +35,6 @@ MAP_PROFILE = {
 }
 
 
-class GitOpsLcm(LcmBase):
-    db_collection = "gitops"
-    workflow_status = None
-    resource_status = None
-
-    profile_collection_mapping = {
-        "infra_controller_profiles": "k8sinfra_controller",
-        "infra_config_profiles": "k8sinfra_config",
-        "resource_profiles": "k8sresource",
-        "app_profiles": "k8sapp",
-    }
-
-    profile_type_mapping = {
-        "infra-controllers": "infra_controller_profiles",
-        "infra-configs": "infra_config_profiles",
-        "managed-resources": "resource_profiles",
-        "applications": "app_profiles",
-    }
-
-    def __init__(self, msg, lcm_tasks, config):
-        self.logger = logging.getLogger("lcm.gitops")
-        self.lcm_tasks = lcm_tasks
-        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-        self._checkloop_kustomization_timeout = 900
-        self._checkloop_resource_timeout = 900
-        self._workflows = {}
-        super().__init__(msg, self.logger)
-
-    async def check_dummy_operation(self, op_id, op_params, content):
-        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
-        return True, "OK"
-
-    def initialize_operation(self, item_id, op_id):
-        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
-        operation = next(
-            (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
-            None,
-        )
-        operation["workflowState"] = "PROCESSING"
-        operation["resourceState"] = "NOT_READY"
-        operation["operationState"] = "IN_PROGRESS"
-        operation["gitOperationInfo"] = None
-        db_item["current_operation"] = operation["op_id"]
-        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
-
-    def get_operation_params(self, item, operation_id):
-        operation_history = item.get("operationHistory", [])
-        operation = find_in_list(
-            operation_history, lambda op: op["op_id"] == operation_id
-        )
-        return operation.get("operationParams", {})
-
-    def get_operation_type(self, item, operation_id):
-        operation_history = item.get("operationHistory", [])
-        operation = find_in_list(
-            operation_history, lambda op: op["op_id"] == operation_id
-        )
-        return operation.get("operationType", {})
-
-    def update_state_operation_history(
-        self, content, op_id, workflow_state=None, resource_state=None
-    ):
-        self.logger.info(
-            f"Update state of operation {op_id} in Operation History in DB"
-        )
-        self.logger.info(
-            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
-        )
-        self.logger.debug(f"Content: {content}")
-
-        op_num = 0
-        for operation in content["operationHistory"]:
-            self.logger.debug("Operations: {}".format(operation))
-            if operation["op_id"] == op_id:
-                self.logger.debug("Found operation number: {}".format(op_num))
-                if workflow_state is not None:
-                    operation["workflowState"] = workflow_state
-
-                if resource_state is not None:
-                    operation["resourceState"] = resource_state
-                break
-            op_num += 1
-        self.logger.debug("content: {}".format(content))
-
-        return content
-
-    def update_operation_history(
-        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
-    ):
-        self.logger.info(
-            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
-        )
-        self.logger.debug(f"Content: {content}")
-
-        op_num = 0
-        for operation in content["operationHistory"]:
-            self.logger.debug("Operations: {}".format(operation))
-            if operation["op_id"] == op_id:
-                self.logger.debug("Found operation number: {}".format(op_num))
-                if workflow_status is not None:
-                    if workflow_status:
-                        operation["workflowState"] = "COMPLETED"
-                        operation["result"] = True
-                    else:
-                        operation["workflowState"] = "ERROR"
-                        operation["operationState"] = "FAILED"
-                        operation["result"] = False
-
-                if resource_status is not None:
-                    if resource_status:
-                        operation["resourceState"] = "READY"
-                        operation["operationState"] = "COMPLETED"
-                        operation["result"] = True
-                    else:
-                        operation["resourceState"] = "NOT_READY"
-                        operation["operationState"] = "FAILED"
-                        operation["result"] = False
-
-                if op_end:
-                    now = time()
-                    operation["endDate"] = now
-                break
-            op_num += 1
-        self.logger.debug("content: {}".format(content))
-
-        return content
-
-    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
-        workflow_status, workflow_msg = await self.odu.check_workflow_status(
-            op_id, workflow_name
-        )
-        self.logger.info(
-            "Workflow Status: {} Workflow Message: {}".format(
-                workflow_status, workflow_msg
-            )
-        )
-        operation_type = self.get_operation_type(db_content, op_id)
-        if operation_type == "create" and workflow_status:
-            db_content["state"] = "CREATED"
-        elif operation_type == "create" and not workflow_status:
-            db_content["state"] = "FAILED_CREATION"
-        elif operation_type == "delete" and workflow_status:
-            db_content["state"] = "DELETED"
-        elif operation_type == "delete" and not workflow_status:
-            db_content["state"] = "FAILED_DELETION"
-
-        if workflow_status:
-            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
-        else:
-            db_content["resourceState"] = "ERROR"
-
-        db_content = self.update_operation_history(
-            db_content, op_id, workflow_status, None
-        )
-        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
-        return workflow_status
-
-    async def check_resource_and_update_db(
-        self, resource_name, op_id, op_params, db_content
-    ):
-        workflow_status = True
-
-        resource_status, resource_msg = await self.check_resource_status(
-            resource_name, op_id, op_params, db_content
-        )
-        self.logger.info(
-            "Resource Status: {} Resource Message: {}".format(
-                resource_status, resource_msg
-            )
-        )
-
-        if resource_status:
-            db_content["resourceState"] = "READY"
-        else:
-            db_content["resourceState"] = "ERROR"
-
-        db_content = self.update_operation_history(
-            db_content, op_id, workflow_status, resource_status
-        )
-        db_content["operatingState"] = "IDLE"
-        db_content["current_operation"] = None
-        return resource_status, db_content
-
-    async def common_check_list(
-        self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
-    ):
-        try:
-            for checking in checkings_list:
-                if checking["enable"]:
-                    status, message = await self.odu.readiness_loop(
-                        op_id=op_id,
-                        item=checking["item"],
-                        name=checking["name"],
-                        namespace=checking["namespace"],
-                        condition=checking.get("condition"),
-                        deleted=checking.get("deleted", False),
-                        timeout=checking["timeout"],
-                        kubectl_obj=kubectl_obj,
-                    )
-                    if not status:
-                        error_message = "Resources not ready: "
-                        error_message += checking.get("error_message", "")
-                        return status, f"{error_message}: {message}"
-                    else:
-                        db_item["resourceState"] = checking["resourceState"]
-                        db_item = self.update_state_operation_history(
-                            db_item, op_id, None, checking["resourceState"]
-                        )
-                        self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
-        except Exception as e:
-            self.logger.debug(traceback.format_exc())
-            self.logger.debug(f"Exception: {e}", exc_info=True)
-            return False, f"Unexpected exception: {e}"
-        return True, "OK"
-
-    async def check_resource_status(self, key, op_id, op_params, content):
-        self.logger.info(
-            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
-        )
-        self.logger.debug(f"Check resource status. Content: {content}")
-        check_resource_function = self._workflows.get(key, {}).get(
-            "check_resource_function"
-        )
-        self.logger.info("check_resource function : {}".format(check_resource_function))
-        if check_resource_function:
-            return await check_resource_function(op_id, op_params, content)
-        else:
-            return await self.check_dummy_operation(op_id, op_params, content)
-
-    def check_force_delete_and_delete_from_db(
-        self, _id, workflow_status, resource_status, force
-    ):
-        self.logger.info(
-            f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
-        )
-        if force and (not workflow_status or not resource_status):
-            self.db.del_one(self.db_collection, {"_id": _id})
-            return True
-        return False
-
-    def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
-        self.db.encrypt_decrypt_fields(
-            content,
-            "decrypt",
-            fields,
-            schema_version="1.11",
-            salt=content["_id"],
-        )
-
-    def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
-        self.db.encrypt_decrypt_fields(
-            content,
-            "encrypt",
-            fields,
-            schema_version="1.11",
-            salt=content["_id"],
-        )
-
-    def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
-        # This deep copy is intended to be passed to ODU workflows.
-        content_copy = copy.deepcopy(content)
-
-        # decrypting the key
-        self.db.encrypt_decrypt_fields(
-            content_copy,
-            "decrypt",
-            fields,
-            schema_version="1.11",
-            salt=content_copy["_id"],
-        )
-        return content_copy
-
-    def delete_ksu_dependency(self, _id, data):
-        used_oka = []
-        existing_oka = []
-
-        for oka_data in data["oka"]:
-            if oka_data.get("_id"):
-                used_oka.append(oka_data["_id"])
-
-        all_ksu_data = self.db.get_list("ksus", {})
-        for ksu_data in all_ksu_data:
-            if ksu_data["_id"] != _id:
-                for oka_data in ksu_data["oka"]:
-                    if oka_data.get("_id"):
-                        if oka_data["_id"] not in existing_oka:
-                            existing_oka.append(oka_data["_id"])
-
-        self.logger.info(f"Used OKA: {used_oka}")
-        self.logger.info(f"Existing OKA: {existing_oka}")
-
-        for oka_id in used_oka:
-            if oka_id not in existing_oka:
-                self.db.set_one(
-                    "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
-                )
-
-        return
-
-    def delete_profile_ksu(self, _id, profile_type):
-        filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
-        ksu_list = self.db.get_list("ksus", filter_q)
-        for ksu_data in ksu_list:
-            self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
-
-        if ksu_list:
-            self.db.del_list("ksus", filter_q)
-        return
-
-    def cluster_kubectl(self, db_cluster):
-        cluster_kubeconfig = db_cluster["credentials"]
-        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
-        with open(kubeconfig_path, "w") as kubeconfig_file:
-            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
-        return Kubectl(config_file=kubeconfig_path)
-
-
 class NodeGroupLcm(GitOpsLcm):
     db_collection = "nodegroups"
 
@@ -405,7 +85,7 @@ class NodeGroupLcm(GitOpsLcm):
         }
         self.logger.info(f"Workflow content: {workflow_content}")
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "add_nodegroup", op_id, op_params, workflow_content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -502,7 +182,7 @@ class NodeGroupLcm(GitOpsLcm):
         }
         self.logger.info(f"Workflow content: {workflow_content}")
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "scale_nodegroup", op_id, op_params, workflow_content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -588,7 +268,7 @@ class NodeGroupLcm(GitOpsLcm):
 
         workflow_content = {"nodegroup": db_nodegroup, "cluster": db_cluster}
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_nodegroup", op_id, op_params, workflow_content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -713,7 +393,7 @@ class ClusterLcm(GitOpsLcm):
         db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
         workflow_content["vim_account"] = db_vim
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -998,7 +678,7 @@ class ClusterLcm(GitOpsLcm):
             self.logger.debug(f"Exception: {e}", exc_info=True)
             raise e
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1197,7 +877,7 @@ class ClusterLcm(GitOpsLcm):
         # content["profile"] = db_profile
         workflow_content["profile"] = db_profile
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "attach_profile_to_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1284,7 +964,7 @@ class ClusterLcm(GitOpsLcm):
         db_profile["profile_type"] = profile_type
         workflow_content["profile"] = db_profile
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "detach_profile_from_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1367,7 +1047,7 @@ class ClusterLcm(GitOpsLcm):
             "cluster": db_cluster_copy,
         }
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "register_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1503,7 +1183,7 @@ class ClusterLcm(GitOpsLcm):
             "cluster": self.decrypted_copy(db_cluster),
         }
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "deregister_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1603,7 +1283,7 @@ class ClusterLcm(GitOpsLcm):
         db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
         workflow_content["vim_account"] = db_vim
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "update_cluster", op_id, op_params, workflow_content
         )
         if not workflow_res:
@@ -1795,7 +1475,7 @@ class CloudCredentialsLcm(GitOpsLcm):
             salt=vim_id,
         )
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_cloud_credentials", op_id, op_params, db_content
         )
 
@@ -1848,7 +1528,7 @@ class CloudCredentialsLcm(GitOpsLcm):
             salt=vim_id,
         )
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "update_cloud_credentials", op_id, op_params, db_content
         )
         workflow_status, workflow_msg = await self.odu.check_workflow_status(
@@ -1884,7 +1564,7 @@ class CloudCredentialsLcm(GitOpsLcm):
         op_params = params
         db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_cloud_credentials", op_id, op_params, db_content
         )
         workflow_status, workflow_msg = await self.odu.check_workflow_status(
@@ -1919,7 +1599,7 @@ class K8sAppLcm(GitOpsLcm):
         super().__init__(msg, lcm_tasks, config)
 
     async def create(self, params, order_id):
-        self.logger.info("App Create Enter")
+        self.logger.info("App Profile Create Enter")
 
         op_id = params["operation_id"]
         profile_id = params["profile_id"]
@@ -1932,7 +1612,7 @@ class K8sAppLcm(GitOpsLcm):
         op_params = self.get_operation_params(content, op_id)
         self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -1946,7 +1626,9 @@ class K8sAppLcm(GitOpsLcm):
                 "create_profile", op_id, op_params, content
             )
         self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
-        self.logger.info(f"App Create Exit with resource status: {resource_status}")
+        self.logger.info(
+            f"App Profile Create Exit with resource status: {resource_status}"
+        )
         return
 
     async def delete(self, params, order_id):
@@ -1961,7 +1643,7 @@ class K8sAppLcm(GitOpsLcm):
         content = self.db.get_one("k8sapp", {"_id": profile_id})
         op_params = self.get_operation_params(content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -1990,7 +1672,9 @@ class K8sAppLcm(GitOpsLcm):
             self.delete_profile_ksu(profile_id, profile_type)
             self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
             self.db.del_one(self.db_collection, {"_id": content["_id"]})
-        self.logger.info(f"App Delete Exit with resource status: {resource_status}")
+        self.logger.info(
+            f"App Profile Delete Exit with resource status: {resource_status}"
+        )
         return
 
 
@@ -2006,7 +1690,7 @@ class K8sResourceLcm(GitOpsLcm):
         super().__init__(msg, lcm_tasks, config)
 
     async def create(self, params, order_id):
-        self.logger.info("Resource Create Enter")
+        self.logger.info("Resource Profile Create Enter")
 
         op_id = params["operation_id"]
         profile_id = params["profile_id"]
@@ -2019,7 +1703,7 @@ class K8sResourceLcm(GitOpsLcm):
         op_params = self.get_operation_params(content, op_id)
         self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2050,7 +1734,7 @@ class K8sResourceLcm(GitOpsLcm):
         content = self.db.get_one("k8sresource", {"_id": profile_id})
         op_params = self.get_operation_params(content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2096,7 +1780,7 @@ class K8sInfraControllerLcm(GitOpsLcm):
         super().__init__(msg, lcm_tasks, config)
 
     async def create(self, params, order_id):
-        self.logger.info("Infra controller Create Enter")
+        self.logger.info("Infra controller Profile Create Enter")
 
         op_id = params["operation_id"]
         profile_id = params["profile_id"]
@@ -2109,7 +1793,7 @@ class K8sInfraControllerLcm(GitOpsLcm):
         op_params = self.get_operation_params(content, op_id)
         self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2140,7 +1824,7 @@ class K8sInfraControllerLcm(GitOpsLcm):
         content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
         op_params = self.get_operation_params(content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2186,7 +1870,7 @@ class K8sInfraConfigLcm(GitOpsLcm):
         super().__init__(msg, lcm_tasks, config)
 
     async def create(self, params, order_id):
-        self.logger.info("Infra config Create Enter")
+        self.logger.info("Infra config Profile Create Enter")
 
         op_id = params["operation_id"]
         profile_id = params["profile_id"]
@@ -2199,7 +1883,7 @@ class K8sInfraConfigLcm(GitOpsLcm):
         op_params = self.get_operation_params(content, op_id)
         self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2230,7 +1914,7 @@ class K8sInfraConfigLcm(GitOpsLcm):
         content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
         op_params = self.get_operation_params(content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_profile", op_id, op_params, content
         )
         self.logger.info("workflow_name is: {}".format(workflow_name))
@@ -2284,7 +1968,7 @@ class OkaLcm(GitOpsLcm):
         db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
         op_params = self.get_operation_params(db_content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_oka", op_id, op_params, db_content
         )
 
@@ -2316,7 +2000,7 @@ class OkaLcm(GitOpsLcm):
         db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
         op_params = self.get_operation_params(db_content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "update_oka", op_id, op_params, db_content
         )
         workflow_status = await self.check_workflow_and_update_db(
@@ -2346,7 +2030,7 @@ class OkaLcm(GitOpsLcm):
         db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
         op_params = self.get_operation_params(db_content, op_id)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_oka", op_id, op_params, db_content
         )
         workflow_status = await self.check_workflow_and_update_db(
@@ -2451,7 +2135,7 @@ class KsuLcm(GitOpsLcm):
             op_params.append(ksu_params)
 
         # A single workflow is launched for all KSUs
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "create_ksus", op_id, op_params, db_content
         )
         # Update workflow status in all KSUs
@@ -2519,7 +2203,7 @@ class KsuLcm(GitOpsLcm):
                     ] = f"{oka_type}/{db_oka['git_name']}/templates"
             op_params.append(ksu_params)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "update_ksus", op_id, op_params, db_content
         )
 
@@ -2572,7 +2256,7 @@ class KsuLcm(GitOpsLcm):
             ksu_params["profile"]["name"] = db_profile["name"]
             op_params.append(ksu_params)
 
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "delete_ksus", op_id, op_params, db_content
         )
 
@@ -2610,7 +2294,7 @@ class KsuLcm(GitOpsLcm):
         self.initialize_operation(ksus_id, op_id)
         db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
         op_params = self.get_operation_params(db_content, op_id)
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "clone_ksus", op_id, op_params, db_content
         )
 
@@ -2634,7 +2318,7 @@ class KsuLcm(GitOpsLcm):
         self.initialize_operation(ksus_id, op_id)
         db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
         op_params = self.get_operation_params(db_content, op_id)
-        workflow_res, workflow_name = await self.odu.launch_workflow(
+        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
             "move_ksus", op_id, op_params, db_content
         )
 
@@ -2753,3 +2437,309 @@ class KsuLcm(GitOpsLcm):
                 self.logger.error(e)
                 return False, f"Error checking KSU in cluster {db_cluster['name']}."
         return True, "OK"
+
+
+class AppInstanceLcm(GitOpsLcm):
+    db_collection = "appinstances"
+
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+        super().__init__(msg, lcm_tasks, config)
+        self._workflows = {
+            "create_app": {
+                "check_resource_function": self.check_create_app,
+            },
+            "update_app": {
+                "check_resource_function": self.check_update_app,
+            },
+            "delete_app": {
+                "check_resource_function": self.check_delete_app,
+            },
+        }
+
+    def get_dbclusters_from_profile(self, profile_id, profile_type):
+        cluster_list = []
+        db_clusters = self.db.get_list("clusters")
+        self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
+        for db_cluster in db_clusters:
+            if profile_id in db_cluster.get(profile_type, []):
+                self.logger.info(
+                    f"Profile {profile_id} found in cluster {db_cluster['name']}"
+                )
+                cluster_list.append(db_cluster)
+        return cluster_list
+
+    def update_app_dependency(self, app_id, db_app):
+        self.logger.info(f"Updating AppInstance dependencies for AppInstance {app_id}")
+        oka_id = db_app.get("oka")
+        if not oka_id:
+            self.logger.info(f"No OKA associated with AppInstance {app_id}")
+            return
+
+        used_oka = []
+        all_apps = self.db.get_list(self.db_collection, {})
+        for app in all_apps:
+            if app["_id"] != app_id:
+                app_oka_id = app.get("oka")
+                if app_oka_id not in used_oka:
+                    used_oka.append(app_oka_id)
+        self.logger.info(f"Used OKA: {used_oka}")
+
+        if oka_id not in used_oka:
+            self.db.set_one(
+                "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
+            )
+        return
+
+    async def generic_operation(self, params, order_id, operation_name):
+        self.logger.info(f"Generic operation. Operation name: {operation_name}")
+        # self.logger.debug(f"Params: {params}")
+        try:
+            op_id = params["operation_id"]
+            app_id = params["appinstance"]
+            self.initialize_operation(app_id, op_id)
+            db_app = self.db.get_one(self.db_collection, {"_id": app_id})
+            # self.logger.debug("Db App: {}".format(db_app))
+
+            # Initialize workflow_content with a copy of the db_app, decrypting fields to use in workflows
+            db_app_copy = self.decrypted_copy(db_app)
+            workflow_content = {
+                "app": db_app_copy,
+            }
+
+            # Update workflow_content with profile info
+            profile_type = db_app["profile_type"]
+            profile_id = db_app["profile"]
+            profile_collection = self.profile_collection_mapping[profile_type]
+            db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+            # db_profile is decrypted inline
+            # No need to use decrypted_copy because db_profile won't be updated.
+            self.decrypt_age_keys(db_profile)
+            workflow_content["profile"] = db_profile
+
+            op_params = self.get_operation_params(db_app, op_id)
+            if not op_params:
+                op_params = {}
+            self.logger.debug("Operation Params: {}".format(op_params))
+
+            # Get SW catalog path from op_params or from DB
+            aux_dict = {}
+            if operation_name == "create_app":
+                aux_dict = op_params
+            else:
+                aux_dict = db_app
+            sw_catalog_path = ""
+            if "sw_catalog_path" in aux_dict:
+                sw_catalog_path = aux_dict.get("sw_catalog_path", "")
+            elif "oka" in aux_dict:
+                oka_id = aux_dict["oka"]
+                db_oka = self.db.get_one("okas", {"_id": oka_id})
+                oka_type = MAP_PROFILE[
+                    db_oka.get("profile_type", "infra_controller_profiles")
+                ]
+                sw_catalog_path = f"{oka_type}/{db_oka['git_name'].lower()}"
+            else:
+                self.logger.error("SW Catalog path could not be determined.")
+                raise LcmException("SW Catalog path could not be determined.")
+            self.logger.debug(f"SW Catalog path: {sw_catalog_path}")
+
+            # Get model from Git repo
+            # Clone the SW catalog repo
+            repodir = self.cloneGitRepo(
+                repo_url=self._full_repo_sw_catalogs_url, branch="main"
+            )
+            model_file_path = os.path.join(repodir, sw_catalog_path, "model.yaml")
+            if not os.path.exists(model_file_path):
+                self.logger.error(f"Model file not found at path: {model_file_path}")
+                raise LcmException(f"Model file not found at path: {model_file_path}")
+            # Store the model content in workflow_content
+            with open(model_file_path) as model_file:
+                workflow_content["model"] = yaml.safe_load(model_file.read())
+
+            # A single workflow is launched for the App operation
+            self.logger.debug("Launching workflow {}".format(operation_name))
+            (
+                workflow_res,
+                workflow_name,
+                workflow_resources,
+            ) = await self.odu.launch_workflow(
+                operation_name, op_id, op_params, workflow_content
+            )
+
+            if not workflow_res:
+                self.logger.error(f"Failed to launch workflow: {workflow_name}")
+                if operation_name == "create_app":
+                    db_app["state"] = "FAILED_CREATION"
+                elif operation_name == "delete_app":
+                    db_app["state"] = "FAILED_DELETION"
+                db_app["resourceState"] = "ERROR"
+                db_app = self.update_operation_history(
+                    db_app, op_id, workflow_status=False, resource_status=None
+                )
+                self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)
+                # Clean items used in the workflow, no matter if the workflow succeeded
+                clean_status, clean_msg = await self.odu.clean_items_workflow(
+                    operation_name, op_id, op_params, workflow_content
+                )
+                self.logger.info(
+                    f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
+                )
+                return
+
+            # Update resources created in workflow
+            db_app["app_model"] = workflow_resources.get("app_model", {})
+
+            # Update workflow status in App
+            workflow_status = await self.check_workflow_and_update_db(
+                op_id, workflow_name, db_app
+            )
+            # Update resource status in DB
+            if workflow_status:
+                resource_status, db_app = await self.check_resource_and_update_db(
+                    operation_name, op_id, op_params, db_app
+                )
+            else:
+                resource_status = False
+            self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)
+
+            # Clean items used in the workflow, no matter if the workflow succeeded
+            clean_status, clean_msg = await self.odu.clean_items_workflow(
+                operation_name, op_id, op_params, workflow_content
+            )
+            self.logger.info(
+                f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
+            )
+
+            if operation_name == "delete_app":
+                force = params.get("force", False)
+                if force:
+                    force_delete_status = self.check_force_delete_and_delete_from_db(
+                        db_app["_id"], workflow_status, resource_status, force
+                    )
+                    if force_delete_status:
+                        return
+                if resource_status:
+                    db_app["state"] = "DELETED"
+                    self.update_app_dependency(db_app["_id"], db_app)
+                    self.db.del_one(self.db_collection, {"_id": db_app["_id"]})
+
+            self.logger.info(
+                f"Generic app operation Exit {operation_name} with resource Status {resource_status}"
+            )
+            return
+        except Exception as e:
+            self.logger.debug(traceback.format_exc())
+            self.logger.debug(f"Exception: {e}", exc_info=True)
+            return
+
+    async def create(self, params, order_id):
+        self.logger.info("App Create Enter")
+        return await self.generic_operation(params, order_id, "create_app")
+
+    async def update(self, params, order_id):
+        self.logger.info("App Edit Enter")
+        return await self.generic_operation(params, order_id, "update_app")
+
+    async def delete(self, params, order_id):
+        self.logger.info("App Delete Enter")
+        return await self.generic_operation(params, order_id, "delete_app")
+
+    async def check_appinstance(self, op_id, op_params, content, deleted=False):
+        self.logger.info(
+            f"check_appinstance Operation {op_id}. Params: {op_params}. Deleted: {deleted}"
+        )
+        self.logger.debug(f"Content: {content}")
+        db_app = content
+        profile_id = db_app["profile"]
+        profile_type = db_app["profile_type"]
+        app_name = db_app["name"]
+        self.logger.info(
+            f"Checking status of AppInstance {app_name} for profile {profile_id}."
+        )
+
+        # TODO: read app_model and get kustomization name and namespace
+        # app_model = db_app.get("app_model", {})
+        kustomization_list = [
+            {
+                "name": f"jenkins-{app_name}",
+                "namespace": "flux-system",
+            }
+        ]
+        checkings_list = []
+        if deleted:
+            for kustomization in kustomization_list:
+                checkings_list.append(
+                    {
+                        "item": "kustomization",
+                        "name": kustomization["name"].lower(),
+                        "namespace": kustomization["namespace"],
+                        "deleted": True,
+                        "timeout": self._checkloop_kustomization_timeout,
+                        "enable": True,
+                        "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+                    }
+                )
+        else:
+            for kustomization in kustomization_list:
+                checkings_list.append(
+                    {
+                        "item": "kustomization",
+                        "name": kustomization["name"].lower(),
+                        "namespace": kustomization["namespace"],
+                        "condition": {
+                            "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
+                            "value": "True",
+                        },
+                        "timeout": self._checkloop_kustomization_timeout,
+                        "enable": True,
+                        "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+                    }
+                )
+
+        dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+        if not dbcluster_list:
+            self.logger.info(f"No clusters found for profile {profile_id}.")
+        for db_cluster in dbcluster_list:
+            try:
+                self.logger.info(
+                    f"Checking status of AppInstance {app_name} in cluster {db_cluster['name']}."
+                )
+                cluster_kubectl = self.cluster_kubectl(db_cluster)
+                result, message = await self.common_check_list(
+                    op_id,
+                    checkings_list,
+                    self.db_collection,
+                    db_app,
+                    kubectl_obj=cluster_kubectl,
+                )
+                if not result:
+                    return False, message
+            except Exception as e:
+                self.logger.error(
+                    f"Error checking AppInstance in cluster {db_cluster['name']}."
+                )
+                self.logger.error(e)
+                return (
+                    False,
+                    f"Error checking AppInstance in cluster {db_cluster['name']}.",
+                )
+        return True, "OK"
+
+    async def check_create_app(self, op_id, op_params, content):
+        self.logger.info(f"check_create_app Operation {op_id}. Params: {op_params}.")
+        # self.logger.debug(f"Content: {content}")
+        return await self.check_appinstance(op_id, op_params, content)
+
+    async def check_update_app(self, op_id, op_params, content):
+        self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.")
+        # self.logger.debug(f"Content: {content}")
+        return await self.check_appinstance(op_id, op_params, content)
+
+    async def check_delete_app(self, op_id, op_params, content):
+        self.logger.info(f"check_delete_app Operation {op_id}. Params: {op_params}.")
+        # self.logger.debug(f"Content: {content}")
+        return await self.check_appinstance(op_id, op_params, content, deleted=True)
index 2d26e1b..45fd9ad 100644 (file)
@@ -959,6 +959,27 @@ class Lcm:
                 task = asyncio.ensure_future(self.ksu.move(params, order_id))
                 self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
                 return
+        elif topic == "appinstance":
+            op_id = params["operation_id"]
+            appinstance_id = params["appinstance"]
+            if command == "create":
+                task = asyncio.ensure_future(self.appinstance.create(params, order_id))
+                self.lcm_tasks.register(
+                    "appinstance", appinstance_id, op_id, "app_create", task
+                )
+                return
+            elif command == "update" or command == "updated":
+                task = asyncio.ensure_future(self.appinstance.update(params, order_id))
+                self.lcm_tasks.register(
+                    "appinstance", appinstance_id, op_id, "app_edit", task
+                )
+                return
+            elif command == "delete":
+                task = asyncio.ensure_future(self.appinstance.delete(params, order_id))
+                self.lcm_tasks.register(
+                    "appinstance", appinstance_id, op_id, "app_delete", task
+                )
+                return
         elif topic == "nodegroup":
             nodegroup_id = params["nodegroup_id"]
             op_id = params["operation_id"]
@@ -1009,6 +1030,7 @@ class Lcm:
                     "k8s_infra_config",
                     "oka",
                     "ksu",
+                    "appinstance",
                     "nodegroup",
                 )
                 self.logger.debug(
@@ -1095,6 +1117,9 @@ class Lcm:
         )
         self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
         self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+        self.appinstance = k8s.AppInstanceLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
         self.nodegroup = k8s.NodeGroupLcm(
             self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
index d37b9b4..ef1200b 100644 (file)
@@ -504,6 +504,7 @@ class TaskRegistry(LcmBase):
         "k8s_infra_config",
         "oka",
         "ksu",
+        "appinstance",
     ]
 
     # Map topic to InstanceID
@@ -526,6 +527,7 @@ class TaskRegistry(LcmBase):
         "k8s_infra_config": "k8sinfra_config",
         "oka": "oka",
         "ksu": "ksus",
+        "appinstance": "appinstances",
     }
 
     def __init__(self, worker_id=None, logger=None):
@@ -546,6 +548,7 @@ class TaskRegistry(LcmBase):
             "oka": {},
             "ksu": {},
             "odu": {},
+            "appinstance": {},
         }
         self.worker_id = worker_id
         self.db = Database().instance.db
index eac857a..98c1240 100644 (file)
@@ -23,6 +23,7 @@ import json
 import yaml
 import tarfile
 import io
+import os
 from time import sleep
 
 from distutils.version import LooseVersion
@@ -1006,34 +1007,74 @@ class Kubectl:
 
     def copy_file_to_pod(
         self, namespace, pod_name, container_name, src_file, dest_path
-    ):
-        # Create an in-memory tar file containing the source file
-        tar_buffer = io.BytesIO()
-        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
-            tar.add(src_file, arcname=dest_path.split("/")[-1])
-
-        tar_buffer.seek(0)
-
-        # Define the command to extract the tar file in the pod
-        exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
-
-        # Execute the command
-        resp = stream(
-            self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
-            pod_name,
-            namespace,
-            command=exec_command,
-            container=container_name,
-            stdin=True,
-            stderr=True,
-            stdout=True,
-            tty=False,
-            _preload_content=False,
-        )
+    ) -> bool:
+        try:
+            # Create the destination directory in the pod
+            dest_dir = os.path.dirname(dest_path)
+            if dest_dir:
+                self.logger.debug(f"Creating directory {dest_dir} in pod {pod_name}")
+                mkdir_command = ["mkdir", "-p", dest_dir]
+
+                resp = stream(
+                    self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+                    pod_name,
+                    namespace,
+                    command=mkdir_command,
+                    container=container_name,
+                    stdin=False,
+                    stderr=True,
+                    stdout=True,
+                    tty=False,
+                    _preload_content=False,
+                )
+                resp.close()
+                self.logger.debug(f"Directory {dest_dir} created")
 
-        # Write the tar data to the pod
-        resp.write_stdin(tar_buffer.read())
-        resp.close()
+            # Create an in-memory tar file containing the source file
+            self.logger.debug(
+                f"Creating in-memory tar of {src_file} as {dest_path.split('/')[-1]}"
+            )
+            tar_buffer = io.BytesIO()
+            with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
+                tar.add(src_file, arcname=dest_path.split("/")[-1])
+
+            tar_buffer.seek(0)
+            self.logger.debug(
+                f"Tar buffer created, size={tar_buffer.getbuffer().nbytes} bytes"
+            )
+
+            # Define the command to extract the tar file in the pod
+            exec_command = ["tar", "xvf", "-", "-C", dest_dir or "."]
+            self.logger.debug(f"Exec command prepared: {exec_command}")
+
+            # Execute the command
+            resp = stream(
+                self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+                pod_name,
+                namespace,
+                command=exec_command,
+                container=container_name,
+                stdin=True,
+                stderr=True,
+                stdout=True,
+                tty=False,
+                _preload_content=False,
+            )
+            self.logger.debug(
+                f"Started exec stream to pod {pod_name} (ns={namespace}, container={container_name})"
+            )
+
+            # Write the tar data to the pod
+            data = tar_buffer.read()
+            self.logger.debug(f"Writing {len(data)} bytes to pod stdin")
+            resp.write_stdin(data)
+            self.logger.debug("Data written to pod stdin")
+            resp.close()
+            self.logger.debug("Exec stream closed")
+            return True
+        except Exception as e:
+            self.logger.error(f"Failed to copy file {src_file} to pod: {e}")
+            return False
 
     @retry(
         attempts=10,
@@ -1041,15 +1082,15 @@ class Kubectl:
         fallback=Exception("Failed creating the pvc"),
     )
     async def create_pvc_with_content(
-        self, name: str, namespace: str, src_file: str, dest_filename: str
+        self, name: str, namespace: str, src_files: typing.List, dest_files: typing.List
     ):
         """
         Create a PVC with content
 
         :param: name:       Name of the pvc to be created
         :param: namespace:  Name of the namespace where the pvc will be stored
-        :param: src_file:   File to be copied
-        :param: filename:   Name of the file in the destination folder
+        :param: src_files:  List of source files to be copied
+        :param: dest_files: List of destination filenames (paired with src_files)
         """
         pod_name = f"copy-pod-{name}"
         self.logger.debug(f"Creating pvc {name}")
@@ -1061,13 +1102,24 @@ class Kubectl:
         self.logger.debug("Sleeping")
         sleep(40)
         self.logger.debug(f"Copying files to pod {pod_name}")
-        self.copy_file_to_pod(
-            namespace=namespace,
-            pod_name=pod_name,
-            container_name="copy-container",
-            src_file=src_file,
-            dest_path=f"/mnt/data/{dest_filename}",
-        )
+        for src_f, dest_f in zip(src_files, dest_files):
+            dest_path = f"/mnt/data/{dest_f}"
+            self.logger.debug(f"Copying file {src_f} to {dest_path} in pod {pod_name}")
+            result = self.copy_file_to_pod(
+                namespace=namespace,
+                pod_name=pod_name,
+                container_name="copy-container",
+                src_file=src_f,
+                dest_path=dest_path,
+            )
+            if result:
+                self.logger.debug(
+                    f"Successfully copied file {src_f} to {dest_path} in pod {pod_name}"
+                )
+            else:
+                raise Exception(
+                    f"Failed copying file {src_f} to {dest_path} in pod {pod_name}"
+                )
         self.logger.debug(f"Deleting pod {pod_name}")
         await self.delete_pod(pod_name, namespace)
 
diff --git a/osm_lcm/odu_libs/app.py b/osm_lcm/odu_libs/app.py
new file mode 100644 (file)
index 0000000..b23fb2a
--- /dev/null
@@ -0,0 +1,303 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+
+import yaml
+import tempfile
+import os
+
+
+MAP_PROFILE = {
+    "infra_controller_profiles": "infra-controller-profiles",
+    "infra_config_profiles": "infra-config-profiles",
+    "resource_profiles": "managed-resources",
+    "app_profiles": "app-profiles",
+}
+
+
+def merge_model(base, override):
+    """Merge override's spec into base's spec (mutates base's nested structures)."""
+    merged = base.copy()  # shallow copy: the nested "spec" dict is shared with base
+    for k, v in override.get("spec", {}).items():
+        if k != "ksus":
+            merged["spec"][k] = v
+    for ksu_override in override.get("spec", {}).get("ksus", []):
+        for ksu_base in merged.get("spec", {}).get("ksus", []):
+            if ksu_base.get("name") == ksu_override.get("name"):
+                for k, v in ksu_override.items():
+                    if k != "patterns":
+                        ksu_base[k] = v
+                        continue
+                    for pattern_override in ksu_override.get("patterns", []):
+                        for pattern_base in ksu_base.get("patterns", []):
+                            if pattern_base.get("name") == pattern_override.get("name"):
+                                for kp, vp in pattern_override.items():
+                                    if kp != "bricks":
+                                        pattern_base[kp] = vp
+                                        continue
+                                    for brick_override in pattern_override.get(
+                                        "bricks", []
+                                    ):
+                                        for brick_base in pattern_base.get(
+                                            "bricks", []
+                                        ):
+                                            if brick_base.get(
+                                                "name"
+                                            ) == brick_override.get("name"):
+                                                for kb, vb in brick_override.items():
+                                                    if kb != "hrset-values":
+                                                        brick_base[kb] = vb
+                                                        continue
+                                                    for (
+                                                        hrset_override
+                                                    ) in brick_override.get(
+                                                        "hrset-values", []
+                                                    ):
+                                                        for (
+                                                            hrset_base
+                                                        ) in brick_base.get(
+                                                            "hrset-values", []
+                                                        ):
+                                                            if hrset_base.get(
+                                                                "name"
+                                                            ) == hrset_override.get(
+                                                                "name"
+                                                            ):
+                                                                hrset_base |= (
+                                                                    hrset_override
+                                                                )
+                                                                break
+                                                        else:
+                                                            brick_base.setdefault(
+                                                                "hrset-values", []
+                                                            ).append(hrset_override)
+                                                break
+                                        else:
+                                            pattern_base.setdefault(
+                                                "bricks", []
+                                            ).append(brick_override)
+                                break
+                        else:
+                            ksu_base.setdefault("patterns", []).append(pattern_override)
+                break
+        else:
+            merged["spec"].setdefault("ksus", []).append(ksu_override)
+    return merged
+
+
+async def launch_app(self, op_id, op_params, workflow_content, operation_type):
+    self.logger.info(
+        f"launch_app Enter. Operation {op_id}. Operation Type: {operation_type}"
+    )
+    # self.logger.debug(f"Operation Params: {op_params}")
+    # self.logger.debug(f"Content: {workflow_content}")
+
+    db_app = workflow_content["app"]
+    db_profile = workflow_content.get("profile")
+
+    profile_t = db_app.get("profile_type")
+    profile_type = MAP_PROFILE[profile_t]
+    profile_name = db_profile.get("git_name").lower()
+    app_name = db_app["git_name"].lower()
+    app_command = f"app {operation_type} $environment"
+    age_public_key = db_profile.get("age_pubkey")
+
+    sw_catalog_model = workflow_content.get("model")
+    self.logger.debug(f"SW catalog model: {sw_catalog_model}")
+
+    # Update the app model, extending it also with the model from op_params
+    if operation_type == "update":
+        model = op_params.get("model", db_app.get("app_model", {}))
+    else:
+        model = op_params.get("model", {})
+    app_model = merge_model(sw_catalog_model, model)
+
+    app_model["kind"] = "AppInstantiation"
+    app_model["metadata"]["name"] = app_name
+    for ksu in app_model.get("spec", {}).get("ksus", []):
+        for pattern in ksu.get("patterns", []):
+            for brick in pattern.get("bricks", []):
+                brick["public-age-key"] = age_public_key
+    self.logger.debug(f"App model: {app_model}")
+
+    if operation_type == "update":
+        params = op_params.get("params", db_app.get("params", {}))
+    else:
+        params = op_params.get("params", {})
+    params["PROFILE_TYPE"] = profile_type
+    params["PROFILE_NAME"] = profile_name
+    params["APPNAME"] = app_name
+    self.logger.debug(f"Params: {params}")
+
+    if operation_type == "update":
+        secret_params = op_params.get("secret_params", db_app.get("secret_params", {}))
+    else:
+        secret_params = op_params.get("secret_params", {})
+    self.logger.debug(f"Secret Params: {secret_params}")
+
+    # Create temporary folder for the app model and the parameters
+    temp_dir = tempfile.mkdtemp(prefix=f"app-{operation_type}-{op_id}-")
+    self.logger.debug(f"Temporary dir created: {temp_dir}")
+    with open(f"{temp_dir}/app_instance_model.yaml", "w") as f:
+        yaml.safe_dump(
+            app_model, f, indent=2, default_flow_style=False, sort_keys=False
+        )
+
+    os.makedirs(f"{temp_dir}/parameters/clear", exist_ok=True)
+    with open(f"{temp_dir}/parameters/clear/environment.yaml", "w") as f:
+        yaml.safe_dump(params, f, indent=2, default_flow_style=False, sort_keys=False)
+
+    # Create PVC and copy app model and parameters to PVC
+    app_model_pvc = f"temp-pvc-app-{op_id}"
+    src_files = [
+        f"{temp_dir}/app_instance_model.yaml",
+        f"{temp_dir}/parameters/clear/environment.yaml",
+    ]
+    dest_files = [
+        "app_instance_model.yaml",
+        "parameters/clear/environment.yaml",
+    ]
+    self.logger.debug(
+        f"Copying files to PVC {app_model_pvc}: {src_files} -> {dest_files}"
+    )
+    await self._kubectl.create_pvc_with_content(
+        name=app_model_pvc,
+        namespace="osm-workflows",
+        src_files=src_files,
+        dest_files=dest_files,
+    )
+
+    # Create secret with secret_params
+    secret_name = f"secret-app-{op_id}"
+    secret_namespace = "osm-workflows"
+    secret_key = "environment.yaml"
+    secret_value = yaml.safe_dump(
+        secret_params, indent=2, default_flow_style=False, sort_keys=False
+    )
+    try:
+        self.logger.debug(f"Testing kubectl: {self._kubectl}")
+        self.logger.debug(
+            f"Testing kubectl configuration: {self._kubectl.configuration}"
+        )
+        self.logger.debug(
+            f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+        )
+        self.logger.debug(
+            f"Creating secret {secret_name} in namespace {secret_namespace}"
+        )
+        await self.create_secret(
+            secret_name,
+            secret_namespace,
+            secret_key,
+            secret_value,
+        )
+    except Exception as e:
+        msg = f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}"
+        self.logger.info(msg)
+        return (
+            False,
+            msg,
+            None,
+        )
+
+    # Create workflow to launch the app
+    workflow_template = "launcher-app.j2"
+    workflow_name = f"{operation_type}-app-{op_id}"
+    # Additional params for the workflow
+    osm_project_name = workflow_content.get("project_name", "osm_admin")
+
+    # Render workflow
+    manifest = self.render_jinja_template(
+        workflow_template,
+        output_file=None,
+        workflow_name=workflow_name,
+        app_command=app_command,
+        app_model_pvc=app_model_pvc,
+        app_secret_name=secret_name,
+        git_fleet_url=self._repo_fleet_url,
+        git_sw_catalogs_url=self._repo_sw_catalogs_url,
+        app_name=app_name,
+        profile_name=profile_name,
+        profile_type=profile_type,
+        osm_project_name=osm_project_name,
+        workflow_debug=self._workflow_debug,
+        workflow_dry_run=self._workflow_dry_run,
+    )
+    self.logger.debug(f"Workflow manifest: {manifest}")
+
+    # Submit workflow
+    self._kubectl.create_generic_object(
+        namespace="osm-workflows",
+        manifest_dict=yaml.safe_load(manifest),
+        api_group="argoproj.io",
+        api_plural="workflows",
+        api_version="v1alpha1",
+    )
+    workflow_resources = {
+        "app_model": app_model,
+        "secret_params": secret_params,
+        "params": params,
+    }
+    return True, workflow_name, workflow_resources
+
+
+async def create_app(self, op_id, op_params, content):
+    self.logger.info(f"create_app Enter. Operation {op_id}")
+    # self.logger.debug(f"Operation Params: {op_params}")
+    # self.logger.debug(f"Content: {content}")
+    return await self.launch_app(op_id, op_params, content, "create")
+
+
+async def update_app(self, op_id, op_params, content):
+    self.logger.info(f"update_app Enter. Operation {op_id}")
+    # self.logger.debug(f"Operation Params: {op_params}")
+    # self.logger.debug(f"Content: {content}")
+    return await self.launch_app(op_id, op_params, content, "update")
+
+
+async def delete_app(self, op_id, op_params, content):
+    self.logger.info(f"delete_app Enter. Operation {op_id}")
+    # self.logger.debug(f"Operation Params: {op_params}")
+    # self.logger.debug(f"Content: {content}")
+    return await self.launch_app(op_id, op_params, content, "delete")
+
+
+async def clean_items_app_launch(self, op_id, op_params, workflow_content):
+    self.logger.info(f"clean_items_app_launch Enter. Operation {op_id}")
+    # self.logger.debug(f"Operation Params: {op_params}")
+    # self.logger.debug(f"Content: {workflow_content}")
+    try:
+        secret_name = f"secret-app-{op_id}"
+        volume_name = f"temp-pvc-app-{op_id}"
+        items = {
+            "secrets": [
+                {
+                    "name": secret_name,
+                    "namespace": "osm-workflows",
+                }
+            ],
+            "pvcs": [
+                {
+                    "name": volume_name,
+                    "namespace": "osm-workflows",
+                }
+            ],
+        }
+        await self.clean_items(items)
+        return True, "OK"
+    except Exception as e:
+        return False, f"Error while cleaning items: {e}"
index 7f35c9d..9c0cbaf 100644 (file)
@@ -73,7 +73,7 @@ async def create_cluster(self, op_id, op_params, content):
         )
     except Exception as e:
         self.logger.info(f"Cannot create secret {secret_name}: {e}")
-        return False, f"Cannot create secret {secret_name}: {e}"
+        return False, f"Cannot create secret {secret_name}: {e}", None
 
     # Additional params for the workflow
     cluster_kustomization_name = cluster_name
@@ -119,7 +119,7 @@ async def create_cluster(self, op_id, op_params, content):
             )
         except Exception as e:
             self.logger.info(f"Cannot create configmap {configmap_name}: {e}")
-            return False, f"Cannot create configmap {configmap_name}: {e}"
+            return False, f"Cannot create configmap {configmap_name}: {e}", None
 
     # Render workflow
     # workflow_kwargs = {
@@ -174,7 +174,7 @@ async def create_cluster(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def update_cluster(self, op_id, op_params, content):
@@ -215,7 +215,7 @@ async def update_cluster(self, op_id, op_params, content):
         )
     except Exception as e:
         self.logger.info(f"Cannot create secret {secret_name}: {e}")
-        return False, f"Cannot create secret {secret_name}: {e}"
+        return False, f"Cannot create secret {secret_name}: {e}", None
 
     # Additional params for the workflow
     cluster_kustomization_name = cluster_name
@@ -274,7 +274,7 @@ async def update_cluster(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def delete_cluster(self, op_id, op_params, content):
@@ -320,7 +320,7 @@ async def delete_cluster(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def register_cluster(self, op_id, op_params, content):
@@ -364,6 +364,7 @@ async def register_cluster(self, op_id, op_params, content):
         return (
             False,
             f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
+            None,
         )
 
     # Create secret with kubeconfig
@@ -394,6 +395,7 @@ async def register_cluster(self, op_id, op_params, content):
         return (
             False,
             f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
+            None,
         )
 
     # Additional params for the workflow
@@ -441,7 +443,7 @@ async def register_cluster(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def deregister_cluster(self, op_id, op_params, content):
@@ -486,7 +488,7 @@ async def deregister_cluster(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def get_cluster_credentials(self, db_cluster):
index e7c2f82..c7fd50d 100644 (file)
@@ -177,7 +177,7 @@ async def create_ksus(self, op_id, op_params_list, content_list):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def update_ksus(self, op_id, op_params_list, content_list):
@@ -328,7 +328,7 @@ async def update_ksus(self, op_id, op_params_list, content_list):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def delete_ksus(self, op_id, op_params_list, content_list):
@@ -374,21 +374,21 @@ async def delete_ksus(self, op_id, op_params_list, content_list):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def clone_ksu(self, op_id, op_params, content):
     self.logger.info(f"clone_ksu Enter. Operation {op_id}. Params: {op_params}")
     # self.logger.debug(f"Content: {content}")
     workflow_name = f"clone-ksu-{content['_id']}"
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def move_ksu(self, op_id, op_params, content):
     self.logger.info(f"move_ksu Enter. Operation {op_id}. Params: {op_params}")
     # self.logger.debug(f"Content: {content}")
     workflow_name = f"move-ksu-{content['_id']}"
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def clean_items_ksu_create(self, op_id, op_params_list, content_list):
index ff40f9c..d703858 100644 (file)
@@ -60,7 +60,7 @@ async def add_nodegroup(self, op_id, op_params, content):
         )
     except Exception as e:
         self.logger.info(f"Cannot create secret {secret_name}: {e}")
-        return False, f"Cannot create secret {secret_name}: {e}"
+        return False, f"Cannot create secret {secret_name}: {e}", None
 
     private_subnet = op_params.get("private_subnet", [])
     public_subnet = op_params.get("public_subnet", [])
@@ -78,7 +78,7 @@ async def add_nodegroup(self, op_id, op_params, content):
         )
     except Exception as e:
         self.logger.info(f"Cannot create configmap {configmap_name}: {e}")
-        return False, f"Cannot create configmap {configmap_name}: {e}"
+        return False, f"Cannot create configmap {configmap_name}: {e}", None
 
     # Additional params for the workflow
     nodegroup_kustomization_name = nodegroup_name
@@ -135,7 +135,7 @@ async def add_nodegroup(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def scale_nodegroup(self, op_id, op_params, content):
@@ -170,7 +170,7 @@ async def scale_nodegroup(self, op_id, op_params, content):
         )
     except Exception as e:
         self.logger.info(f"Cannot create secret {secret_name}: {e}")
-        return False, f"Cannot create secret {secret_name}: {e}"
+        return False, f"Cannot create secret {secret_name}: {e}", None
 
     # Additional params for the workflow
     nodegroup_kustomization_name = nodegroup_name
@@ -213,7 +213,7 @@ async def scale_nodegroup(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def delete_nodegroup(self, op_id, op_params, content):
@@ -254,7 +254,7 @@ async def delete_nodegroup(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def clean_items_nodegroup_add(self, op_id, op_params, content):
index 63c4d36..564acbe 100644 (file)
@@ -55,8 +55,8 @@ async def create_oka(self, op_id, op_params, content):
     await self._kubectl.create_pvc_with_content(
         name=temp_volume_name,
         namespace="osm-workflows",
-        src_file=f"{oka_folder}/{oka_filename}",
-        dest_filename=f"{oka_name}.tar.gz",
+        src_files=[f"{oka_folder}/{oka_filename}"],
+        dest_files=[f"{oka_name}.tar.gz"],
     )
 
     # Render workflow
@@ -83,7 +83,7 @@ async def create_oka(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def update_oka(self, op_id, op_params, content):
@@ -111,8 +111,8 @@ async def update_oka(self, op_id, op_params, content):
     await self._kubectl.create_pvc_with_content(
         name=temp_volume_name,
         namespace="osm-workflows",
-        src_folder=oka_folder,
-        filename=oka_filename,
+        src_files=[f"{oka_folder}/{oka_filename}"],
+        dest_files=[f"{oka_name}.tar.gz"],
     )
 
     # Render workflow
@@ -139,7 +139,7 @@ async def update_oka(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def delete_oka(self, op_id, op_params, content):
@@ -178,7 +178,7 @@ async def delete_oka(self, op_id, op_params, content):
         api_plural="workflows",
         api_version="v1alpha1",
     )
-    return True, workflow_name
+    return True, workflow_name, None
 
 
 async def clean_items_oka_create(self, op_id, op_params_list, content_list):
diff --git a/osm_lcm/odu_libs/templates/launcher-app.j2 b/osm_lcm/odu_libs/templates/launcher-app.j2
new file mode 100644 (file)
index 0000000..87b52d7
--- /dev/null
@@ -0,0 +1,87 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  name: {{ workflow_name }}
+spec:
+  arguments:
+    parameters:
+    # Full OSM SDK command to execute
+    - name: command
+      description: |
+        Full command string to execute with the OSM SDK.
+        Examples:
+        - "app create $environment": Deploy new application instance
+        - "app update $environment": Update existing application instance
+        - "app delete $environment": Remove application instance
+        This parameter accepts any valid OSM SDK command for maximum flexibility.
+      value: "{{ app_command }}"
+    # Volume reference for application models and parameters
+    # NOTE: The PVC must be created and populated with the following structure:
+    #   /model/app_instance_model.yaml
+    #   /model/parameters/clear/environment.yaml
+    - name: model_volume_name
+      value: "{{ app_model_pvc }}"
+    # Secret reference for mounting sensitive parameters
+    # This secret will be mounted as a file at:
+    # /model/parameters/secret/environment.yaml
+    - name: secret_name
+      value: "{{ app_secret_name }}"
+    # Fleet repo configuration
+    - name: git_fleet_url
+      value: "{{ git_fleet_url }}"
+    - name: fleet_destination_folder
+      value: "/fleet/fleet-osm"
+    - name: git_fleet_cred_secret
+      value: fleet-repo
+    # SW-Catalogs repo configuration
+    - name: git_sw_catalogs_url
+      value: "{{ git_sw_catalogs_url }}"
+    - name: sw_catalogs_destination_folder
+      value: "/sw-catalogs/sw-catalogs-osm"
+    - name: git_sw_catalogs_cred_secret
+      value: sw-catalogs
+    # Target deployment information
+    - name: app_name
+      value: "{{ app_name }}"
+    - name: profile_name
+      value: "{{ profile_name }}"
+    - name: profile_type
+      value: "{{ profile_type }}"
+    - name: project_name
+      value: "{{ osm_project_name }}"
+    # OSM SDK container configuration
+    - name: osm_sdk_image_repository
+      value: "opensourcemano/osm-nushell-krm-functions"
+    - name: osm_sdk_image_tag
+      value: "testing-daily"
+    # Debug and dry-run flags
+    - name: debug
+      value: "{{ workflow_debug }}"
+    - name: dry_run
+      value: "{{ workflow_dry_run }}"
+
+  # Cleanup policy
+  ttlStrategy:
+    secondsAfterCompletion: 1800  # Time to live after workflow is completed
+    secondsAfterSuccess: 1800     # Time to live after workflow is successful
+    secondsAfterFailure: 1800     # Time to live after workflow fails
+
+  workflowTemplateRef:
+    name: full-app-management-wft
index e352883..0c51b2d 100644 (file)
@@ -21,6 +21,7 @@ from osm_lcm.odu_libs import (
     vim_mgmt as odu_vim_mgmt,
     cluster_mgmt as odu_cluster_mgmt,
     nodegroup as odu_nodegroup,
+    app as odu_app,
     ksu as odu_ksu,
     oka as odu_oka,
     profiles as odu_profiles,
@@ -127,6 +128,18 @@ class OduWorkflow(LcmBase):
             "move_ksu": {
                 "workflow_function": self.move_ksu,
             },
+            "create_app": {
+                "workflow_function": self.create_app,
+                "clean_function": self.clean_items_app_launch,
+            },
+            "update_app": {
+                "workflow_function": self.update_app,
+                "clean_function": self.clean_items_app_launch,
+            },
+            "delete_app": {
+                "workflow_function": self.delete_app,
+                "clean_function": self.clean_items_app_launch,
+            },
             "create_cloud_credentials": {
                 "workflow_function": self.create_cloud_credentials,
                 "clean_function": self.clean_items_cloud_credentials_create,
@@ -211,19 +224,28 @@ class OduWorkflow(LcmBase):
     delete_secret = odu_common.delete_secret
     create_configmap = odu_common.create_configmap
     delete_configmap = odu_common.delete_configmap
+    create_app = odu_app.create_app
+    update_app = odu_app.update_app
+    delete_app = odu_app.delete_app
+    launch_app = odu_app.launch_app
+    clean_items_app_launch = odu_app.clean_items_app_launch
 
     async def launch_workflow(self, key, op_id, op_params, content):
         self.logger.info(
-            f"Workflow is getting into launch. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
+            f"Workflow is getting into launch. Key: {key}. Operation: {op_id}"
         )
+        # self.logger.debug(f"Operation Params: {op_params}")
+        # self.logger.debug(f"Content: {content}")
         workflow_function = self._workflows[key]["workflow_function"]
         self.logger.info("workflow function : {}".format(workflow_function))
         try:
-            result, workflow_name = await workflow_function(op_id, op_params, content)
-            return result, workflow_name
+            result, workflow_name, workflow_resources = await workflow_function(
+                op_id, op_params, content
+            )
+            return result, workflow_name, workflow_resources
         except Exception as e:
             self.logger.error(f"Error launching workflow: {e}")
-            return False, str(e)
+            return False, str(e), None
 
     async def dummy_clean_items(self, op_id, op_params, content):
         self.logger.info(
@@ -245,7 +267,7 @@ class OduWorkflow(LcmBase):
     async def dummy_operation(self, op_id, op_params, content):
         self.logger.info("Empty operation status Enter")
         self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
-        return content["workflow_name"]
+        return True, content["workflow_name"], None
 
     async def clean_items(self, items):
         # Delete pods
index 79f5cfe..8a93de8 100644 (file)
@@ -17,6 +17,7 @@ async-timeout==4.0.3
 charset-normalizer
 checksumdir
 config-man
+GitPython
 google-auth<2.18.0
 grpcio-tools
 grpclib
index ea67128..6dea1f0 100644 (file)
@@ -69,6 +69,10 @@ frozenlist==1.8.0
     # via
     #   aiohttp
     #   aiosignal
+gitdb==4.0.12
+    # via gitpython
+gitpython==3.1.45
+    # via -r requirements.in
 glom==24.11.0
     # via config-man
 google-auth==2.17.3
@@ -194,6 +198,8 @@ six==1.17.0
     #   paramiko
     #   pymacaroons
     #   python-dateutil
+smmap==5.0.2
+    # via gitdb
 termcolor==3.2.0
     # via fire
 theblues==0.5.2