RUN apk add --no-cache \
bash \
curl \
+ git \
openssh-client \
openssh-keygen \
openssl
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import logging
+import tempfile
+from time import time
+import traceback
+from git import Repo
+from osm_lcm.lcm_utils import LcmBase
+from osm_lcm import odu_workflows
+from osm_lcm.data_utils.list_utils import find_in_list
+from osm_lcm.n2vc.kubectl import Kubectl
+import yaml
+from urllib.parse import quote
+
+
+class GitOpsLcm(LcmBase):
+    """Base LCM class for GitOps-managed items (clusters, profiles, KSUs, OKAs)."""
+
+    # Mongo collection backing this LCM type; subclasses override it.
+    db_collection = "gitops"
+    # Class-level defaults; updated per operation by the check helpers.
+    workflow_status = None
+    resource_status = None
+
+    # Maps the profile field names used in cluster documents to the DB
+    # collection that stores each profile type.
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
+    # Maps the external profile-type identifiers to the field names above.
+    profile_type_mapping = {
+        "infra-controllers": "infra_controller_profiles",
+        "infra-configs": "infra_config_profiles",
+        "managed-resources": "resource_profiles",
+        "applications": "app_profiles",
+    }
+
+    def __init__(self, msg, lcm_tasks, config):
+        """Initialize the GitOps LCM base.
+
+        :param msg: message-bus handler, passed through to LcmBase and ODU
+        :param lcm_tasks: task registry used to track LCM operations
+        :param config: full LCM configuration; must contain a "gitops" section
+        """
+        self.logger = logging.getLogger("lcm.gitops")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+        # Timeouts (seconds) for kustomization/resource readiness loops.
+        self._checkloop_kustomization_timeout = 900
+        self._checkloop_resource_timeout = 900
+        # Per-key hooks; subclasses register "check_resource_function" here.
+        self._workflows = {}
+        self.gitops_config = config["gitops"]
+        self.logger.debug(f"GitOps config: {self.gitops_config}")
+        self._repo_base_url = self.gitops_config.get("git_base_url")
+        self._repo_user = self.gitops_config.get("user")
+        # SW catalogs repo defaults to <base_url>/<user>/sw-catalogs-osm.git
+        self._repo_sw_catalogs_url = self.gitops_config.get(
+            "sw_catalogs_repo_url",
+            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
+        )
+        # SECURITY NOTE(review): hard-coded fallback password committed to
+        # source; should come only from config/secret storage — confirm this
+        # is an intentional deployment default and remove it if not.
+        self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1")
+        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
+            self._repo_sw_catalogs_url
+        )
+        super().__init__(msg, self.logger)
+
+    def build_git_url_with_credentials(self, repo_url):
+        """Return *repo_url* with URL-escaped user:password credentials embedded.
+
+        e.g. https://user:pass@github.com/org/repo.git. http:// URLs are
+        upgraded to https:// while injecting credentials. If no password is
+        configured, or the URL uses neither scheme, the URL is returned
+        unchanged.
+        """
+        if not self._repo_password:
+            return repo_url
+        # URL-escape credentials so special characters survive in the URL
+        safe_user = quote(self._repo_user)
+        safe_pass = quote(self._repo_password)
+        credentials = f"https://{safe_user}:{safe_pass}@"
+        # BUGFIX: the second replace() previously re-used repo_url instead of
+        # auth_url, discarding the first substitution — https URLs were
+        # returned WITHOUT credentials. Handle each scheme explicitly instead.
+        if repo_url.startswith("https://"):
+            return repo_url.replace("https://", credentials, 1)
+        if repo_url.startswith("http://"):
+            return repo_url.replace("http://", credentials, 1)
+        return repo_url
+
+    async def check_dummy_operation(self, op_id, op_params, content):
+        """Fallback resource check: log the operation and always report success."""
+        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+        return True, "OK"
+
+    def initialize_operation(self, item_id, op_id):
+        """Mark operation *op_id* of item *item_id* as started in the DB.
+
+        Sets PROCESSING/NOT_READY/IN_PROGRESS on the matching history entry
+        and records it as the item's current operation.
+        """
+        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
+        operation = next(
+            (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
+            None,
+        )
+        # NOTE(review): if op_id is not in the history, `operation` is None and
+        # the next line raises TypeError — confirm callers guarantee it exists.
+        operation["workflowState"] = "PROCESSING"
+        operation["resourceState"] = "NOT_READY"
+        operation["operationState"] = "IN_PROGRESS"
+        operation["gitOperationInfo"] = None
+        db_item["current_operation"] = operation["op_id"]
+        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
+
+    def get_operation_params(self, item, operation_id):
+        """Return the operationParams stored for *operation_id* in the item's history."""
+        matching_op = find_in_list(
+            item.get("operationHistory", []),
+            lambda op: op["op_id"] == operation_id,
+        )
+        return matching_op.get("operationParams", {})
+
+    def get_operation_type(self, item, operation_id):
+        """Return the operationType stored for *operation_id* in the item's history."""
+        history = item.get("operationHistory", [])
+        matching_op = find_in_list(history, lambda op: op["op_id"] == operation_id)
+        # Note: the fallback {} mirrors get_operation_params even though the
+        # value is normally a string such as "create"/"delete".
+        return matching_op.get("operationType", {})
+
+    def update_state_operation_history(
+        self, content, op_id, workflow_state=None, resource_state=None
+    ):
+        """Store raw state strings on operation *op_id* inside *content*.
+
+        Unlike update_operation_history(), the given states are written
+        verbatim (no COMPLETED/ERROR mapping) and no end date is stamped.
+        Returns the mutated *content*; persisting it is the caller's job.
+        """
+        self.logger.info(
+            f"Update state of operation {op_id} in Operation History in DB"
+        )
+        self.logger.info(
+            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_state is not None:
+                    operation["workflowState"] = workflow_state
+
+                if resource_state is not None:
+                    operation["resourceState"] = resource_state
+                # Only the first matching entry is updated.
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    def update_operation_history(
+        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
+    ):
+        """Map boolean workflow/resource results onto operation *op_id*.
+
+        workflow_status True -> workflowState COMPLETED; False -> ERROR and
+        operationState FAILED. resource_status True -> resourceState READY and
+        operationState COMPLETED; False -> NOT_READY / FAILED. None leaves the
+        corresponding fields untouched. With *op_end* True the endDate is
+        stamped. Returns the mutated *content*; it is NOT persisted here.
+        """
+        self.logger.info(
+            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
+        )
+        self.logger.debug(f"Content: {content}")
+
+        op_num = 0
+        for operation in content["operationHistory"]:
+            self.logger.debug("Operations: {}".format(operation))
+            if operation["op_id"] == op_id:
+                self.logger.debug("Found operation number: {}".format(op_num))
+                if workflow_status is not None:
+                    if workflow_status:
+                        operation["workflowState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["workflowState"] = "ERROR"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if resource_status is not None:
+                    if resource_status:
+                        operation["resourceState"] = "READY"
+                        operation["operationState"] = "COMPLETED"
+                        operation["result"] = True
+                    else:
+                        operation["resourceState"] = "NOT_READY"
+                        operation["operationState"] = "FAILED"
+                        operation["result"] = False
+
+                if op_end:
+                    now = time()
+                    operation["endDate"] = now
+                # Only the first matching entry is updated.
+                break
+            op_num += 1
+        self.logger.debug("content: {}".format(content))
+
+        return content
+
+    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
+        """Poll ODU for the workflow result and persist it on *db_content*.
+
+        Sets the item state according to the operation type (create/delete)
+        and the workflow outcome, updates resourceState and the operation
+        history, then writes the document back to the DB. Returns the boolean
+        workflow status.
+        """
+        workflow_status, workflow_msg = await self.odu.check_workflow_status(
+            op_id, workflow_name
+        )
+        self.logger.info(
+            "Workflow Status: {} Workflow Message: {}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        operation_type = self.get_operation_type(db_content, op_id)
+        if operation_type == "create" and workflow_status:
+            db_content["state"] = "CREATED"
+        elif operation_type == "create" and not workflow_status:
+            db_content["state"] = "FAILED_CREATION"
+        elif operation_type == "delete" and workflow_status:
+            db_content["state"] = "DELETED"
+        elif operation_type == "delete" and not workflow_status:
+            db_content["state"] = "FAILED_DELETION"
+
+        if workflow_status:
+            # Git repo updated; resources are still converging on the cluster.
+            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            db_content["resourceState"] = "ERROR"
+
+        db_content = self.update_operation_history(
+            db_content, op_id, workflow_status, None
+        )
+        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
+        return workflow_status
+
+    async def check_resource_and_update_db(
+        self, resource_name, op_id, op_params, db_content
+    ):
+        """Check resource readiness and record the result on *db_content*.
+
+        Assumes the workflow already succeeded (workflow_status fixed True
+        here). Marks the item IDLE with no current operation and returns
+        (resource_status, db_content). The document is NOT persisted here;
+        the caller must write it back.
+        """
+        workflow_status = True
+
+        resource_status, resource_msg = await self.check_resource_status(
+            resource_name, op_id, op_params, db_content
+        )
+        self.logger.info(
+            "Resource Status: {} Resource Message: {}".format(
+                resource_status, resource_msg
+            )
+        )
+
+        if resource_status:
+            db_content["resourceState"] = "READY"
+        else:
+            db_content["resourceState"] = "ERROR"
+
+        db_content = self.update_operation_history(
+            db_content, op_id, workflow_status, resource_status
+        )
+        db_content["operatingState"] = "IDLE"
+        db_content["current_operation"] = None
+        return resource_status, db_content
+
+    async def common_check_list(
+        self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
+    ):
+        """Run the enabled readiness checks in *checkings_list* sequentially.
+
+        Each successful check advances db_item["resourceState"] and persists
+        the item; the loop stops at the first failing check. Returns a
+        (status, message) tuple; any exception is caught and reported as a
+        failure rather than propagated.
+        """
+        try:
+            for checking in checkings_list:
+                if checking["enable"]:
+                    status, message = await self.odu.readiness_loop(
+                        op_id=op_id,
+                        item=checking["item"],
+                        name=checking["name"],
+                        namespace=checking["namespace"],
+                        condition=checking.get("condition"),
+                        deleted=checking.get("deleted", False),
+                        timeout=checking["timeout"],
+                        kubectl_obj=kubectl_obj,
+                    )
+                    if not status:
+                        error_message = "Resources not ready: "
+                        error_message += checking.get("error_message", "")
+                        return status, f"{error_message}: {message}"
+                    else:
+                        db_item["resourceState"] = checking["resourceState"]
+                        db_item = self.update_state_operation_history(
+                            db_item, op_id, None, checking["resourceState"]
+                        )
+                        self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
+        except Exception as e:
+            self.logger.debug(traceback.format_exc())
+            self.logger.debug(f"Exception: {e}", exc_info=True)
+            return False, f"Unexpected exception: {e}"
+        return True, "OK"
+
+    async def check_resource_status(self, key, op_id, op_params, content):
+        """Dispatch to the resource-check function registered for *key*.
+
+        Looks up self._workflows[key]["check_resource_function"]; falls back
+        to check_dummy_operation (always OK) when none is registered.
+        Returns a (status, message) tuple.
+        """
+        self.logger.info(
+            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
+        )
+        self.logger.debug(f"Check resource status. Content: {content}")
+        check_resource_function = self._workflows.get(key, {}).get(
+            "check_resource_function"
+        )
+        self.logger.info("check_resource function : {}".format(check_resource_function))
+        if check_resource_function:
+            return await check_resource_function(op_id, op_params, content)
+        else:
+            return await self.check_dummy_operation(op_id, op_params, content)
+
+    def check_force_delete_and_delete_from_db(
+        self, _id, workflow_status, resource_status, force
+    ):
+        """Remove item *_id* from the DB when a forced delete did not complete cleanly.
+
+        Returns True if the document was deleted, False otherwise.
+        """
+        self.logger.info(
+            f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
+        )
+        if force and (not workflow_status or not resource_status):
+            self.db.del_one(self.db_collection, {"_id": _id})
+            return True
+        return False
+
+    def decrypt_age_keys(self, content, fields=("age_pubkey", "age_privkey")):
+        """Decrypt the given age-key fields of *content* in place.
+
+        Uses the document _id as salt (schema 1.11). The default is a tuple
+        (was a list) to avoid the shared-mutable-default pitfall; callers may
+        still pass any iterable of field names.
+        """
+        self.db.encrypt_decrypt_fields(
+            content,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def encrypt_age_keys(self, content, fields=("age_pubkey", "age_privkey")):
+        """Encrypt the given age-key fields of *content* in place.
+
+        Uses the document _id as salt (schema 1.11). The default is a tuple
+        (was a list) to avoid the shared-mutable-default pitfall; callers may
+        still pass any iterable of field names.
+        """
+        self.db.encrypt_decrypt_fields(
+            content,
+            "encrypt",
+            fields,
+            schema_version="1.11",
+            salt=content["_id"],
+        )
+
+    def decrypted_copy(self, content, fields=("age_pubkey", "age_privkey")):
+        """Return a deep copy of *content* with the given fields decrypted.
+
+        The original document is left untouched; the copy is intended to be
+        passed to ODU workflows. The default is a tuple (was a list) to avoid
+        the shared-mutable-default pitfall.
+        """
+        content_copy = copy.deepcopy(content)
+
+        # decrypting the key
+        self.db.encrypt_decrypt_fields(
+            content_copy,
+            "decrypt",
+            fields,
+            schema_version="1.11",
+            salt=content_copy["_id"],
+        )
+        return content_copy
+
+    def delete_ksu_dependency(self, _id, data):
+        """Release OKAs that are used only by KSU *_id*.
+
+        Any OKA referenced by this KSU (*data*) but by no other KSU is marked
+        NOT_IN_USE in the "okas" collection.
+        """
+        # OKAs referenced by the KSU being deleted; kept as a list so the DB
+        # updates below run in the original reference order.
+        used_oka = [oka["_id"] for oka in data["oka"] if oka.get("_id")]
+
+        # OKAs still referenced by any other KSU. A set replaces the original
+        # O(n) list-membership scans with O(1) lookups.
+        existing_oka = set()
+        for ksu_data in self.db.get_list("ksus", {}):
+            if ksu_data["_id"] != _id:
+                for oka_data in ksu_data["oka"]:
+                    if oka_data.get("_id"):
+                        existing_oka.add(oka_data["_id"])
+
+        self.logger.info(f"Used OKA: {used_oka}")
+        self.logger.info(f"Existing OKA: {existing_oka}")
+
+        for oka_id in used_oka:
+            if oka_id not in existing_oka:
+                self.db.set_one(
+                    "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
+                )
+
+        return
+
+    def delete_profile_ksu(self, _id, profile_type):
+        """Delete every KSU attached to profile *_id* of *profile_type*.
+
+        Releases each KSU's exclusively-used OKAs first, then removes the
+        KSU documents themselves.
+        """
+        filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
+        ksu_list = self.db.get_list("ksus", filter_q)
+        for ksu_data in ksu_list:
+            self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
+
+        if ksu_list:
+            self.db.del_list("ksus", filter_q)
+        return
+
+    def cluster_kubectl(self, db_cluster):
+        """Build a Kubectl client from the cluster's stored kubeconfig.
+
+        NOTE(review): the kubeconfig (credentials) is written to a
+        predictable path under /tmp and never removed — consider tempfile
+        with restrictive permissions and cleanup.
+        """
+        cluster_kubeconfig = db_cluster["credentials"]
+        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+        with open(kubeconfig_path, "w") as kubeconfig_file:
+            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+        return Kubectl(config_file=kubeconfig_path)
+
+    def cloneGitRepo(self, repo_url, branch):
+        """Clone *repo_url* into a temp dir and check out a new branch *branch*.
+
+        Returns the path of the temporary working tree; this method never
+        cleans it up, so callers own the directory.
+        SECURITY NOTE(review): http.sslVerify=false disables TLS certificate
+        verification for the clone — confirm this is intended.
+        """
+        self.logger.debug(f"Cloning repo {repo_url}, branch {branch}")
+        tmpdir = tempfile.mkdtemp()
+        self.logger.debug(f"Created temp folder {tmpdir}")
+        cloned_repo = Repo.clone_from(
+            repo_url,
+            tmpdir,
+            allow_unsafe_options=True,
+            multi_options=["-c", "http.sslVerify=false"],
+        )
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        # NOTE: asserts are stripped under `python -O`; not real error handling.
+        assert cloned_repo
+        new_branch = cloned_repo.create_head(branch)  # create a new branch
+        assert new_branch.checkout() == cloned_repo.active_branch
+        self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
+        self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}")
+        return tmpdir
+
+    def createCommit(self, repo_dir, commit_msg):
+        """Stage all changes in *repo_dir* and commit them with *commit_msg*.
+
+        Returns the active branch the commit was created on.
+        """
+        repo = Repo(repo_dir)
+        self.logger.info(
+            f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        # repo.index.add('**')
+        repo.git.add(all=True)  # stages new, modified and deleted files
+        repo.index.commit(commit_msg)
+        self.logger.info(
+            f"Commit '{commit_msg}' created in branch '{repo.active_branch}'"
+        )
+        self.logger.debug(f"Current active branch: {repo.active_branch}")
+        return repo.active_branch
+
+    def mergeGit(self, repo_dir, git_branch):
+        """Merge local *git_branch* into main inside *repo_dir*.
+
+        Returns True on success, False on any merge error.
+        """
+        repo = Repo(repo_dir)
+        self.logger.info(f"Merging local branch '{git_branch}' into main")
+        # with_git selects between shelling out to git and using GitPython's
+        # index API; the shell path is currently disabled.
+        with_git = False
+        if with_git:
+            try:
+                # NOTE(review): this looks like invalid GitPython usage — the
+                # idiom is repo.git.checkout("main") / repo.git.merge(branch);
+                # confirm before ever enabling this branch.
+                repo.git("checkout main")
+                repo.git(f"merge {git_branch}")
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+        else:
+            # prepare a merge
+            main = repo.heads.main  # right-hand side is ahead of us, in the future
+            merge_base = repo.merge_base(git_branch, main)  # three-way merge
+            repo.index.merge_tree(main, base=merge_base)  # write the merge into index
+            try:
+                # The merge is done in the branch
+                repo.index.commit(
+                    f"Merged {git_branch} and main",
+                    parent_commits=(git_branch.commit, main.commit),
+                )
+                # Now git_branch is ahead of main; let main point to the merge
+                # commit via a temporary "aux" head, then check main out.
+                aux_head = repo.create_head("aux")
+                main.commit = aux_head.commit
+                repo.delete_head(aux_head)
+                assert main.checkout()
+                return True
+            except Exception as e:
+                self.logger.error(e)
+                return False
+
+    def pushToRemote(self, repo_dir):
+        """Push the current branch of *repo_dir* to its origin remote.
+
+        Always returns True; push errors, if any, propagate from GitPython.
+        """
+        repo = Repo(repo_dir)
+        self.logger.info("Pushing the change to remote")
+        # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch))
+        repo.remotes.origin.push()
+        self.logger.info("Push done")
+        return True
"Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
-import copy
-import logging
+import os
from time import time
import traceback
-from osm_lcm.lcm_utils import LcmBase
+import yaml
from copy import deepcopy
-from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
-from osm_lcm.data_utils.list_utils import find_in_list
-from osm_lcm.n2vc.kubectl import Kubectl
-import yaml
+from osm_lcm.gitops import GitOpsLcm
+from osm_lcm.lcm_utils import LcmException
MAP_PROFILE = {
"infra_controller_profiles": "infra-controllers",
}
-class GitOpsLcm(LcmBase):
- db_collection = "gitops"
- workflow_status = None
- resource_status = None
-
- profile_collection_mapping = {
- "infra_controller_profiles": "k8sinfra_controller",
- "infra_config_profiles": "k8sinfra_config",
- "resource_profiles": "k8sresource",
- "app_profiles": "k8sapp",
- }
-
- profile_type_mapping = {
- "infra-controllers": "infra_controller_profiles",
- "infra-configs": "infra_config_profiles",
- "managed-resources": "resource_profiles",
- "applications": "app_profiles",
- }
-
- def __init__(self, msg, lcm_tasks, config):
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
- self._checkloop_kustomization_timeout = 900
- self._checkloop_resource_timeout = 900
- self._workflows = {}
- super().__init__(msg, self.logger)
-
- async def check_dummy_operation(self, op_id, op_params, content):
- self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
- return True, "OK"
-
- def initialize_operation(self, item_id, op_id):
- db_item = self.db.get_one(self.db_collection, {"_id": item_id})
- operation = next(
- (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
- None,
- )
- operation["workflowState"] = "PROCESSING"
- operation["resourceState"] = "NOT_READY"
- operation["operationState"] = "IN_PROGRESS"
- operation["gitOperationInfo"] = None
- db_item["current_operation"] = operation["op_id"]
- self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
-
- def get_operation_params(self, item, operation_id):
- operation_history = item.get("operationHistory", [])
- operation = find_in_list(
- operation_history, lambda op: op["op_id"] == operation_id
- )
- return operation.get("operationParams", {})
-
- def get_operation_type(self, item, operation_id):
- operation_history = item.get("operationHistory", [])
- operation = find_in_list(
- operation_history, lambda op: op["op_id"] == operation_id
- )
- return operation.get("operationType", {})
-
- def update_state_operation_history(
- self, content, op_id, workflow_state=None, resource_state=None
- ):
- self.logger.info(
- f"Update state of operation {op_id} in Operation History in DB"
- )
- self.logger.info(
- f"Workflow state: {workflow_state}. Resource state: {resource_state}"
- )
- self.logger.debug(f"Content: {content}")
-
- op_num = 0
- for operation in content["operationHistory"]:
- self.logger.debug("Operations: {}".format(operation))
- if operation["op_id"] == op_id:
- self.logger.debug("Found operation number: {}".format(op_num))
- if workflow_state is not None:
- operation["workflowState"] = workflow_state
-
- if resource_state is not None:
- operation["resourceState"] = resource_state
- break
- op_num += 1
- self.logger.debug("content: {}".format(content))
-
- return content
-
- def update_operation_history(
- self, content, op_id, workflow_status=None, resource_status=None, op_end=True
- ):
- self.logger.info(
- f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
- )
- self.logger.debug(f"Content: {content}")
-
- op_num = 0
- for operation in content["operationHistory"]:
- self.logger.debug("Operations: {}".format(operation))
- if operation["op_id"] == op_id:
- self.logger.debug("Found operation number: {}".format(op_num))
- if workflow_status is not None:
- if workflow_status:
- operation["workflowState"] = "COMPLETED"
- operation["result"] = True
- else:
- operation["workflowState"] = "ERROR"
- operation["operationState"] = "FAILED"
- operation["result"] = False
-
- if resource_status is not None:
- if resource_status:
- operation["resourceState"] = "READY"
- operation["operationState"] = "COMPLETED"
- operation["result"] = True
- else:
- operation["resourceState"] = "NOT_READY"
- operation["operationState"] = "FAILED"
- operation["result"] = False
-
- if op_end:
- now = time()
- operation["endDate"] = now
- break
- op_num += 1
- self.logger.debug("content: {}".format(content))
-
- return content
-
- async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
- workflow_status, workflow_msg = await self.odu.check_workflow_status(
- op_id, workflow_name
- )
- self.logger.info(
- "Workflow Status: {} Workflow Message: {}".format(
- workflow_status, workflow_msg
- )
- )
- operation_type = self.get_operation_type(db_content, op_id)
- if operation_type == "create" and workflow_status:
- db_content["state"] = "CREATED"
- elif operation_type == "create" and not workflow_status:
- db_content["state"] = "FAILED_CREATION"
- elif operation_type == "delete" and workflow_status:
- db_content["state"] = "DELETED"
- elif operation_type == "delete" and not workflow_status:
- db_content["state"] = "FAILED_DELETION"
-
- if workflow_status:
- db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, None
- )
- self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
- return workflow_status
-
- async def check_resource_and_update_db(
- self, resource_name, op_id, op_params, db_content
- ):
- workflow_status = True
-
- resource_status, resource_msg = await self.check_resource_status(
- resource_name, op_id, op_params, db_content
- )
- self.logger.info(
- "Resource Status: {} Resource Message: {}".format(
- resource_status, resource_msg
- )
- )
-
- if resource_status:
- db_content["resourceState"] = "READY"
- else:
- db_content["resourceState"] = "ERROR"
-
- db_content = self.update_operation_history(
- db_content, op_id, workflow_status, resource_status
- )
- db_content["operatingState"] = "IDLE"
- db_content["current_operation"] = None
- return resource_status, db_content
-
- async def common_check_list(
- self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
- ):
- try:
- for checking in checkings_list:
- if checking["enable"]:
- status, message = await self.odu.readiness_loop(
- op_id=op_id,
- item=checking["item"],
- name=checking["name"],
- namespace=checking["namespace"],
- condition=checking.get("condition"),
- deleted=checking.get("deleted", False),
- timeout=checking["timeout"],
- kubectl_obj=kubectl_obj,
- )
- if not status:
- error_message = "Resources not ready: "
- error_message += checking.get("error_message", "")
- return status, f"{error_message}: {message}"
- else:
- db_item["resourceState"] = checking["resourceState"]
- db_item = self.update_state_operation_history(
- db_item, op_id, None, checking["resourceState"]
- )
- self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
- except Exception as e:
- self.logger.debug(traceback.format_exc())
- self.logger.debug(f"Exception: {e}", exc_info=True)
- return False, f"Unexpected exception: {e}"
- return True, "OK"
-
- async def check_resource_status(self, key, op_id, op_params, content):
- self.logger.info(
- f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
- )
- self.logger.debug(f"Check resource status. Content: {content}")
- check_resource_function = self._workflows.get(key, {}).get(
- "check_resource_function"
- )
- self.logger.info("check_resource function : {}".format(check_resource_function))
- if check_resource_function:
- return await check_resource_function(op_id, op_params, content)
- else:
- return await self.check_dummy_operation(op_id, op_params, content)
-
- def check_force_delete_and_delete_from_db(
- self, _id, workflow_status, resource_status, force
- ):
- self.logger.info(
- f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
- )
- if force and (not workflow_status or not resource_status):
- self.db.del_one(self.db_collection, {"_id": _id})
- return True
- return False
-
- def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
- self.db.encrypt_decrypt_fields(
- content,
- "decrypt",
- fields,
- schema_version="1.11",
- salt=content["_id"],
- )
-
- def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
- self.db.encrypt_decrypt_fields(
- content,
- "encrypt",
- fields,
- schema_version="1.11",
- salt=content["_id"],
- )
-
- def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
- # This deep copy is intended to be passed to ODU workflows.
- content_copy = copy.deepcopy(content)
-
- # decrypting the key
- self.db.encrypt_decrypt_fields(
- content_copy,
- "decrypt",
- fields,
- schema_version="1.11",
- salt=content_copy["_id"],
- )
- return content_copy
-
- def delete_ksu_dependency(self, _id, data):
- used_oka = []
- existing_oka = []
-
- for oka_data in data["oka"]:
- if oka_data.get("_id"):
- used_oka.append(oka_data["_id"])
-
- all_ksu_data = self.db.get_list("ksus", {})
- for ksu_data in all_ksu_data:
- if ksu_data["_id"] != _id:
- for oka_data in ksu_data["oka"]:
- if oka_data.get("_id"):
- if oka_data["_id"] not in existing_oka:
- existing_oka.append(oka_data["_id"])
-
- self.logger.info(f"Used OKA: {used_oka}")
- self.logger.info(f"Existing OKA: {existing_oka}")
-
- for oka_id in used_oka:
- if oka_id not in existing_oka:
- self.db.set_one(
- "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
- )
-
- return
-
- def delete_profile_ksu(self, _id, profile_type):
- filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
- ksu_list = self.db.get_list("ksus", filter_q)
- for ksu_data in ksu_list:
- self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
-
- if ksu_list:
- self.db.del_list("ksus", filter_q)
- return
-
- def cluster_kubectl(self, db_cluster):
- cluster_kubeconfig = db_cluster["credentials"]
- kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
- with open(kubeconfig_path, "w") as kubeconfig_file:
- yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
- return Kubectl(config_file=kubeconfig_path)
-
-
class NodeGroupLcm(GitOpsLcm):
db_collection = "nodegroups"
}
self.logger.info(f"Workflow content: {workflow_content}")
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"add_nodegroup", op_id, op_params, workflow_content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
}
self.logger.info(f"Workflow content: {workflow_content}")
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"scale_nodegroup", op_id, op_params, workflow_content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
workflow_content = {"nodegroup": db_nodegroup, "cluster": db_cluster}
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_nodegroup", op_id, op_params, workflow_content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
workflow_content["vim_account"] = db_vim
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
self.logger.debug(f"Exception: {e}", exc_info=True)
raise e
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
# content["profile"] = db_profile
workflow_content["profile"] = db_profile
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"attach_profile_to_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
db_profile["profile_type"] = profile_type
workflow_content["profile"] = db_profile
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"detach_profile_from_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
"cluster": db_cluster_copy,
}
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"register_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
"cluster": self.decrypted_copy(db_cluster),
}
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"deregister_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
workflow_content["vim_account"] = db_vim
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"update_cluster", op_id, op_params, workflow_content
)
if not workflow_res:
salt=vim_id,
)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_cloud_credentials", op_id, op_params, db_content
)
salt=vim_id,
)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"update_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
op_params = params
db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_cloud_credentials", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
super().__init__(msg, lcm_tasks, config)
async def create(self, params, order_id):
- self.logger.info("App Create Enter")
+ self.logger.info("App Profile Create Enter")
op_id = params["operation_id"]
profile_id = params["profile_id"]
op_params = self.get_operation_params(content, op_id)
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
"create_profile", op_id, op_params, content
)
self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
- self.logger.info(f"App Create Exit with resource status: {resource_status}")
+ self.logger.info(
+ f"App Profile Create Exit with resource status: {resource_status}"
+ )
return
async def delete(self, params, order_id):
content = self.db.get_one("k8sapp", {"_id": profile_id})
op_params = self.get_operation_params(content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
self.delete_profile_ksu(profile_id, profile_type)
self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
self.db.del_one(self.db_collection, {"_id": content["_id"]})
- self.logger.info(f"App Delete Exit with resource status: {resource_status}")
+ self.logger.info(
+ f"App Profile Delete Exit with resource status: {resource_status}"
+ )
return
super().__init__(msg, lcm_tasks, config)
async def create(self, params, order_id):
- self.logger.info("Resource Create Enter")
+ self.logger.info("Resource Profile Create Enter")
op_id = params["operation_id"]
profile_id = params["profile_id"]
op_params = self.get_operation_params(content, op_id)
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
content = self.db.get_one("k8sresource", {"_id": profile_id})
op_params = self.get_operation_params(content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
super().__init__(msg, lcm_tasks, config)
async def create(self, params, order_id):
- self.logger.info("Infra controller Create Enter")
+ self.logger.info("Infra controller Profile Create Enter")
op_id = params["operation_id"]
profile_id = params["profile_id"]
op_params = self.get_operation_params(content, op_id)
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
op_params = self.get_operation_params(content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
super().__init__(msg, lcm_tasks, config)
async def create(self, params, order_id):
- self.logger.info("Infra config Create Enter")
+ self.logger.info("Infra config Profile Create Enter")
op_id = params["operation_id"]
profile_id = params["profile_id"]
op_params = self.get_operation_params(content, op_id)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
op_params = self.get_operation_params(content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is: {}".format(workflow_name))
db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
op_params = self.get_operation_params(db_content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_oka", op_id, op_params, db_content
)
db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
op_params = self.get_operation_params(db_content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"update_oka", op_id, op_params, db_content
)
workflow_status = await self.check_workflow_and_update_db(
db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
op_params = self.get_operation_params(db_content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_oka", op_id, op_params, db_content
)
workflow_status = await self.check_workflow_and_update_db(
op_params.append(ksu_params)
# A single workflow is launched for all KSUs
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"create_ksus", op_id, op_params, db_content
)
# Update workflow status in all KSUs
] = f"{oka_type}/{db_oka['git_name']}/templates"
op_params.append(ksu_params)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"update_ksus", op_id, op_params, db_content
)
ksu_params["profile"]["name"] = db_profile["name"]
op_params.append(ksu_params)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"delete_ksus", op_id, op_params, db_content
)
self.initialize_operation(ksus_id, op_id)
db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
op_params = self.get_operation_params(db_content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"clone_ksus", op_id, op_params, db_content
)
self.initialize_operation(ksus_id, op_id)
db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
op_params = self.get_operation_params(db_content, op_id)
- workflow_res, workflow_name = await self.odu.launch_workflow(
+ workflow_res, workflow_name, _ = await self.odu.launch_workflow(
"move_ksus", op_id, op_params, db_content
)
self.logger.error(e)
return False, f"Error checking KSU in cluster {db_cluster['name']}."
return True, "OK"
+
+
+class AppInstanceLcm(GitOpsLcm):
+ db_collection = "appinstances"
+
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+ super().__init__(msg, lcm_tasks, config)
+ # Map each app workflow name to the coroutine that verifies the
+ # resources the workflow produced (consumed by the base-class
+ # check_resource_and_update_db machinery).
+ self._workflows = {
+ "create_app": {
+ "check_resource_function": self.check_create_app,
+ },
+ "update_app": {
+ "check_resource_function": self.check_update_app,
+ },
+ "delete_app": {
+ "check_resource_function": self.check_delete_app,
+ },
+ }
+
+ def get_dbclusters_from_profile(self, profile_id, profile_type):
+ """Return the cluster DB documents that reference the given profile.
+
+ :param profile_id: _id of the profile to search for.
+ :param profile_type: cluster document field holding the list of profile
+ ids of that kind (e.g. "app_profiles").
+ :return: list of matching cluster documents (possibly empty).
+ """
+ cluster_list = []
+ db_clusters = self.db.get_list("clusters")
+ self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
+ for db_cluster in db_clusters:
+ # Membership test against the cluster's profile-id list of this kind;
+ # .get() tolerates clusters without that field.
+ if profile_id in db_cluster.get(profile_type, []):
+ self.logger.info(
+ f"Profile {profile_id} found in cluster {db_cluster['name']}"
+ )
+ cluster_list.append(db_cluster)
+ return cluster_list
+
def update_app_dependency(self, app_id, db_app):
    """Recompute the usage state of the OKA referenced by an AppInstance.

    When the AppInstance identified by ``app_id`` is removed, check whether
    its OKA is still referenced by any other AppInstance; if not, mark the
    OKA document as NOT_IN_USE.

    :param app_id: _id of the AppInstance being removed.
    :param db_app: DB document of that AppInstance.
    :return: None
    """
    self.logger.info(f"Updating AppInstance dependencies for AppInstance {app_id}")
    oka_id = db_app.get("oka")
    if not oka_id:
        self.logger.info(f"No OKA associated with AppInstance {app_id}")
        return

    # Collect the OKAs still referenced by the remaining AppInstances.
    # BUG FIX: the original accessed app["oka"] unconditionally, which
    # raises KeyError for AppInstance documents without an "oka" field
    # (this very method reads db_app's "oka" with .get()). A set also
    # avoids the original's O(n) membership test per document.
    used_oka = set()
    for app in self.db.get_list(self.db_collection, {}):
        if app["_id"] != app_id:
            app_oka_id = app.get("oka")
            if app_oka_id:
                used_oka.add(app_oka_id)
    self.logger.info(f"Used OKA: {used_oka}")

    if oka_id not in used_oka:
        self.db.set_one(
            "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
        )
+
async def generic_operation(self, params, order_id, operation_name):
    """Run a full AppInstance lifecycle operation (create/update/delete).

    Launches the matching ODU workflow, tracks its status, checks the
    resulting resources and updates the AppInstance document in the DB.

    :param params: operation parameters; must contain "operation_id" and
        "appinstance" (the AppInstance _id). For delete it may carry "force".
    :param order_id: order identifier (kept for API symmetry; unused here).
    :param operation_name: one of "create_app", "update_app", "delete_app".
    :return: None. Exceptions are logged and swallowed (LCM task boundary).
    """
    self.logger.info(f"Generic operation. Operation name: {operation_name}")
    try:
        op_id = params["operation_id"]
        app_id = params["appinstance"]
        self.initialize_operation(app_id, op_id)
        db_app = self.db.get_one(self.db_collection, {"_id": app_id})

        # Initialize workflow_content with a copy of the db_app, decrypting
        # fields to use in workflows
        db_app_copy = self.decrypted_copy(db_app)
        workflow_content = {
            "app": db_app_copy,
        }

        # Update workflow_content with profile info
        profile_type = db_app["profile_type"]
        profile_id = db_app["profile"]
        profile_collection = self.profile_collection_mapping[profile_type]
        db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
        # db_profile is decrypted inline; no need to use decrypted_copy
        # because db_profile won't be written back to the DB.
        self.decrypt_age_keys(db_profile)
        workflow_content["profile"] = db_profile

        op_params = self.get_operation_params(db_app, op_id) or {}
        self.logger.debug("Operation Params: {}".format(op_params))

        # Get SW catalog path from op_params (create) or from the DB document
        # (update/delete).
        aux_dict = op_params if operation_name == "create_app" else db_app
        sw_catalog_path = ""
        if "sw_catalog_path" in aux_dict:
            sw_catalog_path = aux_dict.get("sw_catalog_path", "")
        elif "oka" in aux_dict:
            oka_id = aux_dict["oka"]
            db_oka = self.db.get_one("okas", {"_id": oka_id})
            oka_type = MAP_PROFILE[
                db_oka.get("profile_type", "infra_controller_profiles")
            ]
            sw_catalog_path = f"{oka_type}/{db_oka['git_name'].lower()}"
        else:
            self.logger.error("SW Catalog path could not be determined.")
            raise LcmException("SW Catalog path could not be determined.")
        self.logger.debug(f"SW Catalog path: {sw_catalog_path}")

        # Get the app model from the SW catalog Git repo.
        repodir = self.cloneGitRepo(
            repo_url=self._full_repo_sw_catalogs_url, branch="main"
        )
        model_file_path = os.path.join(repodir, sw_catalog_path, "model.yaml")
        if not os.path.exists(model_file_path):
            self.logger.error(f"Model file not found at path: {model_file_path}")
            raise LcmException(f"Model file not found at path: {model_file_path}")
        # Store the model content in workflow_content
        with open(model_file_path) as model_file:
            workflow_content["model"] = yaml.safe_load(model_file.read())

        # A single workflow is launched for the App operation
        self.logger.debug("Launching workflow {}".format(operation_name))
        (
            workflow_res,
            workflow_name,
            workflow_resources,
        ) = await self.odu.launch_workflow(
            operation_name, op_id, op_params, workflow_content
        )

        if not workflow_res:
            self.logger.error(f"Failed to launch workflow: {workflow_name}")
            if operation_name == "create_app":
                db_app["state"] = "FAILED_CREATION"
            elif operation_name == "delete_app":
                db_app["state"] = "FAILED_DELETION"
            db_app["resourceState"] = "ERROR"
            db_app = self.update_operation_history(
                db_app, op_id, workflow_status=False, resource_status=None
            )
            self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)
            # Clean items used in the workflow, no matter if the workflow
            # succeeded
            clean_status, clean_msg = await self.odu.clean_items_workflow(
                operation_name, op_id, op_params, workflow_content
            )
            self.logger.info(
                f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
            )
            return

        # Update resources created in workflow
        db_app["app_model"] = workflow_resources.get("app_model", {})

        # Update workflow status in App
        workflow_status = await self.check_workflow_and_update_db(
            op_id, workflow_name, db_app
        )
        # Update resource status in DB
        if workflow_status:
            resource_status, db_app = await self.check_resource_and_update_db(
                operation_name, op_id, op_params, db_app
            )
        else:
            resource_status = False
        self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            operation_name, op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if operation_name == "delete_app":
            force = params.get("force", False)
            if force:
                force_delete_status = self.check_force_delete_and_delete_from_db(
                    db_app["_id"], workflow_status, resource_status, force
                )
                if force_delete_status:
                    return
            if resource_status:
                # BUG FIX: the original used "==" (a no-op comparison), so the
                # state was never actually set to DELETED before the history
                # update / deletion below.
                db_app["state"] = "DELETED"
                self.update_app_dependency(db_app["_id"], db_app)
                self.db.del_one(self.db_collection, {"_id": db_app["_id"]})

        self.logger.info(
            f"Generic app operation Exit {operation_name} with resource Status {resource_status}"
        )
        return
    except Exception as e:
        self.logger.debug(traceback.format_exc())
        self.logger.debug(f"Exception: {e}", exc_info=True)
        return
+
+ async def create(self, params, order_id):
+ """Entry point for AppInstance creation; delegates to generic_operation."""
+ self.logger.info("App Create Enter")
+ return await self.generic_operation(params, order_id, "create_app")
+
+ async def update(self, params, order_id):
+ """Entry point for AppInstance update; delegates to generic_operation."""
+ self.logger.info("App Edit Enter")
+ return await self.generic_operation(params, order_id, "update_app")
+
+ async def delete(self, params, order_id):
+ """Entry point for AppInstance deletion; delegates to generic_operation."""
+ self.logger.info("App Delete Enter")
+ return await self.generic_operation(params, order_id, "delete_app")
+
+ async def check_appinstance(self, op_id, op_params, content, deleted=False):
+ """Check the AppInstance's kustomizations in every cluster of its profile.
+
+ Builds a checking list (kustomization Ready, or deleted when *deleted*
+ is True) and runs it via common_check_list against each cluster that
+ references the App's profile. Returns on the first failing cluster.
+
+ :param op_id: operation id (used when updating operation state in DB).
+ :param op_params: operation parameters (logged only).
+ :param content: AppInstance DB document.
+ :param deleted: when True, verify the kustomizations are gone instead
+ of Ready.
+ :return: tuple (ok: bool, message: str).
+ """
+ self.logger.info(
+ f"check_app_instance Operation {op_id}. Params: {op_params}. Deleted: {deleted}"
+ )
+ self.logger.debug(f"Content: {content}")
+ db_app = content
+ profile_id = db_app["profile"]
+ profile_type = db_app["profile_type"]
+ app_name = db_app["name"]
+ self.logger.info(
+ f"Checking status of AppInstance {app_name} for profile {profile_id}."
+ )
+
+ # TODO: read app_model and get kustomization name and namespace
+ # app_model = db_app.get("app_model", {})
+ # NOTE(review): the kustomization name "jenkins-<app>" is hard-coded —
+ # presumably a placeholder until the TODO above is implemented; confirm.
+ kustomization_list = [
+ {
+ "name": f"jenkins-{app_name}",
+ "namespace": "flux-system",
+ }
+ ]
+ checkings_list = []
+ if deleted:
+ # Deletion case: wait for the kustomization objects to disappear.
+ for kustomization in kustomization_list:
+ checkings_list.append(
+ {
+ "item": "kustomization",
+ "name": kustomization["name"].lower(),
+ "namespace": kustomization["namespace"],
+ "deleted": True,
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+ }
+ )
+ else:
+ # Create/update case: wait for condition type=Ready, status=True.
+ for kustomization in kustomization_list:
+ checkings_list.append(
+ {
+ "item": "kustomization",
+ "name": kustomization["name"].lower(),
+ "namespace": kustomization["namespace"],
+ "condition": {
+ "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
+ "value": "True",
+ },
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+ }
+ )
+
+ dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+ if not dbcluster_list:
+ self.logger.info(f"No clusters found for profile {profile_id}.")
+ # Run the checking list in each cluster; any failure aborts immediately.
+ for db_cluster in dbcluster_list:
+ try:
+ self.logger.info(
+ f"Checking status of AppInstance {app_name} in cluster {db_cluster['name']}."
+ )
+ cluster_kubectl = self.cluster_kubectl(db_cluster)
+ result, message = await self.common_check_list(
+ op_id,
+ checkings_list,
+ self.db_collection,
+ db_app,
+ kubectl_obj=cluster_kubectl,
+ )
+ if not result:
+ return False, message
+ except Exception as e:
+ self.logger.error(
+ f"Error checking AppInstance in cluster {db_cluster['name']}."
+ )
+ self.logger.error(e)
+ return (
+ False,
+ f"Error checking AppInstance in cluster {db_cluster['name']}.",
+ )
+ return True, "OK"
+
async def check_create_app(self, op_id, op_params, content):
    """Check that a created AppInstance is Ready in all its profile's clusters."""
    # BUG FIX: the log line previously said "check_update_app" (copy-paste).
    self.logger.info(f"check_create_app Operation {op_id}. Params: {op_params}.")
    # self.logger.debug(f"Content: {content}")
    return await self.check_appinstance(op_id, op_params, content)

async def check_update_app(self, op_id, op_params, content):
    """Check that an updated AppInstance is Ready in all its profile's clusters."""
    self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.")
    # self.logger.debug(f"Content: {content}")
    return await self.check_appinstance(op_id, op_params, content)

async def check_delete_app(self, op_id, op_params, content):
    """Check that a deleted AppInstance is gone from all its profile's clusters."""
    self.logger.info(f"check_delete_app Operation {op_id}. Params: {op_params}.")
    # self.logger.debug(f"Content: {content}")
    return await self.check_appinstance(op_id, op_params, content, deleted=True)
task = asyncio.ensure_future(self.ksu.move(params, order_id))
self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
return
+ elif topic == "appinstance":
+ op_id = params["operation_id"]
+ appinstance_id = params["appinstance"]
+ if command == "create":
+ task = asyncio.ensure_future(self.appinstance.create(params, order_id))
+ self.lcm_tasks.register(
+ "appinstance", appinstance_id, op_id, "app_create", task
+ )
+ return
+ elif command == "update" or command == "updated":
+ task = asyncio.ensure_future(self.appinstance.update(params, order_id))
+ self.lcm_tasks.register(
+ "appinstance", appinstance_id, op_id, "app_edit", task
+ )
+ return
+ elif command == "delete":
+ task = asyncio.ensure_future(self.appinstance.delete(params, order_id))
+ self.lcm_tasks.register(
+ "appinstance", appinstance_id, op_id, "app_delete", task
+ )
+ return
elif topic == "nodegroup":
nodegroup_id = params["nodegroup_id"]
op_id = params["operation_id"]
"k8s_infra_config",
"oka",
"ksu",
+ "appinstance",
"nodegroup",
)
self.logger.debug(
)
self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+ self.appinstance = k8s.AppInstanceLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
self.nodegroup = k8s.NodeGroupLcm(
self.msg, self.lcm_tasks, self.main_config.to_dict()
)
"k8s_infra_config",
"oka",
"ksu",
+ "appinstance",
]
# Map topic to InstanceID
"k8s_infra_config": "k8sinfra_config",
"oka": "oka",
"ksu": "ksus",
+ "appinstance": "appinstances",
}
def __init__(self, worker_id=None, logger=None):
"oka": {},
"ksu": {},
"odu": {},
+ "appinstance": {},
}
self.worker_id = worker_id
self.db = Database().instance.db
import yaml
import tarfile
import io
+import os
from time import sleep
from distutils.version import LooseVersion
def copy_file_to_pod(
self, namespace, pod_name, container_name, src_file, dest_path
- ):
- # Create an in-memory tar file containing the source file
- tar_buffer = io.BytesIO()
- with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
- tar.add(src_file, arcname=dest_path.split("/")[-1])
-
- tar_buffer.seek(0)
-
- # Define the command to extract the tar file in the pod
- exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
-
- # Execute the command
- resp = stream(
- self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
- pod_name,
- namespace,
- command=exec_command,
- container=container_name,
- stdin=True,
- stderr=True,
- stdout=True,
- tty=False,
- _preload_content=False,
- )
+ ) -> bool:
+ # Copy src_file into the pod at dest_path by streaming an in-memory tar
+ # archive through the exec API ("tar xvf -"), similar to "kubectl cp".
+ # Returns True on success and False on any failure (errors are logged,
+ # not raised) — callers must check the return value.
+ # NOTE(review): requires "mkdir" and "tar" to exist in the target
+ # container image — confirm for the copy-container used by callers.
+ try:
+ # Create the destination directory in the pod
+ dest_dir = os.path.dirname(dest_path)
+ if dest_dir:
+ self.logger.debug(f"Creating directory {dest_dir} in pod {pod_name}")
+ mkdir_command = ["mkdir", "-p", dest_dir]
+
+ resp = stream(
+ self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+ pod_name,
+ namespace,
+ command=mkdir_command,
+ container=container_name,
+ stdin=False,
+ stderr=True,
+ stdout=True,
+ tty=False,
+ _preload_content=False,
+ )
+ resp.close()
+ self.logger.debug(f"Directory {dest_dir} created")
- # Write the tar data to the pod
- resp.write_stdin(tar_buffer.read())
- resp.close()
+ # Create an in-memory tar file containing the source file
+ self.logger.debug(
+ f"Creating in-memory tar of {src_file} as {dest_path.split('/')[-1]}"
+ )
+ tar_buffer = io.BytesIO()
+ with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
+ tar.add(src_file, arcname=dest_path.split("/")[-1])
+
+ tar_buffer.seek(0)
+ self.logger.debug(
+ f"Tar buffer created, size={tar_buffer.getbuffer().nbytes} bytes"
+ )
+
+ # Define the command to extract the tar file in the pod
+ # NOTE(review): if dest_path has no directory component, dest_dir is ""
+ # and "tar -C ''" will fail — confirm dest_path is always an absolute
+ # path (callers currently pass /mnt/data/...).
+ exec_command = ["tar", "xvf", "-", "-C", dest_dir]
+ self.logger.debug(f"Exec command prepared: {exec_command}")
+
+ # Execute the command
+ resp = stream(
+ self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+ pod_name,
+ namespace,
+ command=exec_command,
+ container=container_name,
+ stdin=True,
+ stderr=True,
+ stdout=True,
+ tty=False,
+ _preload_content=False,
+ )
+ self.logger.debug(
+ f"Started exec stream to pod {pod_name} (ns={namespace}, container={container_name})"
+ )
+
+ # Write the tar data to the pod
+ data = tar_buffer.read()
+ self.logger.debug(f"Writing {len(data)} bytes to pod stdin")
+ resp.write_stdin(data)
+ self.logger.debug("Data written to pod stdin")
+ resp.close()
+ self.logger.debug("Exec stream closed")
+ return True
+ except Exception as e:
+ self.logger.error(f"Failed to copy file {src_file} to pod: {e}")
+ return False
@retry(
attempts=10,
fallback=Exception("Failed creating the pvc"),
)
async def create_pvc_with_content(
- self, name: str, namespace: str, src_file: str, dest_filename: str
+ self, name: str, namespace: str, src_files: typing.List, dest_files: typing.List
):
"""
Create a PVC with content
:param: name: Name of the pvc to be created
:param: namespace: Name of the namespace where the pvc will be stored
- :param: src_file: File to be copied
- :param: filename: Name of the file in the destination folder
+ :param: src_files: List of source files to be copied
+ :param: dest_files: List of destination filenames (paired with src_files)
"""
pod_name = f"copy-pod-{name}"
self.logger.debug(f"Creating pvc {name}")
self.logger.debug("Sleeping")
sleep(40)
self.logger.debug(f"Copying files to pod {pod_name}")
- self.copy_file_to_pod(
- namespace=namespace,
- pod_name=pod_name,
- container_name="copy-container",
- src_file=src_file,
- dest_path=f"/mnt/data/{dest_filename}",
- )
+ # Copy each (source, destination) pair into the pod's /mnt/data mount.
+ # NOTE(review): zip() silently drops extras when src_files and dest_files
+ # differ in length — consider validating len(src_files) == len(dest_files).
+ for src_f, dest_f in zip(src_files, dest_files):
+ dest_path = f"/mnt/data/{dest_f}"
+ self.logger.debug(f"Copying file {src_f} to {dest_path} in pod {pod_name}")
+ result = self.copy_file_to_pod(
+ namespace=namespace,
+ pod_name=pod_name,
+ container_name="copy-container",
+ src_file=src_f,
+ dest_path=dest_path,
+ )
+ if result:
+ self.logger.debug(
+ f"Successfully copied file {src_f} to {dest_path} in pod {pod_name}"
+ )
+ else:
+ # Raising here propagates to the @retry wrapper above, which
+ # re-runs the whole PVC creation (up to 10 attempts).
+ raise Exception(
+ f"Failed copying file {src_f} to {dest_path} in pod {pod_name}"
+ )
self.logger.debug(f"Deleting pod {pod_name}")
await self.delete_pod(pod_name, namespace)
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+
+import copy
+import os
+import tempfile
+
+import yaml
+
+
+# Maps profile collection/field names (as stored in DB documents) to the
+# folder names used in the SW catalog Git repository layout.
+MAP_PROFILE = {
+ "infra_controller_profiles": "infra-controller-profiles",
+ "infra_config_profiles": "infra-config-profiles",
+ "resource_profiles": "managed-resources",
+ "app_profiles": "app-profiles",
+}
+
+
def _merge_by_name(base_items, override_items, merge_item):
    """Merge two lists of dicts matched by their "name" field.

    Each override item is merged (via merge_item) into the first base item
    with the same name; override items with no match are appended.
    base_items is modified in place.
    """
    for override_item in override_items:
        for base_item in base_items:
            if base_item.get("name") == override_item.get("name"):
                merge_item(base_item, override_item)
                break
        else:
            base_items.append(override_item)


def _merge_hrset(hrset_base, hrset_override):
    """Merge one hrset-values entry: override keys win."""
    hrset_base |= hrset_override


def _merge_brick(brick_base, brick_override):
    """Merge one brick: copy scalar keys, then merge hrset-values by name."""
    for key, value in brick_override.items():
        if key != "hrset-values":
            brick_base[key] = value
    _merge_by_name(
        brick_base.setdefault("hrset-values", []),
        brick_override.get("hrset-values", []),
        _merge_hrset,
    )


def _merge_pattern(pattern_base, pattern_override):
    """Merge one pattern: copy scalar keys, then merge bricks by name."""
    for key, value in pattern_override.items():
        if key != "bricks":
            pattern_base[key] = value
    _merge_by_name(
        pattern_base.setdefault("bricks", []),
        pattern_override.get("bricks", []),
        _merge_brick,
    )


def _merge_ksu(ksu_base, ksu_override):
    """Merge one KSU: copy scalar keys, then merge patterns by name."""
    for key, value in ksu_override.items():
        if key != "patterns":
            ksu_base[key] = value
    _merge_by_name(
        ksu_base.setdefault("patterns", []),
        ksu_override.get("patterns", []),
        _merge_pattern,
    )


def merge_model(base, override):
    """Recursively merge *override* into a deep copy of *base* and return it.

    Top-level "spec" keys other than "ksus" are overwritten; the "ksus" list
    (and the nested "patterns", "bricks" and "hrset-values" lists) is merged
    element-by-element, matching entries by their "name" field and appending
    unmatched override entries.

    BUG FIX: the original used base.copy() (a shallow copy) and then mutated
    nested dicts/lists, silently modifying the caller's base model, and its
    local variable shadowed the function name. A deep copy keeps the input
    intact; setdefault() guards against missing "spec"/list keys that would
    previously raise KeyError.

    :param base: base model dict (e.g. the SW catalog model). Not modified.
    :param override: override model dict (e.g. from operation params).
    :return: a new merged model dict.
    """
    merged = copy.deepcopy(base)
    spec = merged.setdefault("spec", {})
    for key, value in override.get("spec", {}).items():
        if key != "ksus":
            spec[key] = value
    _merge_by_name(
        spec.setdefault("ksus", []),
        override.get("spec", {}).get("ksus", []),
        _merge_ksu,
    )
    return merged
+
+
async def launch_app(self, op_id, op_params, workflow_content, operation_type):
    """Render and submit the Argo workflow that performs an App operation.

    Builds the AppInstantiation model (SW catalog model merged with the
    user/DB-provided model), writes model and parameters to a temporary PVC,
    stores secret parameters in a k8s Secret, then renders and submits the
    "launcher-app.j2" workflow.

    :param op_id: operation id, used to derive unique names for the
        temporary PVC, Secret and workflow.
    :param op_params: operation parameters (model / params / secret_params).
    :param workflow_content: dict with "app", "profile" and "model" entries.
    :param operation_type: "create", "update" or "delete".
    :return: 3-tuple (success, workflow_name or error message,
        workflow_resources or None) — the same shape on every path so
        callers can always unpack three values.
    """
    self.logger.info(
        f"launch_app Enter. Operation {op_id}. Operation Type: {operation_type}"
    )

    db_app = workflow_content["app"]
    db_profile = workflow_content.get("profile")

    profile_t = db_app.get("profile_type")
    profile_type = MAP_PROFILE[profile_t]
    profile_name = db_profile.get("git_name").lower()
    app_name = db_app["git_name"].lower()
    app_command = f"app {operation_type} $environment"
    age_public_key = db_profile.get("age_pubkey")

    sw_catalog_model = workflow_content.get("model")
    self.logger.debug(f"SW catalog model: {sw_catalog_model}")

    # Update the app model, extending it also with the model from op_params.
    # On update, fall back to the model previously stored in the DB document.
    if operation_type == "update":
        model = op_params.get("model", db_app.get("app_model", {}))
    else:
        model = op_params.get("model", {})
    app_model = merge_model(sw_catalog_model, model)

    app_model["kind"] = "AppInstantiation"
    app_model["metadata"]["name"] = app_name
    # Inject the profile's public age key into every brick of the model.
    for ksu in app_model.get("spec", {}).get("ksus", []):
        for pattern in ksu.get("patterns", []):
            for brick in pattern.get("bricks", []):
                brick["public-age-key"] = age_public_key
    self.logger.debug(f"App model: {app_model}")

    if operation_type == "update":
        params = op_params.get("params", db_app.get("params", {}))
    else:
        params = op_params.get("params", {})
    params["PROFILE_TYPE"] = profile_type
    params["PROFILE_NAME"] = profile_name
    params["APPNAME"] = app_name
    self.logger.debug(f"Params: {params}")

    if operation_type == "update":
        secret_params = op_params.get("secret_params", db_app.get("secret_params", {}))
    else:
        secret_params = op_params.get("secret_params", {})
    self.logger.debug(f"Secret Params: {secret_params}")

    # Create temporary folder for the app model and the parameters
    temp_dir = tempfile.mkdtemp(prefix=f"app-{operation_type}-{op_id}-")
    self.logger.debug(f"Temporary dir created: {temp_dir}")
    with open(f"{temp_dir}/app_instance_model.yaml", "w") as f:
        yaml.safe_dump(
            app_model, f, indent=2, default_flow_style=False, sort_keys=False
        )

    os.makedirs(f"{temp_dir}/parameters/clear", exist_ok=True)
    with open(f"{temp_dir}/parameters/clear/environment.yaml", "w") as f:
        yaml.safe_dump(params, f, indent=2, default_flow_style=False, sort_keys=False)

    # Create PVC and copy app model and parameters to PVC
    app_model_pvc = f"temp-pvc-app-{op_id}"
    src_files = [
        f"{temp_dir}/app_instance_model.yaml",
        f"{temp_dir}/parameters/clear/environment.yaml",
    ]
    dest_files = [
        "app_instance_model.yaml",
        "parameters/clear/environment.yaml",
    ]
    self.logger.debug(
        f"Copying files to PVC {app_model_pvc}: {src_files} -> {dest_files}"
    )
    await self._kubectl.create_pvc_with_content(
        name=app_model_pvc,
        namespace="osm-workflows",
        src_files=src_files,
        dest_files=dest_files,
    )

    # Create secret with secret_params
    secret_name = f"secret-app-{op_id}"
    secret_namespace = "osm-workflows"
    secret_key = "environment.yaml"
    secret_value = yaml.safe_dump(
        secret_params, indent=2, default_flow_style=False, sort_keys=False
    )
    try:
        self.logger.debug(f"Testing kubectl: {self._kubectl}")
        self.logger.debug(
            f"Testing kubectl configuration: {self._kubectl.configuration}"
        )
        self.logger.debug(
            f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
        )
        self.logger.debug(
            f"Creating secret {secret_name} in namespace {secret_namespace}"
        )
        await self.create_secret(
            secret_name,
            secret_namespace,
            secret_key,
            secret_value,
        )
    except Exception as e:
        self.logger.info(
            f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}"
        )
        # BUG FIX: callers unpack three values from launch_workflow; the
        # original error path returned only two, which would raise
        # ValueError at the call site instead of reporting this failure.
        return (
            False,
            f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
            None,
        )

    # Create workflow to launch the app
    workflow_template = "launcher-app.j2"
    workflow_name = f"{operation_type}-app-{op_id}"
    # Additional params for the workflow
    osm_project_name = workflow_content.get("project_name", "osm_admin")

    # Render workflow
    manifest = self.render_jinja_template(
        workflow_template,
        output_file=None,
        workflow_name=workflow_name,
        app_command=app_command,
        app_model_pvc=app_model_pvc,
        app_secret_name=secret_name,
        git_fleet_url=self._repo_fleet_url,
        git_sw_catalogs_url=self._repo_sw_catalogs_url,
        app_name=app_name,
        profile_name=profile_name,
        profile_type=profile_type,
        osm_project_name=osm_project_name,
        workflow_debug=self._workflow_debug,
        workflow_dry_run=self._workflow_dry_run,
    )
    self.logger.debug(f"Workflow manifest: {manifest}")

    # Submit workflow
    # NOTE(review): create_pvc_with_content above is awaited; confirm whether
    # create_generic_object is a coroutine that should be awaited as well.
    self._kubectl.create_generic_object(
        namespace="osm-workflows",
        manifest_dict=yaml.safe_load(manifest),
        api_group="argoproj.io",
        api_plural="workflows",
        api_version="v1alpha1",
    )
    workflow_resources = {
        "app_model": app_model,
        "secret_params": secret_params,
        "params": params,
    }
    return True, workflow_name, workflow_resources
+
+
+async def create_app(self, op_id, op_params, content):
+ """Launch the "create" App workflow; returns launch_app's 3-tuple."""
+ self.logger.info(f"create_app Enter. Operation {op_id}")
+ # self.logger.debug(f"Operation Params: {op_params}")
+ # self.logger.debug(f"Content: {workflow_content}")
+ return await self.launch_app(op_id, op_params, content, "create")
+
+
+async def update_app(self, op_id, op_params, content):
+ """Launch the "update" App workflow; returns launch_app's 3-tuple."""
+ self.logger.info(f"update_app Enter. Operation {op_id}")
+ # self.logger.debug(f"Operation Params: {op_params}")
+ # self.logger.debug(f"Content: {workflow_content}")
+ return await self.launch_app(op_id, op_params, content, "update")
+
+
+async def delete_app(self, op_id, op_params, content):
+ """Launch the "delete" App workflow; returns launch_app's 3-tuple."""
+ self.logger.info(f"delete_app Enter. Operation {op_id}")
+ # self.logger.debug(f"Operation Params: {op_params}")
+ # self.logger.debug(f"Content: {workflow_content}")
+ return await self.launch_app(op_id, op_params, content, "delete")
+
+
+async def clean_items_app_launch(self, op_id, op_params, workflow_content):
+ """Delete the temporary Secret and PVC created for an App workflow.
+
+ The names are derived from op_id and must match those generated in
+ launch_app for the same operation.
+
+ :return: tuple (ok: bool, message: str); never raises.
+ """
+ self.logger.info(f"clean_items_app_launch Enter. Operation {op_id}")
+ # self.logger.debug(f"Operation Params: {op_params}")
+ # self.logger.debug(f"Content: {workflow_content}")
+ try:
+ secret_name = f"secret-app-{op_id}"
+ volume_name = f"temp-pvc-app-{op_id}"
+ items = {
+ "secrets": [
+ {
+ "name": secret_name,
+ "namespace": "osm-workflows",
+ }
+ ],
+ "pvcs": [
+ {
+ "name": volume_name,
+ "namespace": "osm-workflows",
+ }
+ ],
+ }
+ await self.clean_items(items)
+ return True, "OK"
+ except Exception as e:
+ # Best-effort cleanup: report the failure to the caller instead of raising.
+ return False, f"Error while cleaning items: {e}"
)
except Exception as e:
self.logger.info(f"Cannot create secret {secret_name}: {e}")
- return False, f"Cannot create secret {secret_name}: {e}"
+ return False, f"Cannot create secret {secret_name}: {e}", None
# Additional params for the workflow
cluster_kustomization_name = cluster_name
)
except Exception as e:
self.logger.info(f"Cannot create configmap {configmap_name}: {e}")
- return False, f"Cannot create configmap {configmap_name}: {e}"
+ return False, f"Cannot create configmap {configmap_name}: {e}", None
# Render workflow
# workflow_kwargs = {
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def update_cluster(self, op_id, op_params, content):
)
except Exception as e:
self.logger.info(f"Cannot create secret {secret_name}: {e}")
- return False, f"Cannot create secret {secret_name}: {e}"
+ return False, f"Cannot create secret {secret_name}: {e}", None
# Additional params for the workflow
cluster_kustomization_name = cluster_name
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def delete_cluster(self, op_id, op_params, content):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def register_cluster(self, op_id, op_params, content):
return (
False,
f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
+ None,
)
# Create secret with kubeconfig
return (
False,
f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
+ None,
)
# Additional params for the workflow
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def deregister_cluster(self, op_id, op_params, content):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def get_cluster_credentials(self, db_cluster):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def update_ksus(self, op_id, op_params_list, content_list):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def delete_ksus(self, op_id, op_params_list, content_list):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def clone_ksu(self, op_id, op_params, content):
self.logger.info(f"clone_ksu Enter. Operation {op_id}. Params: {op_params}")
# self.logger.debug(f"Content: {content}")
workflow_name = f"clone-ksu-{content['_id']}"
- return True, workflow_name
+ return True, workflow_name, None
async def move_ksu(self, op_id, op_params, content):
self.logger.info(f"move_ksu Enter. Operation {op_id}. Params: {op_params}")
# self.logger.debug(f"Content: {content}")
workflow_name = f"move-ksu-{content['_id']}"
- return True, workflow_name
+ return True, workflow_name, None
async def clean_items_ksu_create(self, op_id, op_params_list, content_list):
)
except Exception as e:
self.logger.info(f"Cannot create secret {secret_name}: {e}")
- return False, f"Cannot create secret {secret_name}: {e}"
+ return False, f"Cannot create secret {secret_name}: {e}", None
private_subnet = op_params.get("private_subnet", [])
public_subnet = op_params.get("public_subnet", [])
)
except Exception as e:
self.logger.info(f"Cannot create configmap {configmap_name}: {e}")
- return False, f"Cannot create configmap {configmap_name}: {e}"
+ return False, f"Cannot create configmap {configmap_name}: {e}", None
# Additional params for the workflow
nodegroup_kustomization_name = nodegroup_name
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def scale_nodegroup(self, op_id, op_params, content):
)
except Exception as e:
self.logger.info(f"Cannot create secret {secret_name}: {e}")
- return False, f"Cannot create secret {secret_name}: {e}"
+ return False, f"Cannot create secret {secret_name}: {e}", None
# Additional params for the workflow
nodegroup_kustomization_name = nodegroup_name
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def delete_nodegroup(self, op_id, op_params, content):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def clean_items_nodegroup_add(self, op_id, op_params, content):
await self._kubectl.create_pvc_with_content(
name=temp_volume_name,
namespace="osm-workflows",
- src_file=f"{oka_folder}/{oka_filename}",
- dest_filename=f"{oka_name}.tar.gz",
+ src_files=[f"{oka_folder}/{oka_filename}"],
+ dest_files=[f"{oka_name}.tar.gz"],
)
# Render workflow
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def update_oka(self, op_id, op_params, content):
await self._kubectl.create_pvc_with_content(
name=temp_volume_name,
namespace="osm-workflows",
- src_folder=oka_folder,
- filename=oka_filename,
+ src_files=[f"{oka_folder}/{oka_filename}"],
+ dest_files=[f"{oka_name}.tar.gz"],
)
# Render workflow
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def delete_oka(self, op_id, op_params, content):
api_plural="workflows",
api_version="v1alpha1",
)
- return True, workflow_name
+ return True, workflow_name, None
async def clean_items_oka_create(self, op_id, op_params_list, content_list):
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ name: {{ workflow_name }}
+spec:
+ arguments:
+ parameters:
+ # Full OSM SDK command to execute
+ - name: command
+ description: |
+ Full command string to execute with the OSM SDK.
+ Examples:
+ - "app create $environment": Deploy new application instance
+ - "app update $environment": Update existing application instance
+ - "app delete $environment": Remove application instance
+ This parameter accepts any valid OSM SDK command for maximum flexibility.
+ value: {{ app_command }}
+ # Volume reference for application models and parameters
+ # NOTE: The PVC must be created and populated with the following structure:
+ # /model/app_instance_model.yaml
+ # /model/parameters/clear/environment.yaml
+ - name: model_volume_name
+ value: "{{ app_model_pvc }}"
+ # Secret reference for mounting sensitive parameters
+ # This secret will be mounted as a file at:
+ # /model/parameters/secret/environment.yaml
+ - name: secret_name
+ value: "{{ app_secret_name }}"
+ # Fleet repo configuration
+ - name: git_fleet_url
+ value: "{{ git_fleet_url }}"
+ - name: fleet_destination_folder
+ value: "/fleet/fleet-osm"
+ - name: git_fleet_cred_secret
+ value: fleet-repo
+ # SW-Catalogs repo configuration
+ - name: git_sw_catalogs_url
+ value: "{{ git_sw_catalogs_url }}"
+ - name: sw_catalogs_destination_folder
+ value: "/sw-catalogs/sw-catalogs-osm"
+ - name: git_sw_catalogs_cred_secret
+ value: sw-catalogs
+ # Target deployment information
+ - name: app_name
+ value: "{{ app_name }}"
+ - name: profile_name
+ value: "{{ profile_name }}"
+ - name: profile_type
+ value: "{{ profile_type }}"
+ - name: project_name
+ value: "{{ osm_project_name }}"
+ # OSM SDK container configuration
+ - name: osm_sdk_image_repository
+ value: "opensourcemano/osm-nushell-krm-functions"
+ - name: osm_sdk_image_tag
+ value: "testing-daily"
+ # Debug and dry-run flags
+ - name: debug
+ value: "{{ workflow_debug }}"
+ - name: dry_run
+ value: "{{ workflow_dry_run }}"
+
+ # Cleanup policy
+ ttlStrategy:
+ secondsAfterCompletion: 1800 # Time to live after workflow is completed
+ secondsAfterSuccess: 1800 # Time to live after workflow is successful
+ secondsAfterFailure: 1800 # Time to live after workflow fails
+
+ workflowTemplateRef:
+ name: full-app-management-wft
vim_mgmt as odu_vim_mgmt,
cluster_mgmt as odu_cluster_mgmt,
nodegroup as odu_nodegroup,
+ app as odu_app,
ksu as odu_ksu,
oka as odu_oka,
profiles as odu_profiles,
"move_ksu": {
"workflow_function": self.move_ksu,
},
+ "create_app": {
+ "workflow_function": self.create_app,
+ "clean_function": self.clean_items_app_launch,
+ },
+ "update_app": {
+ "workflow_function": self.update_app,
+ "clean_function": self.clean_items_app_launch,
+ },
+ "delete_app": {
+ "workflow_function": self.delete_app,
+ "clean_function": self.clean_items_app_launch,
+ },
"create_cloud_credentials": {
"workflow_function": self.create_cloud_credentials,
"clean_function": self.clean_items_cloud_credentials_create,
delete_secret = odu_common.delete_secret
create_configmap = odu_common.create_configmap
delete_configmap = odu_common.delete_configmap
+ create_app = odu_app.create_app
+ update_app = odu_app.update_app
+ delete_app = odu_app.delete_app
+ launch_app = odu_app.launch_app
+ clean_items_app_launch = odu_app.clean_items_app_launch
async def launch_workflow(self, key, op_id, op_params, content):
self.logger.info(
- f"Workflow is getting into launch. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
+ f"Workflow is getting into launch. Key: {key}. Operation: {op_id}"
)
+ # self.logger.debug(f"Operation Params: {op_params}")
+ # self.logger.debug(f"Content: {content}")
workflow_function = self._workflows[key]["workflow_function"]
self.logger.info("workflow function : {}".format(workflow_function))
try:
- result, workflow_name = await workflow_function(op_id, op_params, content)
- return result, workflow_name
+ result, workflow_name, workflow_resources = await workflow_function(
+ op_id, op_params, content
+ )
+ return result, workflow_name, workflow_resources
except Exception as e:
self.logger.error(f"Error launching workflow: {e}")
- return False, str(e)
+ return False, str(e), None
async def dummy_clean_items(self, op_id, op_params, content):
self.logger.info(
async def dummy_operation(self, op_id, op_params, content):
self.logger.info("Empty operation status Enter")
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
- return content["workflow_name"]
+ return True, content["workflow_name"], None
async def clean_items(self, items):
# Delete pods
charset-normalizer
checksumdir
config-man
+GitPython
google-auth<2.18.0
grpcio-tools
grpclib
# via
# aiohttp
# aiosignal
+gitdb==4.0.12
+ # via gitpython
+gitpython==3.1.45
+ # via -r requirements.in
glom==24.11.0
# via config-man
google-auth==2.17.3
# paramiko
# pymacaroons
# python-dateutil
+smmap==5.0.2
+ # via gitdb
termcolor==3.2.0
# via fire
theblues==0.5.2