blob: 22d271726d44934ef2f6e81647a3aa0a0e5be04c [file] [log] [blame]
garciadeblas61a4c692025-07-17 13:04:13 +02001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import copy
17import logging
18import tempfile
19from time import time
20import traceback
21from git import Repo
22from osm_lcm.lcm_utils import LcmBase
23from osm_lcm import odu_workflows
24from osm_lcm.data_utils.list_utils import find_in_list
25from osm_lcm.n2vc.kubectl import Kubectl
26import yaml
27from urllib.parse import quote
28
29
class GitOpsLcm(LcmBase):
    """LCM handler for GitOps-managed items.

    Drives ODU workflows, tracks per-operation workflow/resource state in
    the ``gitops`` DB collection, and manipulates the backing git repos
    (clone / commit / merge / push) used by the GitOps machinery.
    """

    # Mongo collection where GitOps items and their operationHistory live.
    db_collection = "gitops"
    # Class-level placeholders; the authoritative status of each operation
    # is kept in the DB documents, not in these attributes.
    workflow_status = None
    resource_status = None

    # Maps a profile field name (as stored in cluster documents) to the
    # DB collection holding that kind of profile.
    profile_collection_mapping = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "resource_profiles": "k8sresource",
        "app_profiles": "k8sapp",
    }

    # Maps the profile-type path segment (as used externally, e.g. in git
    # layout or API paths) to the corresponding profile field name above.
    profile_type_mapping = {
        "infra-controllers": "infra_controller_profiles",
        "infra-configs": "infra_config_profiles",
        "managed-resources": "resource_profiles",
        "applications": "app_profiles",
    }
48
    def __init__(self, msg, lcm_tasks, config):
        """Initialize the GitOps LCM handler.

        :param msg: message bus object, forwarded to OduWorkflow and LcmBase
        :param lcm_tasks: LCM task registry shared with other handlers
        :param config: full LCM configuration dict; must contain a "gitops"
            section with at least "git_base_url" and "user"
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        # ODU workflow driver used for launching/checking workflows.
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        # Timeouts (seconds) for the readiness check loops.
        self._checkloop_kustomization_timeout = 900
        self._checkloop_resource_timeout = 900
        # Registry of per-key workflow hooks (e.g. check_resource_function).
        self._workflows = {}
        self.gitops_config = config["gitops"]
        self.logger.debug(f"GitOps config: {self.gitops_config}")
        self._repo_base_url = self.gitops_config.get("git_base_url")
        self._repo_user = self.gitops_config.get("user")
        # SW catalogs repo defaults to <base_url>/<user>/sw-catalogs-osm.git
        # when not configured explicitly.
        self._repo_sw_catalogs_url = self.gitops_config.get(
            "sw_catalogs_repo_url",
            f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
        )
        # NOTE(review): hardcoded fallback credential — a security smell.
        # Confirm whether any deployment relies on this default; if not, it
        # should be removed so a missing password fails loudly.
        self._repo_password = self.gitops_config.get("password", "OUM+O61Iy1")
        self._full_repo_sw_catalogs_url = self.build_git_url_with_credentials(
            self._repo_sw_catalogs_url
        )
        super().__init__(msg, self.logger)
69
70 def build_git_url_with_credentials(self, repo_url):
71 # Build authenticated URL if credentials were provided
72 if self._repo_password:
73 # URL-safe escape password
74 safe_user = quote(self._repo_user)
75 safe_pass = quote(self._repo_password)
76
77 # Insert credentials into the URL
78 # e.g. https://username:password@github.com/org/repo.git
79 auth_url = repo_url.replace("https://", f"https://{safe_user}:{safe_pass}@")
80 auth_url = repo_url.replace("http://", f"https://{safe_user}:{safe_pass}@")
81 else:
82 auth_url = repo_url
83 return auth_url
84
85 async def check_dummy_operation(self, op_id, op_params, content):
86 self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
87 return True, "OK"
88
89 def initialize_operation(self, item_id, op_id):
90 db_item = self.db.get_one(self.db_collection, {"_id": item_id})
91 operation = next(
92 (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
93 None,
94 )
95 operation["workflowState"] = "PROCESSING"
96 operation["resourceState"] = "NOT_READY"
97 operation["operationState"] = "IN_PROGRESS"
98 operation["gitOperationInfo"] = None
99 db_item["current_operation"] = operation["op_id"]
100 self.db.set_one(self.db_collection, {"_id": item_id}, db_item)
101
102 def get_operation_params(self, item, operation_id):
103 operation_history = item.get("operationHistory", [])
104 operation = find_in_list(
105 operation_history, lambda op: op["op_id"] == operation_id
106 )
107 return operation.get("operationParams", {})
108
109 def get_operation_type(self, item, operation_id):
110 operation_history = item.get("operationHistory", [])
111 operation = find_in_list(
112 operation_history, lambda op: op["op_id"] == operation_id
113 )
114 return operation.get("operationType", {})
115
116 def update_state_operation_history(
117 self, content, op_id, workflow_state=None, resource_state=None
118 ):
119 self.logger.info(
120 f"Update state of operation {op_id} in Operation History in DB"
121 )
122 self.logger.info(
123 f"Workflow state: {workflow_state}. Resource state: {resource_state}"
124 )
125 self.logger.debug(f"Content: {content}")
126
127 op_num = 0
128 for operation in content["operationHistory"]:
129 self.logger.debug("Operations: {}".format(operation))
130 if operation["op_id"] == op_id:
131 self.logger.debug("Found operation number: {}".format(op_num))
132 if workflow_state is not None:
133 operation["workflowState"] = workflow_state
134
135 if resource_state is not None:
136 operation["resourceState"] = resource_state
137 break
138 op_num += 1
139 self.logger.debug("content: {}".format(content))
140
141 return content
142
143 def update_operation_history(
144 self, content, op_id, workflow_status=None, resource_status=None, op_end=True
145 ):
146 self.logger.info(
147 f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
148 )
149 self.logger.debug(f"Content: {content}")
150
151 op_num = 0
152 for operation in content["operationHistory"]:
153 self.logger.debug("Operations: {}".format(operation))
154 if operation["op_id"] == op_id:
155 self.logger.debug("Found operation number: {}".format(op_num))
156 if workflow_status is not None:
157 if workflow_status:
158 operation["workflowState"] = "COMPLETED"
159 operation["result"] = True
160 else:
161 operation["workflowState"] = "ERROR"
162 operation["operationState"] = "FAILED"
163 operation["result"] = False
164
165 if resource_status is not None:
166 if resource_status:
167 operation["resourceState"] = "READY"
168 operation["operationState"] = "COMPLETED"
169 operation["result"] = True
170 else:
171 operation["resourceState"] = "NOT_READY"
172 operation["operationState"] = "FAILED"
173 operation["result"] = False
174
175 if op_end:
176 now = time()
177 operation["endDate"] = now
178 break
179 op_num += 1
180 self.logger.debug("content: {}".format(content))
181
182 return content
183
184 async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
185 workflow_status, workflow_msg = await self.odu.check_workflow_status(
186 op_id, workflow_name
187 )
188 self.logger.info(
189 "Workflow Status: {} Workflow Message: {}".format(
190 workflow_status, workflow_msg
191 )
192 )
193 operation_type = self.get_operation_type(db_content, op_id)
194 if operation_type == "create" and workflow_status:
195 db_content["state"] = "CREATED"
196 elif operation_type == "create" and not workflow_status:
197 db_content["state"] = "FAILED_CREATION"
198 elif operation_type == "delete" and workflow_status:
199 db_content["state"] = "DELETED"
200 elif operation_type == "delete" and not workflow_status:
201 db_content["state"] = "FAILED_DELETION"
202
203 if workflow_status:
204 db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
205 else:
206 db_content["resourceState"] = "ERROR"
207
208 db_content = self.update_operation_history(
209 db_content, op_id, workflow_status, None
210 )
211 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
212 return workflow_status
213
214 async def check_resource_and_update_db(
215 self, resource_name, op_id, op_params, db_content
216 ):
217 workflow_status = True
218
219 resource_status, resource_msg = await self.check_resource_status(
220 resource_name, op_id, op_params, db_content
221 )
222 self.logger.info(
223 "Resource Status: {} Resource Message: {}".format(
224 resource_status, resource_msg
225 )
226 )
227
228 if resource_status:
229 db_content["resourceState"] = "READY"
230 else:
231 db_content["resourceState"] = "ERROR"
232
233 db_content = self.update_operation_history(
234 db_content, op_id, workflow_status, resource_status
235 )
236 db_content["operatingState"] = "IDLE"
237 db_content["current_operation"] = None
238 return resource_status, db_content
239
240 async def common_check_list(
241 self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
242 ):
243 try:
244 for checking in checkings_list:
245 if checking["enable"]:
246 status, message = await self.odu.readiness_loop(
247 op_id=op_id,
248 item=checking["item"],
249 name=checking["name"],
250 namespace=checking["namespace"],
251 condition=checking.get("condition"),
252 deleted=checking.get("deleted", False),
253 timeout=checking["timeout"],
254 kubectl_obj=kubectl_obj,
255 )
256 if not status:
257 error_message = "Resources not ready: "
258 error_message += checking.get("error_message", "")
259 return status, f"{error_message}: {message}"
260 else:
261 db_item["resourceState"] = checking["resourceState"]
262 db_item = self.update_state_operation_history(
263 db_item, op_id, None, checking["resourceState"]
264 )
265 self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
266 except Exception as e:
267 self.logger.debug(traceback.format_exc())
268 self.logger.debug(f"Exception: {e}", exc_info=True)
269 return False, f"Unexpected exception: {e}"
270 return True, "OK"
271
272 async def check_resource_status(self, key, op_id, op_params, content):
273 self.logger.info(
274 f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
275 )
276 self.logger.debug(f"Check resource status. Content: {content}")
277 check_resource_function = self._workflows.get(key, {}).get(
278 "check_resource_function"
279 )
280 self.logger.info("check_resource function : {}".format(check_resource_function))
281 if check_resource_function:
282 return await check_resource_function(op_id, op_params, content)
283 else:
284 return await self.check_dummy_operation(op_id, op_params, content)
285
286 def check_force_delete_and_delete_from_db(
287 self, _id, workflow_status, resource_status, force
288 ):
289 self.logger.info(
290 f" Force: {force} Workflow status: {workflow_status} Resource Status: {resource_status}"
291 )
292 if force and (not workflow_status or not resource_status):
293 self.db.del_one(self.db_collection, {"_id": _id})
294 return True
295 return False
296
297 def decrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
298 self.db.encrypt_decrypt_fields(
299 content,
300 "decrypt",
301 fields,
302 schema_version="1.11",
303 salt=content["_id"],
304 )
305
306 def encrypt_age_keys(self, content, fields=["age_pubkey", "age_privkey"]):
307 self.db.encrypt_decrypt_fields(
308 content,
309 "encrypt",
310 fields,
311 schema_version="1.11",
312 salt=content["_id"],
313 )
314
315 def decrypted_copy(self, content, fields=["age_pubkey", "age_privkey"]):
316 # This deep copy is intended to be passed to ODU workflows.
317 content_copy = copy.deepcopy(content)
318
319 # decrypting the key
320 self.db.encrypt_decrypt_fields(
321 content_copy,
322 "decrypt",
323 fields,
324 schema_version="1.11",
325 salt=content_copy["_id"],
326 )
327 return content_copy
328
329 def delete_ksu_dependency(self, _id, data):
330 used_oka = []
331 existing_oka = []
332
333 for oka_data in data["oka"]:
334 if oka_data.get("_id"):
335 used_oka.append(oka_data["_id"])
336
337 all_ksu_data = self.db.get_list("ksus", {})
338 for ksu_data in all_ksu_data:
339 if ksu_data["_id"] != _id:
340 for oka_data in ksu_data["oka"]:
341 if oka_data.get("_id"):
342 if oka_data["_id"] not in existing_oka:
343 existing_oka.append(oka_data["_id"])
344
345 self.logger.info(f"Used OKA: {used_oka}")
346 self.logger.info(f"Existing OKA: {existing_oka}")
347
348 for oka_id in used_oka:
349 if oka_id not in existing_oka:
350 self.db.set_one(
351 "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
352 )
353
354 return
355
356 def delete_profile_ksu(self, _id, profile_type):
357 filter_q = {"profile": {"_id": _id, "profile_type": profile_type}}
358 ksu_list = self.db.get_list("ksus", filter_q)
359 for ksu_data in ksu_list:
360 self.delete_ksu_dependency(ksu_data["_id"], ksu_data)
361
362 if ksu_list:
363 self.db.del_list("ksus", filter_q)
364 return
365
366 def cluster_kubectl(self, db_cluster):
367 cluster_kubeconfig = db_cluster["credentials"]
368 kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
369 with open(kubeconfig_path, "w") as kubeconfig_file:
370 yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
371 return Kubectl(config_file=kubeconfig_path)
372
373 def cloneGitRepo(self, repo_url, branch):
374 self.logger.debug(f"Cloning repo {repo_url}, branch {branch}")
375 tmpdir = tempfile.mkdtemp()
376 self.logger.debug(f"Created temp folder {tmpdir}")
377 cloned_repo = Repo.clone_from(
378 repo_url,
379 tmpdir,
380 allow_unsafe_options=True,
381 multi_options=["-c", "http.sslVerify=false"],
382 )
383 self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
384 assert cloned_repo
385 new_branch = cloned_repo.create_head(branch) # create a new branch
386 assert new_branch.checkout() == cloned_repo.active_branch
387 self.logger.debug(f"Current active branch: {cloned_repo.active_branch}")
388 self.logger.info(f"Repo {repo_url} cloned in {tmpdir}. New branch: {branch}")
389 return tmpdir
390
391 def createCommit(self, repo_dir, commit_msg):
392 repo = Repo(repo_dir)
393 self.logger.info(
394 f"Creating commit '{commit_msg}' in branch '{repo.active_branch}'"
395 )
396 self.logger.debug(f"Current active branch: {repo.active_branch}")
397 # repo.index.add('**')
398 repo.git.add(all=True)
399 repo.index.commit(commit_msg)
400 self.logger.info(
401 f"Commit '{commit_msg}' created in branch '{repo.active_branch}'"
402 )
403 self.logger.debug(f"Current active branch: {repo.active_branch}")
404 return repo.active_branch
405
    def mergeGit(self, repo_dir, git_branch):
        """Merge local branch *git_branch* into ``main`` in repo *repo_dir*.

        :param repo_dir: path to a local git working directory
        :param git_branch: branch to merge — assumes a git Head object
            (``.commit`` is read below), not a branch-name string;
            TODO(review) confirm against callers
        :return: True on success, False on any merge error
        """
        repo = Repo(repo_dir)
        self.logger.info(f"Merging local branch '{git_branch}' into main")
        # Dead code path: with_git is hardcoded False, so only the
        # index-level merge below ever runs. NOTE(review): the disabled
        # branch calls repo.git("checkout main"), which does not look like
        # a valid GitPython command invocation — verify before enabling.
        with_git = False
        if with_git:
            try:
                repo.git("checkout main")
                repo.git(f"merge {git_branch}")
                return True
            except Exception as e:
                self.logger.error(e)
                return False
        else:
            # Perform a three-way merge directly on the index, without
            # touching the working tree first.
            main = repo.heads.main
            # Common ancestor of git_branch and main for the 3-way merge.
            merge_base = repo.merge_base(git_branch, main)
            # Write the merged tree of main onto the index.
            repo.index.merge_tree(main, base=merge_base)
            try:
                # Commit the merged index with both branch tips as parents,
                # creating a proper merge commit on the current branch.
                repo.index.commit(
                    f"Merged {git_branch} and main",
                    parent_commits=(git_branch.commit, main.commit),
                )
                # git_branch is now ahead of main; repoint main at the new
                # merge commit via a temporary head at current HEAD.
                # NOTE(review): 'aux' is created at HEAD and its commit is
                # assigned to main — confirm HEAD is the merge commit here,
                # otherwise main would be repointed incorrectly.
                aux_head = repo.create_head("aux")
                main.commit = aux_head.commit
                repo.delete_head(aux_head)
                assert main.checkout()
                return True
            except Exception as e:
                self.logger.error(e)
                return False
438
439 def pushToRemote(self, repo_dir):
440 repo = Repo(repo_dir)
441 self.logger.info("Pushing the change to remote")
442 # repo.remotes.origin.push(refspec='{}:{}'.format(local_branch, remote_branch))
443 repo.remotes.origin.push()
444 self.logger.info("Push done")
445 return True