| garciadeblas | 61a4c69 | 2025-07-17 13:04:13 +0200 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 12 | # implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | import copy |
| 17 | import logging |
| 18 | import tempfile |
| 19 | from time import time |
| 20 | import traceback |
| 21 | from git import Repo |
| 22 | from osm_lcm.lcm_utils import LcmBase |
| 23 | from osm_lcm import odu_workflows |
| 24 | from osm_lcm.data_utils.list_utils import find_in_list |
| 25 | from osm_lcm.n2vc.kubectl import Kubectl |
| 26 | import yaml |
| 27 | from urllib.parse import quote |
| 28 | |
| 29 | |
class GitOpsLcm(LcmBase):
    """LCM handler for GitOps-managed items (clusters, profiles, KSUs, OKAs).

    Drives ODU workflows, tracks operation/workflow/resource state in the
    "gitops" MongoDB collection, and provides git helpers (clone, commit,
    merge, push) for the GitOps repositories.
    """

    # MongoDB collection this handler operates on (consumed by LcmBase helpers).
    db_collection = "gitops"
    # Class-level status placeholders; presumably shadowed by instance/flow
    # state in subclasses -- TODO(review): confirm they are not shared state.
    workflow_status = None
    resource_status = None

    # Profile kind (as stored in operation params) -> DB collection name.
    profile_collection_mapping = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "resource_profiles": "k8sresource",
        "app_profiles": "k8sapp",
    }

    # Git repository folder name -> profile kind (inverse-style lookup of the above).
    profile_type_mapping = {
        "infra-controllers": "infra_controller_profiles",
        "infra-configs": "infra_config_profiles",
        "managed-resources": "resource_profiles",
        "applications": "app_profiles",
    }
| 48 | |
    def __init__(self, msg, lcm_tasks, config):
        """Initialize the GitOps LCM handler.

        Args:
            msg: message bus handle, forwarded to OduWorkflow and LcmBase.
            lcm_tasks: LCM task registry shared with the ODU workflow engine.
            config: LCM configuration; must contain a "gitops" section with
                at least "git_base_url" and "user".
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        # Timeouts (seconds) for the kustomization/resource readiness loops.
        self._checkloop_kustomization_timeout = 900
        self._checkloop_resource_timeout = 900
        # Per-key workflow descriptors (e.g. "check_resource_function");
        # populated elsewhere -- empty by default.
        self._workflows = {}
        self.gitops_config = config["gitops"]
        self.logger.debug(f"GitOps config: {self.gitops_config}")
        self._repo_base_url = self.gitops_config.get("git_base_url")
        self._repo_user = self.gitops_config.get("user")
        # Default to the conventional sw-catalogs repo under the user's namespace.
        self._repo_sw_catalogs_url = self.gitops_config.get(
            "sw_catalogs_repo_url",
            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
        )
        # NOTE(review): hardcoded fallback credential -- this default should be
        # removed and the password required from config/secret storage instead.
        self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1")
        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
            self._repo_sw_catalogs_url
        )
        super().__init__(msg, self.logger)
| 69 | |
| 70 | def build_git_url_with_credentials(self, repo_url): |
| 71 | # Build authenticated URL if credentials were provided |
| 72 | if self._repo_password: |
| 73 | # URL-safe escape password |
| 74 | safe_user = quote(self._repo_user) |
| 75 | safe_pass = quote(self._repo_password) |
| 76 | |
| 77 | # Insert credentials into the URL |
| 78 | # e.g. https://username:password@github.com/org/repo.git |
| 79 | auth_url = repo_url.replace("https://", f"https://{safe_user}:{safe_pass}@") |
| 80 | auth_url = repo_url.replace("http://", f"https://{safe_user}:{safe_pass}@") |
| 81 | else: |
| 82 | auth_url = repo_url |
| 83 | return auth_url |
| 84 | |
| 85 | async def check_dummy_operation(self, op_id, op_params, content): |
| 86 | self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}") |
| 87 | return True, "OK" |
| 88 | |
| 89 | def initialize_operation(self, item_id, op_id): |
| 90 | db_item = self.db.get_one(self.db_collection, {"_id": item_id}) |
| 91 | operation = next( |
| 92 | (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id), |
| 93 | None, |
| 94 | ) |
| 95 | operation["workflowState"] = "PROCESSING" |
| 96 | operation["resourceState"] = "NOT_READY" |
| 97 | operation["operationState"] = "IN_PROGRESS" |
| 98 | operation["gitOperationInfo"] = None |
| 99 | db_item["current_operation"] = operation["op_id"] |
| 100 | self.db.set_one(self.db_collection, {"_id": item_id}, db_item) |
| 101 | |
| 102 | def get_operation_params(self, item, operation_id): |
| 103 | operation_history = item.get("operationHistory", []) |
| 104 | operation = find_in_list( |
| 105 | operation_history, lambda op: op["op_id"] == operation_id |
| 106 | ) |
| 107 | return operation.get("operationParams", {}) |
| 108 | |
| 109 | def get_operation_type(self, item, operation_id): |
| 110 | operation_history = item.get("operationHistory", []) |
| 111 | operation = find_in_list( |
| 112 | operation_history, lambda op: op["op_id"] == operation_id |
| 113 | ) |
| 114 | return operation.get("operationType", {}) |
| 115 | |
| 116 | def update_state_operation_history( |
| 117 | self, content, op_id, workflow_state=None, resource_state=None |
| 118 | ): |
| 119 | self.logger.info( |
| 120 | f"Update state of operation {op_id} in Operation History in DB" |
| 121 | ) |
| 122 | self.logger.info( |
| 123 | f"Workflow state: {workflow_state}. Resource state: {resource_state}" |
| 124 | ) |
| 125 | self.logger.debug(f"Content: {content}") |
| 126 | |
| 127 | op_num = 0 |
| 128 | for operation in content["operationHistory"]: |
| 129 | self.logger.debug("Operations: {}".format(operation)) |
| 130 | if operation["op_id"] == op_id: |
| 131 | self.logger.debug("Found operation number: {}".format(op_num)) |
| 132 | if workflow_state is not None: |
| 133 | operation["workflowState"] = workflow_state |
| 134 | |
| 135 | if resource_state is not None: |
| 136 | operation["resourceState"] = resource_state |
| 137 | break |
| 138 | op_num += 1 |
| 139 | self.logger.debug("content: {}".format(content)) |
| 140 | |
| 141 | return content |
| 142 | |
| 143 | def update_operation_history( |
| 144 | self, content, op_id, workflow_status=None, resource_status=None, op_end=True |
| 145 | ): |
| 146 | self.logger.info( |
| 147 | f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}" |
| 148 | ) |
| 149 | self.logger.debug(f"Content: {content}") |
| 150 | |
| 151 | op_num = 0 |
| 152 | for operation in content["operationHistory"]: |
| 153 | self.logger.debug("Operations: {}".format(operation)) |
| 154 | if operation["op_id"] == op_id: |
| 155 | self.logger.debug("Found operation number: {}".format(op_num)) |
| 156 | if workflow_status is not None: |
| 157 | if workflow_status: |
| 158 | operation["workflowState"] = "COMPLETED" |
| 159 | operation["result"] = True |
| 160 | else: |
| 161 | operation["workflowState"] = "ERROR" |
| 162 | operation["operationState"] = "FAILED" |
| 163 | operation["result"] = False |
| 164 | |
| 165 | if resource_status is not None: |
| 166 | if resource_status: |
| 167 | operation["resourceState"] = "READY" |
| 168 | operation["operationState"] = "COMPLETED" |
| 169 | operation["result"] = True |
| 170 | else: |
| 171 | operation["resourceState"] = "NOT_READY" |
| 172 | operation["operationState"] = "FAILED" |
| 173 | operation["result"] = False |
| 174 | |
| 175 | if op_end: |
| 176 | now = time() |
| 177 | operation["endDate"] = now |
| 178 | break |
| 179 | op_num += 1 |
| 180 | self.logger.debug("content: {}".format(content)) |
| 181 | |
| 182 | return content |
| 183 | |
| 184 | async def check_workflow_and_update_db(self, op_id, workflow_name, db_content): |
| 185 | workflow_status, workflow_msg = await self.odu.check_workflow_status( |
| 186 | op_id, workflow_name |
| 187 | ) |
| 188 | self.logger.info( |
| 189 | "Workflow Status: {} Workflow Message: {}".format( |
| 190 | workflow_status, workflow_msg |
| 191 | ) |
| 192 | ) |
| 193 | operation_type = self.get_operation_type(db_content, op_id) |
| 194 | if operation_type == "create" and workflow_status: |
| 195 | db_content["state"] = "CREATED" |
| 196 | elif operation_type == "create" and not workflow_status: |
| 197 | db_content["state"] = "FAILED_CREATION" |
| 198 | elif operation_type == "delete" and workflow_status: |
| 199 | db_content["state"] = "DELETED" |
| 200 | elif operation_type == "delete" and not workflow_status: |
| 201 | db_content["state"] = "FAILED_DELETION" |
| 202 | |
| 203 | if workflow_status: |
| 204 | db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| 205 | else: |
| 206 | db_content["resourceState"] = "ERROR" |
| 207 | |
| 208 | db_content = self.update_operation_history( |
| 209 | db_content, op_id, workflow_status, None |
| 210 | ) |
| 211 | self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content) |
| 212 | return workflow_status |
| 213 | |
| 214 | async def check_resource_and_update_db( |
| 215 | self, resource_name, op_id, op_params, db_content |
| 216 | ): |
| 217 | workflow_status = True |
| 218 | |
| 219 | resource_status, resource_msg = await self.check_resource_status( |
| 220 | resource_name, op_id, op_params, db_content |
| 221 | ) |
| 222 | self.logger.info( |
| 223 | "Resource Status: {} Resource Message: {}".format( |
| 224 | resource_status, resource_msg |
| 225 | ) |
| 226 | ) |
| 227 | |
| 228 | if resource_status: |
| 229 | db_content["resourceState"] = "READY" |
| 230 | else: |
| 231 | db_content["resourceState"] = "ERROR" |
| 232 | |
| 233 | db_content = self.update_operation_history( |
| 234 | db_content, op_id, workflow_status, resource_status |
| 235 | ) |
| 236 | db_content["operatingState"] = "IDLE" |
| 237 | db_content["current_operation"] = None |
| 238 | return resource_status, db_content |
| 239 | |
| 240 | async def common_check_list( |
| 241 | self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None |
| 242 | ): |
| 243 | try: |
| 244 | for checking in checkings_list: |
| 245 | if checking["enable"]: |
| 246 | status, message = await self.odu.readiness_loop( |
| 247 | op_id=op_id, |
| 248 | item=checking["item"], |
| 249 | name=checking["name"], |
| 250 | namespace=checking["namespace"], |
| 251 | condition=checking.get("condition"), |
| 252 | deleted=checking.get("deleted", False), |
| 253 | timeout=checking["timeout"], |
| 254 | kubectl_obj=kubectl_obj, |
| 255 | ) |
| 256 | if not status: |
| 257 | error_message = "Resources not ready: " |
| 258 | error_message += checking.get("error_message", "") |
| 259 | return status, f"{error_message}: {message}" |
| 260 | else: |
| 261 | db_item["resourceState"] = checking["resourceState"] |
| 262 | db_item = self.update_state_operation_history( |
| 263 | db_item, op_id, None, checking["resourceState"] |
| 264 | ) |
| 265 | self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item) |
| 266 | except Exception as e: |
| 267 | self.logger.debug(traceback.format_exc()) |
| 268 | self.logger.debug(f"Exception: {e}", exc_info=True) |
| 269 | return False, f"Unexpected exception: {e}" |
| 270 | return True, "OK" |
| 271 | |
| 272 | async def check_resource_status(self, key, op_id, op_params, content): |
| 273 | self.logger.info( |
| 274 | f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}." |
| 275 | ) |
| 276 | self.logger.debug(f"Check resource status. Content: {content}") |
| 277 | check_resource_function = self._workflows.get(key, {}).get( |
| 278 | "check_resource_function" |
| 279 | ) |
| 280 | self.logger.info("check_resource function : {}".format(check_resource_function)) |
| 281 | if check_resource_function: |
| 282 | return await check_resource_function(op_id, op_params, content) |
| 283 | else: |
| 284 | return await self.check_dummy_operation(op_id, op_params, content) |
| 285 | |
| 286 | def check_force_delete_and_delete_from_db( |
| 287 | self, _id, workflow_status, resource_status, force |
| 288 | ): |
| 289 | self.logger.info( |
| 290 | f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}" |
| 291 | ) |
| 292 | if force and (not workflow_status or not resource_status): |
| 293 | self.db.del_one(self.db_collection, {"_id": _id}) |
| 294 | return True |
| 295 | return False |
| 296 | |
| 297 | def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): |
| 298 | self.db.encrypt_decrypt_fields( |
| 299 | content, |
| 300 | "decrypt", |
| 301 | fields, |
| 302 | schema_version="1.11", |
| 303 | salt=content["_id"], |
| 304 | ) |
| 305 | |
| 306 | def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]): |
| 307 | self.db.encrypt_decrypt_fields( |
| 308 | content, |
| 309 | "encrypt", |
| 310 | fields, |
| 311 | schema_version="1.11", |
| 312 | salt=content["_id"], |
| 313 | ) |
| 314 | |
| 315 | def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]): |
| 316 | # This deep copy is intended to be passed to ODU workflows. |
| 317 | content_copy = copy.deepcopy(content) |
| 318 | |
| 319 | # decrypting the key |
| 320 | self.db.encrypt_decrypt_fields( |
| 321 | content_copy, |
| 322 | "decrypt", |
| 323 | fields, |
| 324 | schema_version="1.11", |
| 325 | salt=content_copy["_id"], |
| 326 | ) |
| 327 | return content_copy |
| 328 | |
| 329 | def delete_ksu_dependency(self, _id, data): |
| 330 | used_oka = [] |
| 331 | existing_oka = [] |
| 332 | |
| 333 | for oka_data in data["oka"]: |
| 334 | if oka_data.get("_id"): |
| 335 | used_oka.append(oka_data["_id"]) |
| 336 | |
| 337 | all_ksu_data = self.db.get_list("ksus", {}) |
| 338 | for ksu_data in all_ksu_data: |
| 339 | if ksu_data["_id"] != _id: |
| 340 | for oka_data in ksu_data["oka"]: |
| 341 | if oka_data.get("_id"): |
| 342 | if oka_data["_id"] not in existing_oka: |
| 343 | existing_oka.append(oka_data["_id"]) |
| 344 | |
| 345 | self.logger.info(f"Used OKA: {used_oka}") |
| 346 | self.logger.info(f"Existing OKA: {existing_oka}") |
| 347 | |
| 348 | for oka_id in used_oka: |
| 349 | if oka_id not in existing_oka: |
| 350 | self.db.set_one( |
| 351 | "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"} |
| 352 | ) |
| 353 | |
| 354 | return |
| 355 | |
| 356 | def delete_profile_ksu(self, _id, profile_type): |
| 357 | filter_q = {"profile": {"_id": _id, "profile_type": profile_type}} |
| 358 | ksu_list = self.db.get_list("ksus", filter_q) |
| 359 | for ksu_data in ksu_list: |
| 360 | self.delete_ksu_dependency(ksu_data["_id"], ksu_data) |
| 361 | |
| 362 | if ksu_list: |
| 363 | self.db.del_list("ksus", filter_q) |
| 364 | return |
| 365 | |
| 366 | def cluster_kubectl(self, db_cluster): |
| 367 | cluster_kubeconfig = db_cluster["credentials"] |
| 368 | kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml" |
| 369 | with open(kubeconfig_path, "w") as kubeconfig_file: |
| 370 | yaml.safe_dump(cluster_kubeconfig, kubeconfig_file) |
| 371 | return Kubectl(config_file=kubeconfig_path) |
| 372 | |
| 373 | def cloneGitRepo(self, repo_url, branch): |
| 374 | self.logger.debug(f"Cloning repo {repo_url}, branch {branch}") |
| 375 | tmpdir = tempfile.mkdtemp() |
| 376 | self.logger.debug(f"Created temp folder {tmpdir}") |
| 377 | cloned_repo = Repo.clone_from( |
| 378 | repo_url, |
| 379 | tmpdir, |
| 380 | allow_unsafe_options=True, |
| 381 | multi_options=["-c", "http.sslVerify=false"], |
| 382 | ) |
| 383 | self.logger.debug(f"Current active branch: {cloned_repo.active_branch}") |
| 384 | assert cloned_repo |
| 385 | new_branch = cloned_repo.create_head(branch) # create a new branch |
| 386 | assert new_branch.checkout() == cloned_repo.active_branch |
| 387 | self.logger.debug(f"Current active branch: {cloned_repo.active_branch}") |
| 388 | self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}") |
| 389 | return tmpdir |
| 390 | |
| 391 | def createCommit(self, repo_dir, commit_msg): |
| 392 | repo = Repo(repo_dir) |
| 393 | self.logger.info( |
| 394 | f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'" |
| 395 | ) |
| 396 | self.logger.debug(f"Current active branch: {repo.active_branch}") |
| 397 | # repo.index.add('**') |
| 398 | repo.git.add(all=True) |
| 399 | repo.index.commit(commit_msg) |
| 400 | self.logger.info( |
| 401 | f"Commit '{commit_msg}' created in branch '{repo.active_branch}'" |
| 402 | ) |
| 403 | self.logger.debug(f"Current active branch: {repo.active_branch}") |
| 404 | return repo.active_branch |
| 405 | |
| 406 | def mergeGit(self, repo_dir, git_branch): |
| 407 | repo = Repo(repo_dir) |
| 408 | self.logger.info(f"Merging local branch '{git_branch}' into main") |
| 409 | with_git = False |
| 410 | if with_git: |
| 411 | try: |
| 412 | repo.git("checkout main") |
| 413 | repo.git(f"merge {git_branch}") |
| 414 | return True |
| 415 | except Exception as e: |
| 416 | self.logger.error(e) |
| 417 | return False |
| 418 | else: |
| 419 | # prepare a merge |
| 420 | main = repo.heads.main # right-hand side is ahead of us, in the future |
| 421 | merge_base = repo.merge_base(git_branch, main) # three-way merge |
| 422 | repo.index.merge_tree(main, base=merge_base) # write the merge into index |
| 423 | try: |
| 424 | # The merge is done in the branch |
| 425 | repo.index.commit( |
| 426 | f"Merged {git_branch} and main", |
| 427 | parent_commits=(git_branch.commit, main.commit), |
| 428 | ) |
| 429 | # Now, git_branch is ahed of master. Now let master point to the recent commit |
| 430 | aux_head = repo.create_head("aux") |
| 431 | main.commit = aux_head.commit |
| 432 | repo.delete_head(aux_head) |
| 433 | assert main.checkout() |
| 434 | return True |
| 435 | except Exception as e: |
| 436 | self.logger.error(e) |
| 437 | return False |
| 438 | |
| 439 | def pushToRemote(self, repo_dir): |
| 440 | repo = Repo(repo_dir) |
| 441 | self.logger.info("Pushing the change to remote") |
| 442 | # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch)) |
| 443 | repo.remotes.origin.push() |
| 444 | self.logger.info("Push done") |
| 445 | return True |