From 61a4c696b9b4fcec98e19daa38badef9a5927210 Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Thu, 17 Jul 2025 13:04:13 +0200 Subject: [PATCH] Feature 11073: Enhanced OSM declarative modelling for applications. App as first class citizen Change-Id: I6b750f4d862692ab885e98afe3771ba817dd6535 Signed-off-by: garciadeblas --- Dockerfile.production | 1 + osm_lcm/gitops.py | 445 +++++++++++++ osm_lcm/k8s.py | 708 ++++++++++----------- osm_lcm/lcm.py | 25 + osm_lcm/lcm_utils.py | 3 + osm_lcm/n2vc/kubectl.py | 126 ++-- osm_lcm/odu_libs/app.py | 303 +++++++++ osm_lcm/odu_libs/cluster_mgmt.py | 18 +- osm_lcm/odu_libs/ksu.py | 10 +- osm_lcm/odu_libs/nodegroup.py | 12 +- osm_lcm/odu_libs/oka.py | 14 +- osm_lcm/odu_libs/templates/launcher-app.j2 | 87 +++ osm_lcm/odu_workflows.py | 32 +- requirements.in | 1 + requirements.txt | 6 + 15 files changed, 1364 insertions(+), 427 deletions(-) create mode 100644 osm_lcm/gitops.py create mode 100644 osm_lcm/odu_libs/app.py create mode 100644 osm_lcm/odu_libs/templates/launcher-app.j2 diff --git a/Dockerfile.production b/Dockerfile.production index 8087ebff..d8139af0 100644 --- a/Dockerfile.production +++ b/Dockerfile.production @@ -101,6 +101,7 @@ WORKDIR /app RUN apk add --no-cache \ bash \ curl \ + git \ openssh-client \ openssh-keygen \ openssl diff --git a/osm_lcm/gitops.py b/osm_lcm/gitops.py new file mode 100644 index 00000000..22d27172 --- /dev/null +++ b/osm_lcm/gitops.py @@ -0,0 +1,445 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import logging +import tempfile +from time import time +import traceback +from git import Repo +from osm_lcm.lcm_utils import LcmBase +from osm_lcm import odu_workflows +from osm_lcm.data_utils.list_utils import find_in_list +from osm_lcm.n2vc.kubectl import Kubectl +import yaml +from urllib.parse import quote + + +class GitOpsLcm(LcmBase): + db_collection = "gitops" + workflow_status = None + resource_status = None + + profile_collection_mapping = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "resource_profiles": "k8sresource", + "app_profiles": "k8sapp", + } + + profile_type_mapping = { + "infra-controllers": "infra_controller_profiles", + "infra-configs": "infra_config_profiles", + "managed-resources": "resource_profiles", + "applications": "app_profiles", + } + + def __init__(self, msg, lcm_tasks, config): + self.logger = logging.getLogger("lcm.gitops") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + self._checkloop_kustomization_timeout = 900 + self._checkloop_resource_timeout = 900 + self._workflows = {} + self.gitops_config = config["gitops"] + self.logger.debug(f"GitOps config: {self.gitops_config}") + self._repo_base_url = self.gitops_config.get("git_base_url") + self._repo_user = self.gitops_config.get("user") + self._repo_sw_catalogs_url = self.gitops_config.get( + "sw_catalogs_repo_url", + f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git", + ) + self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1") + self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials( + self._repo_sw_catalogs_url + ) + super().__init__(msg, self.logger) + + def build_git_url_with_credentials(self, repo_url): + # Build authenticated URL if credentials were provided + if self._repo_password: + # URL-safe escape password + safe_user = quote(self._repo_user) + safe_pass = quote(self._repo_password) + + # Insert credentials into the URL + # e.g. https://username:password@github.com/org/repo.git + auth_url = repo_url.replace("https://", f"https://{safe_user}:{safe_pass}@") + auth_url = repo_url.replace("http://", f"https://{safe_user}:{safe_pass}@") + else: + auth_url = repo_url + return auth_url + + async def check_dummy_operation(self, op_id, op_params, content): + self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}") + return True, "OK" + + def initialize_operation(self, item_id, op_id): + db_item = self.db.get_one(self.db_collection, {"_id": item_id}) + operation = next( + (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id), + None, + ) + operation["workflowState"] = "PROCESSING" + operation["resourceState"] = "NOT_READY" + operation["operationState"] = "IN_PROGRESS" + operation["gitOperationInfo"] = None + db_item["current_operation"] = operation["op_id"] + self.db.set_one(self.db_collection, {"_id": item_id}, db_item) + + def get_operation_params(self, item, operation_id): + operation_history = item.get("operationHistory", []) + operation = find_in_list( + operation_history, lambda op: op["op_id"] == operation_id + ) + return operation.get("operationParams", {}) + + def get_operation_type(self, item, operation_id): + operation_history = item.get("operationHistory", []) + operation = find_in_list( + operation_history, lambda op: op["op_id"] == operation_id + ) + return operation.get("operationType", {}) + + def update_state_operation_history( + self, content, op_id, workflow_state=None, resource_state=None + ): + self.logger.info( + f"Update state of operation {op_id} in Operation History in DB" + ) + self.logger.info( + f"Workflow state: {workflow_state}. Resource state: {resource_state}" + ) + self.logger.debug(f"Content: {content}") + + op_num = 0 + for operation in content["operationHistory"]: + self.logger.debug("Operations: {}".format(operation)) + if operation["op_id"] == op_id: + self.logger.debug("Found operation number: {}".format(op_num)) + if workflow_state is not None: + operation["workflowState"] = workflow_state + + if resource_state is not None: + operation["resourceState"] = resource_state + break + op_num += 1 + self.logger.debug("content: {}".format(content)) + + return content + + def update_operation_history( + self, content, op_id, workflow_status=None, resource_status=None, op_end=True + ): + self.logger.info( + f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}" + ) + self.logger.debug(f"Content: {content}") + + op_num = 0 + for operation in content["operationHistory"]: + self.logger.debug("Operations: {}".format(operation)) + if operation["op_id"] == op_id: + self.logger.debug("Found operation number: {}".format(op_num)) + if workflow_status is not None: + if workflow_status: + operation["workflowState"] = "COMPLETED" + operation["result"] = True + else: + operation["workflowState"] = "ERROR" + operation["operationState"] = "FAILED" + operation["result"] = False + + if resource_status is not None: + if resource_status: + operation["resourceState"] = "READY" + operation["operationState"] = "COMPLETED" + operation["result"] = True + else: + operation["resourceState"] = "NOT_READY" + operation["operationState"] = "FAILED" + operation["result"] = False + + if op_end: + now = time() + operation["endDate"] = now + break + op_num += 1 + self.logger.debug("content: {}".format(content)) + + return content + + async def check_workflow_and_update_db(self, op_id, workflow_name, db_content): + workflow_status, workflow_msg = await self.odu.check_workflow_status( + op_id, workflow_name + ) + self.logger.info( + "Workflow Status: {} Workflow Message: {}".format( + workflow_status, workflow_msg + ) + ) + operation_type = self.get_operation_type(db_content, op_id) + if operation_type == "create" and workflow_status: + db_content["state"] = "CREATED" + elif operation_type == "create" and not workflow_status: + db_content["state"] = "FAILED_CREATION" + elif operation_type == "delete" and workflow_status: + db_content["state"] = "DELETED" + elif operation_type == "delete" and not workflow_status: + db_content["state"] = "FAILED_DELETION" + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, op_id, workflow_status, None + ) + self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) + return workflow_status + + async def check_resource_and_update_db( + self, resource_name, op_id, op_params, db_content + ): + workflow_status = True + + resource_status, resource_msg = await self.check_resource_status( + resource_name, op_id, op_params, db_content + ) + self.logger.info( + "Resource Status: {} Resource Message: {}".format( + resource_status, resource_msg + ) + ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, op_id, workflow_status, resource_status + ) + db_content["operatingState"] = "IDLE" + db_content["current_operation"] = None + return resource_status, db_content + + async def common_check_list( + self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None + ): + try: + for checking in checkings_list: + if checking["enable"]: + status, message = await self.odu.readiness_loop( + op_id=op_id, + item=checking["item"], + name=checking["name"], + namespace=checking["namespace"], + condition=checking.get("condition"), + deleted=checking.get("deleted", False), + timeout=checking["timeout"], + kubectl_obj=kubectl_obj, + ) + if not status: + error_message = "Resources not ready: " + error_message += checking.get("error_message", "") + return status, f"{error_message}: {message}" + else: + db_item["resourceState"] = checking["resourceState"] + db_item = self.update_state_operation_history( + db_item, op_id, None, checking["resourceState"] + ) + self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item) + except Exception as e: + self.logger.debug(traceback.format_exc()) + self.logger.debug(f"Exception: {e}", exc_info=True) + return False, f"Unexpected exception: {e}" + return True, "OK" + + async def check_resource_status(self, key, op_id, op_params, content): + self.logger.info( + f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}." + ) + self.logger.debug(f"Check resource status. Content: {content}") + check_resource_function = self._workflows.get(key, {}).get( + "check_resource_function" + ) + self.logger.info("check_resource function : {}".format(check_resource_function)) + if check_resource_function: + return await check_resource_function(op_id, op_params, content) + else: + return await self.check_dummy_operation(op_id, op_params, content) + + def check_force_delete_and_delete_from_db( + self, _id, workflow_status, resource_status, force + ): + self.logger.info( + f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}" + ) + if force and (not workflow_status or not resource_status): + self.db.del_one(self.db_collection, {"_id": _id}) + return True + return False + + def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): + self.db.encrypt_decrypt_fields( + content, + "decrypt", + fields, + schema_version="1.11", + salt=content["_id"], + ) + + def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): + self.db.encrypt_decrypt_fields( + content, + "encrypt", + fields, + schema_version="1.11", + salt=content["_id"], + ) + + def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]): + # This deep copy is intended to be passed to ODU workflows. + content_copy = copy.deepcopy(content) + + # decrypting the key + self.db.encrypt_decrypt_fields( + content_copy, + "decrypt", + fields, + schema_version="1.11", + salt=content_copy["_id"], + ) + return content_copy + + def delete_ksu_dependency(self, _id, data): + used_oka = [] + existing_oka = [] + + for oka_data in data["oka"]: + if oka_data.get("_id"): + used_oka.append(oka_data["_id"]) + + all_ksu_data = self.db.get_list("ksus", {}) + for ksu_data in all_ksu_data: + if ksu_data["_id"] != _id: + for oka_data in ksu_data["oka"]: + if oka_data.get("_id"): + if oka_data["_id"] not in existing_oka: + existing_oka.append(oka_data["_id"]) + + self.logger.info(f"Used OKA: {used_oka}") + self.logger.info(f"Existing OKA: {existing_oka}") + + for oka_id in used_oka: + if oka_id not in existing_oka: + self.db.set_one( + "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"} + ) + + return + + def delete_profile_ksu(self, _id, profile_type): + filter_q = {"profile": {"_id": _id, "profile_type": profile_type}} + ksu_list = self.db.get_list("ksus", filter_q) + for ksu_data in ksu_list: + self.delete_ksu_dependency(ksu_data["_id"], ksu_data) + + if ksu_list: + self.db.del_list("ksus", filter_q) + return + + def cluster_kubectl(self, db_cluster): + cluster_kubeconfig = db_cluster["credentials"] + kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml" + with open(kubeconfig_path, "w") as kubeconfig_file: + yaml.safe_dump(cluster_kubeconfig, kubeconfig_file) + return Kubectl(config_file=kubeconfig_path) + + def cloneGitRepo(self, repo_url, branch): + self.logger.debug(f"Cloning repo {repo_url}, branch {branch}") + tmpdir = tempfile.mkdtemp() + self.logger.debug(f"Created temp folder {tmpdir}") + cloned_repo = Repo.clone_from( + repo_url, + tmpdir, + allow_unsafe_options=True, + multi_options=["-c", "http.sslVerify=false"], + ) + self.logger.debug(f"Current active branch: {cloned_repo.active_branch}") + assert cloned_repo + new_branch = cloned_repo.create_head(branch) # create a new branch + assert new_branch.checkout() == cloned_repo.active_branch + self.logger.debug(f"Current active branch: {cloned_repo.active_branch}") + self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}") + return tmpdir + + def createCommit(self, repo_dir, commit_msg): + repo = Repo(repo_dir) + self.logger.info( + f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'" + ) + self.logger.debug(f"Current active branch: {repo.active_branch}") + # repo.index.add('**') + repo.git.add(all=True) + repo.index.commit(commit_msg) + self.logger.info( + f"Commit '{commit_msg}' created in branch '{repo.active_branch}'" + ) + self.logger.debug(f"Current active branch: {repo.active_branch}") + return repo.active_branch + + def mergeGit(self, repo_dir, git_branch): + repo = Repo(repo_dir) + self.logger.info(f"Merging local branch '{git_branch}' into main") + with_git = False + if with_git: + try: + repo.git("checkout main") + repo.git(f"merge {git_branch}") + return True + except Exception as e: + self.logger.error(e) + return False + else: + # prepare a merge + main = repo.heads.main # right-hand side is ahead of us, in the future + merge_base = repo.merge_base(git_branch, main) # three-way merge + repo.index.merge_tree(main, base=merge_base) # write the merge into index + try: + # The merge is done in the branch + repo.index.commit( + f"Merged {git_branch} and main", + parent_commits=(git_branch.commit, main.commit), + ) + # Now, git_branch is ahed of master. Now let master point to the recent commit + aux_head = repo.create_head("aux") + main.commit = aux_head.commit + repo.delete_head(aux_head) + assert main.checkout() + return True + except Exception as e: + self.logger.error(e) + return False + + def pushToRemote(self, repo_dir): + repo = Repo(repo_dir) + self.logger.info("Pushing the change to remote") + # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch)) + repo.remotes.origin.push() + self.logger.info("Push done") + return True diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py index c9497931..6b02ec5f 100644 --- a/osm_lcm/k8s.py +++ b/osm_lcm/k8s.py @@ -18,17 +18,14 @@ __author__ = ( "Shahithya Y ", ) -import copy -import logging +import os from time import time import traceback -from osm_lcm.lcm_utils import LcmBase +import yaml from copy import deepcopy -from osm_lcm import odu_workflows from osm_lcm import vim_sdn -from osm_lcm.data_utils.list_utils import find_in_list -from osm_lcm.n2vc.kubectl import Kubectl -import yaml +from osm_lcm.gitops import GitOpsLcm +from osm_lcm.lcm_utils import LcmException MAP_PROFILE = { "infra_controller_profiles": "infra-controllers", @@ -38,323 +35,6 @@ MAP_PROFILE = { } -class GitOpsLcm(LcmBase): - db_collection = "gitops" - workflow_status = None - resource_status = None - - profile_collection_mapping = { - "infra_controller_profiles": "k8sinfra_controller", - "infra_config_profiles": "k8sinfra_config", - "resource_profiles": "k8sresource", - "app_profiles": "k8sapp", - } - - profile_type_mapping = { - "infra-controllers": "infra_controller_profiles", - "infra-configs": "infra_config_profiles", - "managed-resources": "resource_profiles", - "applications": "app_profiles", - } - - def __init__(self, msg, lcm_tasks, config): - self.logger = logging.getLogger("lcm.gitops") - self.lcm_tasks = lcm_tasks - self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) - self._checkloop_kustomization_timeout = 900 - self._checkloop_resource_timeout = 900 - self._workflows = {} - super().__init__(msg, self.logger) - - async def check_dummy_operation(self, op_id, op_params, content): - self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}") - return True, "OK" - - def initialize_operation(self, item_id, op_id): - db_item = self.db.get_one(self.db_collection, {"_id": item_id}) - operation = next( - (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id), - None, - ) - operation["workflowState"] = "PROCESSING" - operation["resourceState"] = "NOT_READY" - operation["operationState"] = "IN_PROGRESS" - operation["gitOperationInfo"] = None - db_item["current_operation"] = operation["op_id"] - self.db.set_one(self.db_collection, {"_id": item_id}, db_item) - - def get_operation_params(self, item, operation_id): - operation_history = item.get("operationHistory", []) - operation = find_in_list( - operation_history, lambda op: op["op_id"] == operation_id - ) - return operation.get("operationParams", {}) - - def get_operation_type(self, item, operation_id): - operation_history = item.get("operationHistory", []) - operation = find_in_list( - operation_history, lambda op: op["op_id"] == operation_id - ) - return operation.get("operationType", {}) - - def update_state_operation_history( - self, content, op_id, workflow_state=None, resource_state=None - ): - self.logger.info( - f"Update state of operation {op_id} in Operation History in DB" - ) - self.logger.info( - f"Workflow state: {workflow_state}. Resource state: {resource_state}" - ) - self.logger.debug(f"Content: {content}") - - op_num = 0 - for operation in content["operationHistory"]: - self.logger.debug("Operations: {}".format(operation)) - if operation["op_id"] == op_id: - self.logger.debug("Found operation number: {}".format(op_num)) - if workflow_state is not None: - operation["workflowState"] = workflow_state - - if resource_state is not None: - operation["resourceState"] = resource_state - break - op_num += 1 - self.logger.debug("content: {}".format(content)) - - return content - - def update_operation_history( - self, content, op_id, workflow_status=None, resource_status=None, op_end=True - ): - self.logger.info( - f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}" - ) - self.logger.debug(f"Content: {content}") - - op_num = 0 - for operation in content["operationHistory"]: - self.logger.debug("Operations: {}".format(operation)) - if operation["op_id"] == op_id: - self.logger.debug("Found operation number: {}".format(op_num)) - if workflow_status is not None: - if workflow_status: - operation["workflowState"] = "COMPLETED" - operation["result"] = True - else: - operation["workflowState"] = "ERROR" - operation["operationState"] = "FAILED" - operation["result"] = False - - if resource_status is not None: - if resource_status: - operation["resourceState"] = "READY" - operation["operationState"] = "COMPLETED" - operation["result"] = True - else: - operation["resourceState"] = "NOT_READY" - operation["operationState"] = "FAILED" - operation["result"] = False - - if op_end: - now = time() - operation["endDate"] = now - break - op_num += 1 - self.logger.debug("content: {}".format(content)) - - return content - - async def check_workflow_and_update_db(self, op_id, workflow_name, db_content): - workflow_status, workflow_msg = await self.odu.check_workflow_status( - op_id, workflow_name - ) - self.logger.info( - "Workflow Status: {} Workflow Message: {}".format( - workflow_status, workflow_msg - ) - ) - operation_type = self.get_operation_type(db_content, op_id) - if operation_type == "create" and workflow_status: - db_content["state"] = "CREATED" - elif operation_type == "create" and not workflow_status: - db_content["state"] = "FAILED_CREATION" - elif operation_type == "delete" and workflow_status: - db_content["state"] = "DELETED" - elif operation_type == "delete" and not workflow_status: - db_content["state"] = "FAILED_DELETION" - - if workflow_status: - db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, None - ) - self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) - return workflow_status - - async def check_resource_and_update_db( - self, resource_name, op_id, op_params, db_content - ): - workflow_status = True - - resource_status, resource_msg = await self.check_resource_status( - resource_name, op_id, op_params, db_content - ) - self.logger.info( - "Resource Status: {} Resource Message: {}".format( - resource_status, resource_msg - ) - ) - - if resource_status: - db_content["resourceState"] = "READY" - else: - db_content["resourceState"] = "ERROR" - - db_content = self.update_operation_history( - db_content, op_id, workflow_status, resource_status - ) - db_content["operatingState"] = "IDLE" - db_content["current_operation"] = None - return resource_status, db_content - - async def common_check_list( - self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None - ): - try: - for checking in checkings_list: - if checking["enable"]: - status, message = await self.odu.readiness_loop( - op_id=op_id, - item=checking["item"], - name=checking["name"], - namespace=checking["namespace"], - condition=checking.get("condition"), - deleted=checking.get("deleted", False), - timeout=checking["timeout"], - kubectl_obj=kubectl_obj, - ) - if not status: - error_message = "Resources not ready: " - error_message += checking.get("error_message", "") - return status, f"{error_message}: {message}" - else: - db_item["resourceState"] = checking["resourceState"] - db_item = self.update_state_operation_history( - db_item, op_id, None, checking["resourceState"] - ) - self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item) - except Exception as e: - self.logger.debug(traceback.format_exc()) - self.logger.debug(f"Exception: {e}", exc_info=True) - return False, f"Unexpected exception: {e}" - return True, "OK" - - async def check_resource_status(self, key, op_id, op_params, content): - self.logger.info( - f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}." - ) - self.logger.debug(f"Check resource status. Content: {content}") - check_resource_function = self._workflows.get(key, {}).get( - "check_resource_function" - ) - self.logger.info("check_resource function : {}".format(check_resource_function)) - if check_resource_function: - return await check_resource_function(op_id, op_params, content) - else: - return await self.check_dummy_operation(op_id, op_params, content) - - def check_force_delete_and_delete_from_db( - self, _id, workflow_status, resource_status, force - ): - self.logger.info( - f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}" - ) - if force and (not workflow_status or not resource_status): - self.db.del_one(self.db_collection, {"_id": _id}) - return True - return False - - def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): - self.db.encrypt_decrypt_fields( - content, - "decrypt", - fields, - schema_version="1.11", - salt=content["_id"], - ) - - def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): - self.db.encrypt_decrypt_fields( - content, - "encrypt", - fields, - schema_version="1.11", - salt=content["_id"], - ) - - def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]): - # This deep copy is intended to be passed to ODU workflows. - content_copy = copy.deepcopy(content) - - # decrypting the key - self.db.encrypt_decrypt_fields( - content_copy, - "decrypt", - fields, - schema_version="1.11", - salt=content_copy["_id"], - ) - return content_copy - - def delete_ksu_dependency(self, _id, data): - used_oka = [] - existing_oka = [] - - for oka_data in data["oka"]: - if oka_data.get("_id"): - used_oka.append(oka_data["_id"]) - - all_ksu_data = self.db.get_list("ksus", {}) - for ksu_data in all_ksu_data: - if ksu_data["_id"] != _id: - for oka_data in ksu_data["oka"]: - if oka_data.get("_id"): - if oka_data["_id"] not in existing_oka: - existing_oka.append(oka_data["_id"]) - - self.logger.info(f"Used OKA: {used_oka}") - self.logger.info(f"Existing OKA: {existing_oka}") - - for oka_id in used_oka: - if oka_id not in existing_oka: - self.db.set_one( - "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"} - ) - - return - - def delete_profile_ksu(self, _id, profile_type): - filter_q = {"profile": {"_id": _id, "profile_type": profile_type}} - ksu_list = self.db.get_list("ksus", filter_q) - for ksu_data in ksu_list: - self.delete_ksu_dependency(ksu_data["_id"], ksu_data) - - if ksu_list: - self.db.del_list("ksus", filter_q) - return - - def cluster_kubectl(self, db_cluster): - cluster_kubeconfig = db_cluster["credentials"] - kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml" - with open(kubeconfig_path, "w") as kubeconfig_file: - yaml.safe_dump(cluster_kubeconfig, kubeconfig_file) - return Kubectl(config_file=kubeconfig_path) - - class NodeGroupLcm(GitOpsLcm): db_collection = "nodegroups" @@ -405,7 +85,7 @@ class NodeGroupLcm(GitOpsLcm): } self.logger.info(f"Workflow content: {workflow_content}") - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "add_nodegroup", op_id, op_params, workflow_content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -502,7 +182,7 @@ class NodeGroupLcm(GitOpsLcm): } self.logger.info(f"Workflow content: {workflow_content}") - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "scale_nodegroup", op_id, op_params, workflow_content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -588,7 +268,7 @@ class NodeGroupLcm(GitOpsLcm): workflow_content = {"nodegroup": db_nodegroup, "cluster": db_cluster} - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_nodegroup", op_id, op_params, workflow_content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -713,7 +393,7 @@ class ClusterLcm(GitOpsLcm): db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]}) workflow_content["vim_account"] = db_vim - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -998,7 +678,7 @@ class ClusterLcm(GitOpsLcm): self.logger.debug(f"Exception: {e}", exc_info=True) raise e - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1197,7 +877,7 @@ class ClusterLcm(GitOpsLcm): # content["profile"] = db_profile workflow_content["profile"] = db_profile - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "attach_profile_to_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1284,7 +964,7 @@ class ClusterLcm(GitOpsLcm): db_profile["profile_type"] = profile_type workflow_content["profile"] = db_profile - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "detach_profile_from_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1367,7 +1047,7 @@ class ClusterLcm(GitOpsLcm): "cluster": db_cluster_copy, } - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "register_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1503,7 +1183,7 @@ class ClusterLcm(GitOpsLcm): "cluster": self.decrypted_copy(db_cluster), } - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "deregister_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1603,7 +1283,7 @@ class ClusterLcm(GitOpsLcm): db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]}) workflow_content["vim_account"] = db_vim - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "update_cluster", op_id, op_params, workflow_content ) if not workflow_res: @@ -1795,7 +1475,7 @@ class CloudCredentialsLcm(GitOpsLcm): salt=vim_id, ) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_cloud_credentials", op_id, op_params, db_content ) @@ -1848,7 +1528,7 @@ class CloudCredentialsLcm(GitOpsLcm): salt=vim_id, ) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "update_cloud_credentials", op_id, op_params, db_content ) workflow_status, workflow_msg = await self.odu.check_workflow_status( @@ -1884,7 +1564,7 @@ class CloudCredentialsLcm(GitOpsLcm): op_params = params db_content = self.db.get_one("vim_accounts", {"_id": vim_id}) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_cloud_credentials", op_id, op_params, db_content ) workflow_status, workflow_msg = await self.odu.check_workflow_status( @@ -1919,7 +1599,7 @@ class K8sAppLcm(GitOpsLcm): super().__init__(msg, lcm_tasks, config) async def create(self, params, order_id): - self.logger.info("App Create Enter") + self.logger.info("App Profile Create Enter") op_id = params["operation_id"] profile_id = params["profile_id"] @@ -1932,7 +1612,7 @@ class K8sAppLcm(GitOpsLcm): op_params = self.get_operation_params(content, op_id) self.db.set_one("k8sapp", {"_id": content["_id"]}, content) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -1946,7 +1626,9 @@ class K8sAppLcm(GitOpsLcm): "create_profile", op_id, op_params, content ) self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) - self.logger.info(f"App Create Exit with resource status: {resource_status}") + self.logger.info( + f"App Profile Create Exit with resource status: {resource_status}" + ) return async def delete(self, params, order_id): @@ -1961,7 +1643,7 @@ class K8sAppLcm(GitOpsLcm): content = self.db.get_one("k8sapp", {"_id": profile_id}) op_params = self.get_operation_params(content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -1990,7 +1672,9 @@ class K8sAppLcm(GitOpsLcm): self.delete_profile_ksu(profile_id, profile_type) self.db.set_one(self.db_collection, {"_id": content["_id"]}, content) self.db.del_one(self.db_collection, {"_id": content["_id"]}) - self.logger.info(f"App Delete Exit with resource status: {resource_status}") + self.logger.info( + f"App Profile Delete Exit with resource status: {resource_status}" + ) return @@ -2006,7 +1690,7 @@ class K8sResourceLcm(GitOpsLcm): super().__init__(msg, lcm_tasks, config) async def create(self, params, order_id): - self.logger.info("Resource Create Enter") + self.logger.info("Resource Profile Create Enter") op_id = params["operation_id"] profile_id = params["profile_id"] @@ -2019,7 +1703,7 @@ class K8sResourceLcm(GitOpsLcm): op_params = self.get_operation_params(content, op_id) self.db.set_one("k8sresource", {"_id": content["_id"]}, content) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2050,7 +1734,7 @@ class K8sResourceLcm(GitOpsLcm): content = self.db.get_one("k8sresource", {"_id": profile_id}) op_params = self.get_operation_params(content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2096,7 +1780,7 @@ class K8sInfraControllerLcm(GitOpsLcm): super().__init__(msg, lcm_tasks, config) async def create(self, params, order_id): - self.logger.info("Infra controller Create Enter") + self.logger.info("Infra controller Profile Create Enter") op_id = params["operation_id"] profile_id = params["profile_id"] @@ -2109,7 +1793,7 @@ class K8sInfraControllerLcm(GitOpsLcm): op_params = self.get_operation_params(content, op_id) self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2140,7 +1824,7 @@ class K8sInfraControllerLcm(GitOpsLcm): content = self.db.get_one("k8sinfra_controller", {"_id": profile_id}) op_params = self.get_operation_params(content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2186,7 +1870,7 @@ class K8sInfraConfigLcm(GitOpsLcm): super().__init__(msg, lcm_tasks, config) async def create(self, params, order_id): - self.logger.info("Infra config Create Enter") + self.logger.info("Infra config Profile Create Enter") op_id = params["operation_id"] profile_id = params["profile_id"] @@ -2199,7 +1883,7 @@ class K8sInfraConfigLcm(GitOpsLcm): op_params = self.get_operation_params(content, op_id) self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2230,7 +1914,7 @@ class K8sInfraConfigLcm(GitOpsLcm): content = self.db.get_one("k8sinfra_config", {"_id": profile_id}) op_params = self.get_operation_params(content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_profile", op_id, op_params, content ) self.logger.info("workflow_name is: {}".format(workflow_name)) @@ -2284,7 +1968,7 @@ class OkaLcm(GitOpsLcm): db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) op_params = self.get_operation_params(db_content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_oka", op_id, op_params, db_content ) @@ -2316,7 +2000,7 @@ class OkaLcm(GitOpsLcm): db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) op_params = self.get_operation_params(db_content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "update_oka", op_id, op_params, db_content ) workflow_status = await self.check_workflow_and_update_db( @@ -2346,7 +2030,7 @@ class OkaLcm(GitOpsLcm): db_content = self.db.get_one(self.db_collection, {"_id": oka_id}) op_params = self.get_operation_params(db_content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_oka", op_id, op_params, db_content ) workflow_status = await self.check_workflow_and_update_db( @@ -2451,7 +2135,7 @@ class KsuLcm(GitOpsLcm): op_params.append(ksu_params) # A single workflow is launched for all KSUs - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "create_ksus", op_id, op_params, db_content ) # Update workflow status in all KSUs @@ -2519,7 +2203,7 @@ class KsuLcm(GitOpsLcm): ] = f"{oka_type}/{db_oka['git_name']}/templates" op_params.append(ksu_params) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "update_ksus", op_id, op_params, db_content ) @@ -2572,7 +2256,7 @@ class KsuLcm(GitOpsLcm): ksu_params["profile"]["name"] = db_profile["name"] op_params.append(ksu_params) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "delete_ksus", op_id, op_params, db_content ) @@ -2610,7 +2294,7 @@ class KsuLcm(GitOpsLcm): self.initialize_operation(ksus_id, op_id) db_content = self.db.get_one(self.db_collection, {"_id": ksus_id}) op_params = self.get_operation_params(db_content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "clone_ksus", op_id, op_params, db_content ) @@ -2634,7 +2318,7 @@ class KsuLcm(GitOpsLcm): self.initialize_operation(ksus_id, op_id) db_content = self.db.get_one(self.db_collection, {"_id": ksus_id}) op_params = self.get_operation_params(db_content, op_id) - workflow_res, workflow_name = await self.odu.launch_workflow( + workflow_res, workflow_name, _ = await self.odu.launch_workflow( "move_ksus", op_id, op_params, db_content ) @@ -2753,3 +2437,309 @@ class KsuLcm(GitOpsLcm): self.logger.error(e) return False, f"Error checking KSU in cluster {db_cluster['name']}." return True, "OK" + + +class AppInstanceLcm(GitOpsLcm): + db_collection = "appinstances" + + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + super().__init__(msg, lcm_tasks, config) + self._workflows = { + "create_app": { + "check_resource_function": self.check_create_app, + }, + "update_app": { + "check_resource_function": self.check_update_app, + }, + "delete_app": { + "check_resource_function": self.check_delete_app, + }, + } + + def get_dbclusters_from_profile(self, profile_id, profile_type): + cluster_list = [] + db_clusters = self.db.get_list("clusters") + self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}") + for db_cluster in db_clusters: + if profile_id in db_cluster.get(profile_type, []): + self.logger.info( + f"Profile {profile_id} found in cluster {db_cluster['name']}" + ) + cluster_list.append(db_cluster) + return cluster_list + + def update_app_dependency(self, app_id, db_app): + self.logger.info(f"Updating AppInstance dependencies for AppInstance {app_id}") + oka_id = db_app.get("oka") + if not oka_id: + self.logger.info(f"No OKA associated with AppInstance {app_id}") + return + + used_oka = [] + all_apps = self.db.get_list(self.db_collection, {}) + for app in all_apps: + if app["_id"] != app_id: + app_oka_id = app["oka"] + if app_oka_id not in used_oka: + used_oka.append(app_oka_id) + self.logger.info(f"Used OKA: {used_oka}") + + if oka_id not in used_oka: + self.db.set_one( + "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"} + ) + return + + async def generic_operation(self, params, order_id, operation_name): + self.logger.info(f"Generic operation. Operation name: {operation_name}") + # self.logger.debug(f"Params: {params}") + try: + op_id = params["operation_id"] + app_id = params["appinstance"] + self.initialize_operation(app_id, op_id) + db_app = self.db.get_one(self.db_collection, {"_id": app_id}) + # self.logger.debug("Db App: {}".format(db_app)) + + # Initialize workflow_content with a copy of the db_app, decrypting fields to use in workflows + db_app_copy = self.decrypted_copy(db_app) + workflow_content = { + "app": db_app_copy, + } + + # Update workflow_content with profile info + profile_type = db_app["profile_type"] + profile_id = db_app["profile"] + profile_collection = self.profile_collection_mapping[profile_type] + db_profile = self.db.get_one(profile_collection, {"_id": profile_id}) + # db_profile is decrypted inline + # No need to use decrypted_copy because db_profile won't be updated. + self.decrypt_age_keys(db_profile) + workflow_content["profile"] = db_profile + + op_params = self.get_operation_params(db_app, op_id) + if not op_params: + op_params = {} + self.logger.debug("Operation Params: {}".format(op_params)) + + # Get SW catalog path from op_params or from DB + aux_dict = {} + if operation_name == "create_app": + aux_dict = op_params + else: + aux_dict = db_app + sw_catalog_path = "" + if "sw_catalog_path" in aux_dict: + sw_catalog_path = aux_dict.get("sw_catalog_path", "") + elif "oka" in aux_dict: + oka_id = aux_dict["oka"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + oka_type = MAP_PROFILE[ + db_oka.get("profile_type", "infra_controller_profiles") + ] + sw_catalog_path = f"{oka_type}/{db_oka['git_name'].lower()}" + else: + self.logger.error("SW Catalog path could not be determined.") + raise LcmException("SW Catalog path could not be determined.") + self.logger.debug(f"SW Catalog path: {sw_catalog_path}") + + # Get model from Git repo + # Clone the SW catalog repo + repodir = self.cloneGitRepo( + repo_url=self._full_repo_sw_catalogs_url, branch="main" + ) + model_file_path = os.path.join(repodir, sw_catalog_path, "model.yaml") + if not os.path.exists(model_file_path): + self.logger.error(f"Model file not found at path: {model_file_path}") + raise LcmException(f"Model file not found at path: {model_file_path}") + # Store the model content in workflow_content + with open(model_file_path) as model_file: + workflow_content["model"] = yaml.safe_load(model_file.read()) + + # A single workflow is launched for the App operation + self.logger.debug("Launching workflow {}".format(operation_name)) + ( + workflow_res, + workflow_name, + workflow_resources, + ) = await self.odu.launch_workflow( + operation_name, op_id, op_params, workflow_content + ) + + if not workflow_res: + self.logger.error(f"Failed to launch workflow: {workflow_name}") + if operation_name == "create_app": + db_app["state"] = "FAILED_CREATION" + elif operation_name == "delete_app": + db_app["state"] = "FAILED_DELETION" + db_app["resourceState"] = "ERROR" + db_app = self.update_operation_history( + db_app, op_id, workflow_status=False, resource_status=None + ) + self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app) + # Clean items used in the workflow, no matter if the workflow succeeded + clean_status, clean_msg = await self.odu.clean_items_workflow( + operation_name, op_id, op_params, workflow_content + ) + self.logger.info( + f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" + ) + return + + # Update resources created in workflow + db_app["app_model"] = workflow_resources.get("app_model", {}) + + # Update workflow status in App + workflow_status = await self.check_workflow_and_update_db( + op_id, workflow_name, db_app + ) + # Update resource status in DB + if workflow_status: + resource_status, db_app = await self.check_resource_and_update_db( + operation_name, op_id, op_params, db_app + ) + else: + resource_status = False + self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app) + + # Clean items used in the workflow, no matter if the workflow succeeded + clean_status, clean_msg = await self.odu.clean_items_workflow( + operation_name, op_id, op_params, workflow_content + ) + self.logger.info( + f"clean_status is :{clean_status} and clean_msg is :{clean_msg}" + ) + + if operation_name == "delete_app": + force = params.get("force", False) + if force: + force_delete_status = self.check_force_delete_and_delete_from_db( + db_app["_id"], workflow_status, resource_status, force + ) + if force_delete_status: + return + if resource_status: + db_app["state"] == "DELETED" + self.update_app_dependency(db_app["_id"], db_app) + self.db.del_one(self.db_collection, {"_id": db_app["_id"]}) + + self.logger.info( + f"Generic app operation Exit {operation_name} with resource Status {resource_status}" + ) + return + except Exception as e: + self.logger.debug(traceback.format_exc()) + self.logger.debug(f"Exception: {e}", exc_info=True) + return + + async def create(self, params, order_id): + self.logger.info("App Create Enter") + return await self.generic_operation(params, order_id, "create_app") + + async def update(self, params, order_id): + self.logger.info("App Edit Enter") + return await self.generic_operation(params, order_id, "update_app") + + async def delete(self, params, order_id): + self.logger.info("App Delete Enter") + return await self.generic_operation(params, order_id, "delete_app") + + async def check_appinstance(self, op_id, op_params, content, deleted=False): + self.logger.info( + f"check_app_instance Operation {op_id}. Params: {op_params}. Deleted: {deleted}" + ) + self.logger.debug(f"Content: {content}") + db_app = content + profile_id = db_app["profile"] + profile_type = db_app["profile_type"] + app_name = db_app["name"] + self.logger.info( + f"Checking status of AppInstance {app_name} for profile {profile_id}." + ) + + # TODO: read app_model and get kustomization name and namespace + # app_model = db_app.get("app_model", {}) + kustomization_list = [ + { + "name": f"jenkins-{app_name}", + "namespace": "flux-system", + } + ] + checkings_list = [] + if deleted: + for kustomization in kustomization_list: + checkings_list.append( + { + "item": "kustomization", + "name": kustomization["name"].lower(), + "namespace": kustomization["namespace"], + "deleted": True, + "timeout": self._checkloop_kustomization_timeout, + "enable": True, + "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED", + } + ) + else: + for kustomization in kustomization_list: + checkings_list.append( + { + "item": "kustomization", + "name": kustomization["name"].lower(), + "namespace": kustomization["namespace"], + "condition": { + "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status", + "value": "True", + }, + "timeout": self._checkloop_kustomization_timeout, + "enable": True, + "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY", + } + ) + + dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type) + if not dbcluster_list: + self.logger.info(f"No clusters found for profile {profile_id}.") + for db_cluster in dbcluster_list: + try: + self.logger.info( + f"Checking status of AppInstance {app_name} in cluster {db_cluster['name']}." + ) + cluster_kubectl = self.cluster_kubectl(db_cluster) + result, message = await self.common_check_list( + op_id, + checkings_list, + self.db_collection, + db_app, + kubectl_obj=cluster_kubectl, + ) + if not result: + return False, message + except Exception as e: + self.logger.error( + f"Error checking AppInstance in cluster {db_cluster['name']}." + ) + self.logger.error(e) + return ( + False, + f"Error checking AppInstance in cluster {db_cluster['name']}.", + ) + return True, "OK" + + async def check_create_app(self, op_id, op_params, content): + self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.") + # self.logger.debug(f"Content: {content}") + return await self.check_appinstance(op_id, op_params, content) + + async def check_update_app(self, op_id, op_params, content): + self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.") + # self.logger.debug(f"Content: {content}") + return await self.check_appinstance(op_id, op_params, content) + + async def check_delete_app(self, op_id, op_params, content): + self.logger.info(f"check_delete_app Operation {op_id}. Params: {op_params}.") + # self.logger.debug(f"Content: {content}") + return await self.check_appinstance(op_id, op_params, content, deleted=True) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 2d26e1b2..45fd9ad8 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -959,6 +959,27 @@ class Lcm: task = asyncio.ensure_future(self.ksu.move(params, order_id)) self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task) return + elif topic == "appinstance": + op_id = params["operation_id"] + appinstance_id = params["appinstance"] + if command == "create": + task = asyncio.ensure_future(self.appinstance.create(params, order_id)) + self.lcm_tasks.register( + "appinstance", appinstance_id, op_id, "app_create", task + ) + return + elif command == "update" or command == "updated": + task = asyncio.ensure_future(self.appinstance.update(params, order_id)) + self.lcm_tasks.register( + "appinstance", appinstance_id, op_id, "app_edit", task + ) + return + elif command == "delete": + task = asyncio.ensure_future(self.appinstance.delete(params, order_id)) + self.lcm_tasks.register( + "appinstance", appinstance_id, op_id, "app_delete", task + ) + return elif topic == "nodegroup": nodegroup_id = params["nodegroup_id"] op_id = params["operation_id"] @@ -1009,6 +1030,7 @@ class Lcm: "k8s_infra_config", "oka", "ksu", + "appinstance", "nodegroup", ) self.logger.debug( @@ -1095,6 +1117,9 @@ class Lcm: ) self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.appinstance = k8s.AppInstanceLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) self.nodegroup = k8s.NodeGroupLcm( self.msg, self.lcm_tasks, self.main_config.to_dict() ) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index d37b9b45..ef1200b1 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -504,6 +504,7 @@ class TaskRegistry(LcmBase): "k8s_infra_config", "oka", "ksu", + "appinstance", ] # Map topic to InstanceID @@ -526,6 +527,7 @@ class TaskRegistry(LcmBase): "k8s_infra_config": "k8sinfra_config", "oka": "oka", "ksu": "ksus", + "appinstance": "appinstances", } def __init__(self, worker_id=None, logger=None): @@ -546,6 +548,7 @@ class TaskRegistry(LcmBase): "oka": {}, "ksu": {}, "odu": {}, + "appinstance": {}, } self.worker_id = worker_id self.db = Database().instance.db diff --git a/osm_lcm/n2vc/kubectl.py b/osm_lcm/n2vc/kubectl.py index eac857a3..98c1240e 100644 --- a/osm_lcm/n2vc/kubectl.py +++ b/osm_lcm/n2vc/kubectl.py @@ -23,6 +23,7 @@ import json import yaml import tarfile import io +import os from time import sleep from distutils.version import LooseVersion @@ -1006,34 +1007,74 @@ class Kubectl: def copy_file_to_pod( self, namespace, pod_name, container_name, src_file, dest_path - ): - # Create an in-memory tar file containing the source file - tar_buffer = io.BytesIO() - with tarfile.open(fileobj=tar_buffer, mode="w") as tar: - tar.add(src_file, arcname=dest_path.split("/")[-1]) - - tar_buffer.seek(0) - - # Define the command to extract the tar file in the pod - exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]] - - # Execute the command - resp = stream( - self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec, - pod_name, - namespace, - command=exec_command, - container=container_name, - stdin=True, - stderr=True, - stdout=True, - tty=False, - _preload_content=False, - ) + ) -> bool: + try: + # Create the destination directory in the pod + dest_dir = os.path.dirname(dest_path) + if dest_dir: + self.logger.debug(f"Creating directory {dest_dir} in pod {pod_name}") + mkdir_command = ["mkdir", "-p", dest_dir] + + resp = stream( + self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=mkdir_command, + container=container_name, + stdin=False, + stderr=True, + stdout=True, + tty=False, + _preload_content=False, + ) + resp.close() + self.logger.debug(f"Directory {dest_dir} created") - # Write the tar data to the pod - resp.write_stdin(tar_buffer.read()) - resp.close() + # Create an in-memory tar file containing the source file + self.logger.debug( + f"Creating in-memory tar of {src_file} as {dest_path.split('/')[-1]}" + ) + tar_buffer = io.BytesIO() + with tarfile.open(fileobj=tar_buffer, mode="w") as tar: + tar.add(src_file, arcname=dest_path.split("/")[-1]) + + tar_buffer.seek(0) + self.logger.debug( + f"Tar buffer created, size={tar_buffer.getbuffer().nbytes} bytes" + ) + + # Define the command to extract the tar file in the pod + exec_command = ["tar", "xvf", "-", "-C", dest_dir] + self.logger.debug(f"Exec command prepared: {exec_command}") + + # Execute the command + resp = stream( + self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=exec_command, + container=container_name, + stdin=True, + stderr=True, + stdout=True, + tty=False, + _preload_content=False, + ) + self.logger.debug( + f"Started exec stream to pod {pod_name} (ns={namespace}, container={container_name})" + ) + + # Write the tar data to the pod + data = tar_buffer.read() + self.logger.debug(f"Writing {len(data)} bytes to pod stdin") + resp.write_stdin(data) + self.logger.debug("Data written to pod stdin") + resp.close() + self.logger.debug("Exec stream closed") + return True + except Exception as e: + self.logger.error(f"Failed to copy file {src_file} to pod: {e}") + return False @retry( attempts=10, @@ -1041,15 +1082,15 @@ class Kubectl: fallback=Exception("Failed creating the pvc"), ) async def create_pvc_with_content( - self, name: str, namespace: str, src_file: str, dest_filename: str + self, name: str, namespace: str, src_files: typing.List, dest_files: typing.List ): """ Create a PVC with content :param: name: Name of the pvc to be created :param: namespace: Name of the namespace where the pvc will be stored - :param: src_file: File to be copied - :param: filename: Name of the file in the destination folder + :param: src_files: List of source files to be copied + :param: dest_files: List of destination filenames (paired with src_files) """ pod_name = f"copy-pod-{name}" self.logger.debug(f"Creating pvc {name}") @@ -1061,13 +1102,24 @@ class Kubectl: self.logger.debug("Sleeping") sleep(40) self.logger.debug(f"Copying files to pod {pod_name}") - self.copy_file_to_pod( - namespace=namespace, - pod_name=pod_name, - container_name="copy-container", - src_file=src_file, - dest_path=f"/mnt/data/{dest_filename}", - ) + for src_f, dest_f in zip(src_files, dest_files): + dest_path = f"/mnt/data/{dest_f}" + self.logger.debug(f"Copying file {src_f} to {dest_path} in pod {pod_name}") + result = self.copy_file_to_pod( + namespace=namespace, + pod_name=pod_name, + container_name="copy-container", + src_file=src_f, + dest_path=dest_path, + ) + if result: + self.logger.debug( + f"Successfully copied file {src_f} to {dest_path} in pod {pod_name}" + ) + else: + raise Exception( + f"Failed copying file {src_f} to {dest_path} in pod {pod_name}" + ) self.logger.debug(f"Deleting pod {pod_name}") await self.delete_pod(pod_name, namespace) diff --git a/osm_lcm/odu_libs/app.py b/osm_lcm/odu_libs/app.py new file mode 100644 index 00000000..b23fb2a7 --- /dev/null +++ b/osm_lcm/odu_libs/app.py @@ -0,0 +1,303 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### + + +import yaml +import tempfile +import os + + +MAP_PROFILE = { + "infra_controller_profiles": "infra-controller-profiles", + "infra_config_profiles": "infra-config-profiles", + "resource_profiles": "managed-resources", + "app_profiles": "app-profiles", +} + + +def merge_model(base, override): + """Recursively merge override dictionary into base dictionary.""" + merge_model = base.copy() + for k, v in override.get("spec", {}).items(): + if k != "ksus": + merge_model["spec"][k] = v + for ksu_override in override.get("spec", {}).get("ksus", []): + for ksu_base in merge_model.get("spec", {}).get("ksus", []): + if ksu_base.get("name") == ksu_override.get("name"): + for k, v in ksu_override.items(): + if k != "patterns": + ksu_base[k] = v + continue + for pattern_override in ksu_override.get("patterns", []): + for pattern_base in ksu_base.get("patterns", []): + if pattern_base.get("name") == pattern_override.get("name"): + for kp, vp in pattern_override.items(): + if kp != "bricks": + pattern_base[kp] = vp + continue + for brick_override in pattern_override.get( + "bricks", [] + ): + for brick_base in pattern_base.get( + "bricks", [] + ): + if brick_base.get( + "name" + ) == brick_override.get("name"): + for kb, vb in brick_override.items(): + if kb != "hrset-values": + brick_base[kb] = vb + continue + for ( + hrset_override + ) in brick_override.get( + "hrset-values", [] + ): + for ( + hrset_base + ) in brick_base.get( + "hrset-values", [] + ): + if hrset_base.get( + "name" + ) == hrset_override.get( + "name" + ): + hrset_base |= ( + hrset_override + ) + break + else: + brick_base[ + "hrset-values" + ].append(hrset_override) + break + else: + pattern_base["bricks"].append( + brick_override + ) + break + else: + ksu_base["patterns"].append(pattern_override) + break + else: + merge_model["spec"]["ksus"].append(ksu_override) + return merge_model + + +async def launch_app(self, op_id, op_params, workflow_content, operation_type): + self.logger.info( + f"launch_app Enter. Operation {op_id}. Operation Type: {operation_type}" + ) + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {workflow_content}") + + db_app = workflow_content["app"] + db_profile = workflow_content.get("profile") + + profile_t = db_app.get("profile_type") + profile_type = MAP_PROFILE[profile_t] + profile_name = db_profile.get("git_name").lower() + app_name = db_app["git_name"].lower() + app_command = f"app {operation_type} $environment" + age_public_key = db_profile.get("age_pubkey") + + sw_catalog_model = workflow_content.get("model") + self.logger.debug(f"SW catalog model: {sw_catalog_model}") + + # Update the app model, extending it also with the model from op_params + if operation_type == "update": + model = op_params.get("model", db_app.get("app_model", {})) + else: + model = op_params.get("model", {}) + app_model = merge_model(sw_catalog_model, model) + + app_model["kind"] = "AppInstantiation" + app_model["metadata"]["name"] = app_name + for ksu in app_model.get("spec", {}).get("ksus", []): + for pattern in ksu.get("patterns", []): + for brick in pattern.get("bricks", []): + brick["public-age-key"] = age_public_key + self.logger.debug(f"App model: {app_model}") + + if operation_type == "update": + params = op_params.get("params", db_app.get("params", {})) + else: + params = op_params.get("params", {}) + params["PROFILE_TYPE"] = profile_type + params["PROFILE_NAME"] = profile_name + params["APPNAME"] = app_name + self.logger.debug(f"Params: {params}") + + if operation_type == "update": + secret_params = op_params.get("secret_params", db_app.get("secret_params", {})) + else: + secret_params = op_params.get("secret_params", {}) + self.logger.debug(f"Secret Params: {secret_params}") + + # Create temporary folder for the app model and the parameters + temp_dir = tempfile.mkdtemp(prefix=f"app-{operation_type}-{op_id}-") + self.logger.debug(f"Temporary dir created: {temp_dir}") + with open(f"{temp_dir}/app_instance_model.yaml", "w") as f: + yaml.safe_dump( + app_model, f, indent=2, default_flow_style=False, sort_keys=False + ) + + os.makedirs(f"{temp_dir}/parameters/clear", exist_ok=True) + with open(f"{temp_dir}/parameters/clear/environment.yaml", "w") as f: + yaml.safe_dump(params, f, indent=2, default_flow_style=False, sort_keys=False) + + # Create PVC and copy app model and parameters to PVC + app_model_pvc = f"temp-pvc-app-{op_id}" + src_files = [ + f"{temp_dir}/app_instance_model.yaml", + f"{temp_dir}/parameters/clear/environment.yaml", + ] + dest_files = [ + "app_instance_model.yaml", + "parameters/clear/environment.yaml", + ] + self.logger.debug( + f"Copying files to PVC {app_model_pvc}: {src_files} -> {dest_files}" + ) + await self._kubectl.create_pvc_with_content( + name=app_model_pvc, + namespace="osm-workflows", + src_files=src_files, + dest_files=dest_files, + ) + + # Create secret with secret_params + secret_name = f"secret-app-{op_id}" + secret_namespace = "osm-workflows" + secret_key = "environment.yaml" + secret_value = yaml.safe_dump( + secret_params, indent=2, default_flow_style=False, sort_keys=False + ) + try: + self.logger.debug(f"Testing kubectl: {self._kubectl}") + self.logger.debug( + f"Testing kubectl configuration: {self._kubectl.configuration}" + ) + self.logger.debug( + f"Testing kubectl configuration Host: {self._kubectl.configuration.host}" + ) + self.logger.debug( + f"Creating secret {secret_name} in namespace {secret_namespace}" + ) + await self.create_secret( + secret_name, + secret_namespace, + secret_key, + secret_value, + ) + except Exception as e: + self.logger.info( + f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}" + ) + return ( + False, + f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}", + ) + + # Create workflow to launch the app + workflow_template = "launcher-app.j2" + workflow_name = f"{operation_type}-app-{op_id}" + # Additional params for the workflow + osm_project_name = workflow_content.get("project_name", "osm_admin") + + # Render workflow + manifest = self.render_jinja_template( + workflow_template, + output_file=None, + workflow_name=workflow_name, + app_command=app_command, + app_model_pvc=app_model_pvc, + app_secret_name=secret_name, + git_fleet_url=self._repo_fleet_url, + git_sw_catalogs_url=self._repo_sw_catalogs_url, + app_name=app_name, + profile_name=profile_name, + profile_type=profile_type, + osm_project_name=osm_project_name, + workflow_debug=self._workflow_debug, + workflow_dry_run=self._workflow_dry_run, + ) + self.logger.debug(f"Workflow manifest: {manifest}") + + # Submit workflow + self._kubectl.create_generic_object( + namespace="osm-workflows", + manifest_dict=yaml.safe_load(manifest), + api_group="argoproj.io", + api_plural="workflows", + api_version="v1alpha1", + ) + workflow_resources = { + "app_model": app_model, + "secret_params": secret_params, + "params": params, + } + return True, workflow_name, workflow_resources + + +async def create_app(self, op_id, op_params, content): + self.logger.info(f"create_app Enter. Operation {op_id}") + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {workflow_content}") + return await self.launch_app(op_id, op_params, content, "create") + + +async def update_app(self, op_id, op_params, content): + self.logger.info(f"update_app Enter. Operation {op_id}") + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {workflow_content}") + return await self.launch_app(op_id, op_params, content, "update") + + +async def delete_app(self, op_id, op_params, content): + self.logger.info(f"delete_app Enter. Operation {op_id}") + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {workflow_content}") + return await self.launch_app(op_id, op_params, content, "delete") + + +async def clean_items_app_launch(self, op_id, op_params, workflow_content): + self.logger.info(f"clean_items_app_launch Enter. Operation {op_id}") + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {workflow_content}") + try: + secret_name = f"secret-app-{op_id}" + volume_name = f"temp-pvc-app-{op_id}" + items = { + "secrets": [ + { + "name": secret_name, + "namespace": "osm-workflows", + } + ], + "pvcs": [ + { + "name": volume_name, + "namespace": "osm-workflows", + } + ], + } + await self.clean_items(items) + return True, "OK" + except Exception as e: + return False, f"Error while cleaning items: {e}" diff --git a/osm_lcm/odu_libs/cluster_mgmt.py b/osm_lcm/odu_libs/cluster_mgmt.py index 7f35c9d7..9c0cbafb 100644 --- a/osm_lcm/odu_libs/cluster_mgmt.py +++ b/osm_lcm/odu_libs/cluster_mgmt.py @@ -73,7 +73,7 @@ async def create_cluster(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create secret {secret_name}: {e}") - return False, f"Cannot create secret {secret_name}: {e}" + return False, f"Cannot create secret {secret_name}: {e}", None # Additional params for the workflow cluster_kustomization_name = cluster_name @@ -119,7 +119,7 @@ async def create_cluster(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create configmap {configmap_name}: {e}") - return False, f"Cannot create configmap {configmap_name}: {e}" + return False, f"Cannot create configmap {configmap_name}: {e}", None # Render workflow # workflow_kwargs = { @@ -174,7 +174,7 @@ async def create_cluster(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def update_cluster(self, op_id, op_params, content): @@ -215,7 +215,7 @@ async def update_cluster(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create secret {secret_name}: {e}") - return False, f"Cannot create secret {secret_name}: {e}" + return False, f"Cannot create secret {secret_name}: {e}", None # Additional params for the workflow cluster_kustomization_name = cluster_name @@ -274,7 +274,7 @@ async def update_cluster(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def delete_cluster(self, op_id, op_params, content): @@ -320,7 +320,7 @@ async def delete_cluster(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def register_cluster(self, op_id, op_params, content): @@ -364,6 +364,7 @@ async def register_cluster(self, op_id, op_params, content): return ( False, f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}", + None, ) # Create secret with kubeconfig @@ -394,6 +395,7 @@ async def register_cluster(self, op_id, op_params, content): return ( False, f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}", + None, ) # Additional params for the workflow @@ -441,7 +443,7 @@ async def register_cluster(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def deregister_cluster(self, op_id, op_params, content): @@ -486,7 +488,7 @@ async def deregister_cluster(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def get_cluster_credentials(self, db_cluster): diff --git a/osm_lcm/odu_libs/ksu.py b/osm_lcm/odu_libs/ksu.py index e7c2f82d..c7fd50da 100644 --- a/osm_lcm/odu_libs/ksu.py +++ b/osm_lcm/odu_libs/ksu.py @@ -177,7 +177,7 @@ async def create_ksus(self, op_id, op_params_list, content_list): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def update_ksus(self, op_id, op_params_list, content_list): @@ -328,7 +328,7 @@ async def update_ksus(self, op_id, op_params_list, content_list): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def delete_ksus(self, op_id, op_params_list, content_list): @@ -374,21 +374,21 @@ async def delete_ksus(self, op_id, op_params_list, content_list): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def clone_ksu(self, op_id, op_params, content): self.logger.info(f"clone_ksu Enter. Operation {op_id}. Params: {op_params}") # self.logger.debug(f"Content: {content}") workflow_name = f"clone-ksu-{content['_id']}" - return True, workflow_name + return True, workflow_name, None async def move_ksu(self, op_id, op_params, content): self.logger.info(f"move_ksu Enter. Operation {op_id}. Params: {op_params}") # self.logger.debug(f"Content: {content}") workflow_name = f"move-ksu-{content['_id']}" - return True, workflow_name + return True, workflow_name, None async def clean_items_ksu_create(self, op_id, op_params_list, content_list): diff --git a/osm_lcm/odu_libs/nodegroup.py b/osm_lcm/odu_libs/nodegroup.py index ff40f9c8..d7038583 100644 --- a/osm_lcm/odu_libs/nodegroup.py +++ b/osm_lcm/odu_libs/nodegroup.py @@ -60,7 +60,7 @@ async def add_nodegroup(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create secret {secret_name}: {e}") - return False, f"Cannot create secret {secret_name}: {e}" + return False, f"Cannot create secret {secret_name}: {e}", None private_subnet = op_params.get("private_subnet", []) public_subnet = op_params.get("public_subnet", []) @@ -78,7 +78,7 @@ async def add_nodegroup(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create configmap {configmap_name}: {e}") - return False, f"Cannot create configmap {configmap_name}: {e}" + return False, f"Cannot create configmap {configmap_name}: {e}", None # Additional params for the workflow nodegroup_kustomization_name = nodegroup_name @@ -135,7 +135,7 @@ async def add_nodegroup(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def scale_nodegroup(self, op_id, op_params, content): @@ -170,7 +170,7 @@ async def scale_nodegroup(self, op_id, op_params, content): ) except Exception as e: self.logger.info(f"Cannot create secret {secret_name}: {e}") - return False, f"Cannot create secret {secret_name}: {e}" + return False, f"Cannot create secret {secret_name}: {e}", None # Additional params for the workflow nodegroup_kustomization_name = nodegroup_name @@ -213,7 +213,7 @@ async def scale_nodegroup(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def delete_nodegroup(self, op_id, op_params, content): @@ -254,7 +254,7 @@ async def delete_nodegroup(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def clean_items_nodegroup_add(self, op_id, op_params, content): diff --git a/osm_lcm/odu_libs/oka.py b/osm_lcm/odu_libs/oka.py index 63c4d366..564acbe9 100644 --- a/osm_lcm/odu_libs/oka.py +++ b/osm_lcm/odu_libs/oka.py @@ -55,8 +55,8 @@ async def create_oka(self, op_id, op_params, content): await self._kubectl.create_pvc_with_content( name=temp_volume_name, namespace="osm-workflows", - src_file=f"{oka_folder}/{oka_filename}", - dest_filename=f"{oka_name}.tar.gz", + src_files=[f"{oka_folder}/{oka_filename}"], + dest_files=[f"{oka_name}.tar.gz"], ) # Render workflow @@ -83,7 +83,7 @@ async def create_oka(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def update_oka(self, op_id, op_params, content): @@ -111,8 +111,8 @@ async def update_oka(self, op_id, op_params, content): await self._kubectl.create_pvc_with_content( name=temp_volume_name, namespace="osm-workflows", - src_folder=oka_folder, - filename=oka_filename, + src_files=[f"{oka_folder}/{oka_filename}"], + dest_files=[f"{oka_name}.tar.gz"], ) # Render workflow @@ -139,7 +139,7 @@ async def update_oka(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def delete_oka(self, op_id, op_params, content): @@ -178,7 +178,7 @@ async def delete_oka(self, op_id, op_params, content): api_plural="workflows", api_version="v1alpha1", ) - return True, workflow_name + return True, workflow_name, None async def clean_items_oka_create(self, op_id, op_params_list, content_list): diff --git a/osm_lcm/odu_libs/templates/launcher-app.j2 b/osm_lcm/odu_libs/templates/launcher-app.j2 new file mode 100644 index 00000000..87b52d75 --- /dev/null +++ b/osm_lcm/odu_libs/templates/launcher-app.j2 @@ -0,0 +1,87 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: {{ workflow_name }} +spec: + arguments: + parameters: + # Full OSM SDK command to execute + - name: command + description: | + Full command string to execute with the OSM SDK. + Examples: + - "app create $environment": Deploy new application instance + - "app update $environment": Update existing application instance + - "app delete $environment": Remove application instance + This parameter accepts any valid OSM SDK command for maximum flexibility. + value: {{ app_command }} + # Volume reference for application models and parameters + # NOTE: The PVC must be created and populated with the following structure: + # /model/app_instance_model.yaml + # /model/parameters/clear/environment.yaml + - name: model_volume_name + value: "{{ app_model_pvc }}" + # Secret reference for mounting sensitive parameters + # This secret will be mounted as a file at: + # /model/parameters/secret/environment.yaml + - name: secret_name + value: "{{ app_secret_name }}" + # Fleet repo configuration + - name: git_fleet_url + value: "{{ git_fleet_url }}" + - name: fleet_destination_folder + value: "/fleet/fleet-osm" + - name: git_fleet_cred_secret + value: fleet-repo + # SW-Catalogs repo configuration + - name: git_sw_catalogs_url + value: "{{ git_sw_catalogs_url }}" + - name: sw_catalogs_destination_folder + value: "/sw-catalogs/sw-catalogs-osm" + - name: git_sw_catalogs_cred_secret + value: sw-catalogs + # Target deployment information + - name: app_name + value: "{{ app_name }}" + - name: profile_name + value: "{{ profile_name }}" + - name: profile_type + value: "{{ profile_type }}" + - name: project_name + value: "{{ osm_project_name }}" + # OSM SDK container configuration + - name: osm_sdk_image_repository + value: "opensourcemano/osm-nushell-krm-functions" + - name: osm_sdk_image_tag + value: "testing-daily" + # Debug and dry-run flags + - name: debug + value: "{{ workflow_debug }}" + - name: dry_run + value: "{{ workflow_dry_run }}" + + # Cleanup policy + ttlStrategy: + secondsAfterCompletion: 1800 # Time to live after workflow is completed + secondsAfterSuccess: 1800 # Time to live after workflow is successful + secondsAfterFailure: 1800 # Time to live after workflow fails + + workflowTemplateRef: + name: full-app-management-wft diff --git a/osm_lcm/odu_workflows.py b/osm_lcm/odu_workflows.py index e352883c..0c51b2d8 100644 --- a/osm_lcm/odu_workflows.py +++ b/osm_lcm/odu_workflows.py @@ -21,6 +21,7 @@ from osm_lcm.odu_libs import ( vim_mgmt as odu_vim_mgmt, cluster_mgmt as odu_cluster_mgmt, nodegroup as odu_nodegroup, + app as odu_app, ksu as odu_ksu, oka as odu_oka, profiles as odu_profiles, @@ -127,6 +128,18 @@ class OduWorkflow(LcmBase): "move_ksu": { "workflow_function": self.move_ksu, }, + "create_app": { + "workflow_function": self.create_app, + "clean_function": self.clean_items_app_launch, + }, + "update_app": { + "workflow_function": self.update_app, + "clean_function": self.clean_items_app_launch, + }, + "delete_app": { + "workflow_function": self.delete_app, + "clean_function": self.clean_items_app_launch, + }, "create_cloud_credentials": { "workflow_function": self.create_cloud_credentials, "clean_function": self.clean_items_cloud_credentials_create, @@ -211,19 +224,28 @@ class OduWorkflow(LcmBase): delete_secret = odu_common.delete_secret create_configmap = odu_common.create_configmap delete_configmap = odu_common.delete_configmap + create_app = odu_app.create_app + update_app = odu_app.update_app + delete_app = odu_app.delete_app + launch_app = odu_app.launch_app + clean_items_app_launch = odu_app.clean_items_app_launch async def launch_workflow(self, key, op_id, op_params, content): self.logger.info( - f"Workflow is getting into launch. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}" + f"Workflow is getting into launch. Key: {key}. Operation: {op_id}" ) + # self.logger.debug(f"Operation Params: {op_params}") + # self.logger.debug(f"Content: {content}") workflow_function = self._workflows[key]["workflow_function"] self.logger.info("workflow function : {}".format(workflow_function)) try: - result, workflow_name = await workflow_function(op_id, op_params, content) - return result, workflow_name + result, workflow_name, workflow_resources = await workflow_function( + op_id, op_params, content + ) + return result, workflow_name, workflow_resources except Exception as e: self.logger.error(f"Error launching workflow: {e}") - return False, str(e) + return False, str(e), None async def dummy_clean_items(self, op_id, op_params, content): self.logger.info( @@ -245,7 +267,7 @@ class OduWorkflow(LcmBase): async def dummy_operation(self, op_id, op_params, content): self.logger.info("Empty operation status Enter") self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}") - return content["workflow_name"] + return True, content["workflow_name"], None async def clean_items(self, items): # Delete pods diff --git a/requirements.in b/requirements.in index 79f5cfed..8a93de8e 100644 --- a/requirements.in +++ b/requirements.in @@ -17,6 +17,7 @@ async-timeout==4.0.3 charset-normalizer checksumdir config-man +GitPython google-auth<2.18.0 grpcio-tools grpclib diff --git a/requirements.txt b/requirements.txt index ea671286..6dea1f0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -69,6 +69,10 @@ frozenlist==1.8.0 # via # aiohttp # aiosignal +gitdb==4.0.12 + # via gitpython +gitpython==3.1.45 + # via -r requirements.in glom==24.11.0 # via config-man google-auth==2.17.3 @@ -194,6 +198,8 @@ six==1.17.0 # paramiko # pymacaroons # python-dateutil +smmap==5.0.2 + # via gitdb termcolor==3.2.0 # via fire theblues==0.5.2 -- 2.25.1