blob: f3f7ad430ef9bab2d5c5ef7cae2bdbe2f7370fd9 [file] [log] [blame]
rshri932105f2024-07-05 15:11:55 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = (
17 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
18 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
19)
20
garciadeblas61a4c692025-07-17 13:04:13 +020021import os
yshahd940c652024-10-17 06:11:12 +000022from time import time
garciadeblas72412282024-11-07 12:41:54 +010023import traceback
garciadeblasad6d1ba2025-01-22 16:02:18 +010024import yaml
garciadeblas61a4c692025-07-17 13:04:13 +020025from copy import deepcopy
26from osm_lcm import vim_sdn
27from osm_lcm.gitops import GitOpsLcm
28from osm_lcm.lcm_utils import LcmException
rshri932105f2024-07-05 15:11:55 +000029
# NOTE(review): appears to map profile attribute names (as stored on cluster
# DB documents) to the corresponding directory/resource names used on the
# GitOps side — confirm against the repo layout consumed by the workflows.
MAP_PROFILE = {
    "infra_controller_profiles": "infra-controllers",
    "infra_config_profiles": "infra-configs",
    "resource_profiles": "managed_resources",
    "app_profiles": "apps",
}
36
rshri932105f2024-07-05 15:11:55 +000037
class NodeGroupLcm(GitOpsLcm):
    """LCM handler for nodegroup add/scale/delete driven by GitOps workflows.

    Each public operation launches an ODU workflow, waits for it, cleans the
    workflow items, then (on success) polls the resulting Kubernetes/Crossplane
    resources through the matching ``check_*`` coroutine and persists the
    nodegroup document state.
    """

    # Mongo collection holding nodegroup documents
    db_collection = "nodegroups"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)
        # Workflow name -> coroutine that verifies the workflow's resources
        self._workflows = {
            "add_nodegroup": {
                "check_resource_function": self.check_add_nodegroup,
            },
            "scale_nodegroup": {
                "check_resource_function": self.check_scale_nodegroup,
            },
            "delete_nodegroup": {
                "check_resource_function": self.check_delete_nodegroup,
            },
        }

    async def create(self, params, order_id):
        """Run the 'add_nodegroup' workflow and track it to completion.

        :param params: dict with at least 'nodegroup_id' and 'operation_id'
        :param order_id: LCM order identifier (kept for API symmetry)
        :return: None; results are persisted in the nodegroups collection
        """
        self.logger.info("Add NodeGroup Enter")
        # Fix: initialize so the exit log below cannot raise NameError when
        # the workflow fails and the resource check is never executed
        # (ClusterLcm.create already guards this way).
        resource_status = None

        # To get the nodegroup and op ids
        nodegroup_id = params["nodegroup_id"]
        op_id = params["operation_id"]

        # To initialize the operation states
        self.initialize_operation(nodegroup_id, op_id)

        # To get the nodegroup details and control plane from DB
        db_nodegroup = self.db.get_one(self.db_collection, {"_id": nodegroup_id})
        db_cluster = self.db.get_one("clusters", {"_id": db_nodegroup["cluster_id"]})

        # To get the operation params details
        op_params = self.get_operation_params(db_nodegroup, op_id)
        self.logger.info(f"Operations Params: {op_params}")

        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})

        # To copy the cluster content and decrypting fields to use in workflows
        workflow_content = {
            "nodegroup": db_nodegroup,
            "cluster": db_cluster,
            "vim_account": db_vim,
        }
        self.logger.info(f"Workflow content: {workflow_content}")

        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
            "add_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info("workflow_name is: {}".format(workflow_name))

        workflow_status = await self.check_workflow_and_update_db(
            op_id, workflow_name, db_nodegroup
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "add_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )
        if workflow_status:
            resource_status, _content = await self.check_resource_and_update_db(
                "add_nodegroup", op_id, op_params, db_nodegroup
            )
        self.db.set_one(self.db_collection, {"_id": db_nodegroup["_id"]}, db_nodegroup)
        self.logger.info(f"Add NodeGroup Exit with resource status: {resource_status}")
        return

    async def check_add_nodegroup(self, op_id, op_params, content):
        """Poll the resources produced by 'add_nodegroup' until ready.

        Checks, in order: the Flux kustomization, then the AWS nodegroup
        managed resource for Synced and Ready conditions.

        :param content: the nodegroup DB document
        :return: (bool, str) success flag and message
        """
        self.logger.info(f"check_add_nodegroup Operation {op_id}. Params: {op_params}.")
        self.logger.info(f"Content: {content}")
        db_nodegroup = content
        nodegroup_name = db_nodegroup["git_name"].lower()
        nodegroup_kustomization_name = nodegroup_name
        checkings_list = [
            {
                "item": "kustomization",
                "name": nodegroup_kustomization_name,
                "namespace": "managed-resources",
                "condition": {
                    "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
                    "value": "True",
                },
                "timeout": self._checkloop_kustomization_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
            },
            {
                "item": "nodegroup_aws",
                "name": nodegroup_name,
                "namespace": "",
                "condition": {
                    "jsonpath_filter": "status.conditions[?(@.type=='Synced')].status",
                    "value": "True",
                },
                "timeout": self._checkloop_resource_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.NODEGROUP",
            },
            {
                "item": "nodegroup_aws",
                "name": nodegroup_name,
                "namespace": "",
                "condition": {
                    "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
                    "value": "True",
                },
                "timeout": self._checkloop_resource_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEGROUP",
            },
        ]
        self.logger.info(f"Checking list: {checkings_list}")
        result, message = await self.common_check_list(
            op_id, checkings_list, "nodegroups", db_nodegroup
        )
        if not result:
            return False, message
        return True, "OK"

    async def scale(self, params, order_id):
        """Run the 'scale_nodegroup' workflow and track it to completion.

        On success the nodegroup document state is set to READY.

        :param params: dict with at least 'nodegroup_id' and 'operation_id'
        :param order_id: LCM order identifier (kept for API symmetry)
        """
        self.logger.info("Scale nodegroup Enter")
        # Fix: guard the exit log against NameError on workflow failure.
        resource_status = None

        op_id = params["operation_id"]
        nodegroup_id = params["nodegroup_id"]

        # To initialize the operation states
        self.initialize_operation(nodegroup_id, op_id)

        db_nodegroup = self.db.get_one(self.db_collection, {"_id": nodegroup_id})
        db_cluster = self.db.get_one("clusters", {"_id": db_nodegroup["cluster_id"]})
        op_params = self.get_operation_params(db_nodegroup, op_id)
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})

        workflow_content = {
            "nodegroup": db_nodegroup,
            "cluster": db_cluster,
            "vim_account": db_vim,
        }
        self.logger.info(f"Workflow content: {workflow_content}")

        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
            "scale_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info("workflow_name is: {}".format(workflow_name))

        workflow_status = await self.check_workflow_and_update_db(
            op_id, workflow_name, db_nodegroup
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "scale_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, _content = await self.check_resource_and_update_db(
                "scale_nodegroup", op_id, op_params, db_nodegroup
            )

            if resource_status:
                db_nodegroup["state"] = "READY"
                self.db.set_one(
                    self.db_collection, {"_id": db_nodegroup["_id"]}, db_nodegroup
                )
        self.logger.info(
            f"Nodegroup Scale Exit with resource status: {resource_status}"
        )
        return

    async def check_scale_nodegroup(self, op_id, op_params, content):
        """Poll resources after 'scale_nodegroup': kustomization Ready, then
        the AWS nodegroup's reported desiredSize matching op_params['node_count'].

        :param content: the nodegroup DB document
        :return: (bool, str) from common_check_list
        """
        self.logger.info(
            f"check_scale_nodegroup Operation {op_id}. Params: {op_params}."
        )
        self.logger.debug(f"Content: {content}")
        db_nodegroup = content
        nodegroup_name = db_nodegroup["git_name"].lower()
        nodegroup_kustomization_name = nodegroup_name
        checkings_list = [
            {
                "item": "kustomization",
                "name": nodegroup_kustomization_name,
                "namespace": "managed-resources",
                "condition": {
                    "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
                    "value": "True",
                },
                "timeout": self._checkloop_kustomization_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
            },
            {
                "item": "nodegroup_aws",
                "name": nodegroup_name,
                "namespace": "",
                "condition": {
                    "jsonpath_filter": "status.atProvider.scalingConfig[0].desiredSize",
                    "value": f"{op_params['node_count']}",
                },
                "timeout": self._checkloop_resource_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.NODEGROUP",
            },
        ]
        self.logger.info(f"Checking list: {checkings_list}")
        return await self.common_check_list(
            op_id, checkings_list, "nodegroups", db_nodegroup
        )

    async def delete(self, params, order_id):
        """Run the 'delete_nodegroup' workflow and track it to completion.

        On success: decrements the parent cluster's node_count, marks the
        nodegroup DELETED and removes its document from the collection.

        :param params: dict with at least 'nodegroup_id' and 'operation_id'
        :param order_id: LCM order identifier (kept for API symmetry)
        """
        self.logger.info("Delete nodegroup Enter")
        # Fix: guard the exit log against NameError on workflow failure.
        resource_status = None

        op_id = params["operation_id"]
        nodegroup_id = params["nodegroup_id"]

        # To initialize the operation states
        self.initialize_operation(nodegroup_id, op_id)

        db_nodegroup = self.db.get_one(self.db_collection, {"_id": nodegroup_id})
        db_cluster = self.db.get_one("clusters", {"_id": db_nodegroup["cluster_id"]})
        op_params = self.get_operation_params(db_nodegroup, op_id)

        workflow_content = {"nodegroup": db_nodegroup, "cluster": db_cluster}

        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
            "delete_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info("workflow_name is: {}".format(workflow_name))

        workflow_status = await self.check_workflow_and_update_db(
            op_id, workflow_name, db_nodegroup
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "delete_nodegroup", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, _content = await self.check_resource_and_update_db(
                "delete_nodegroup", op_id, op_params, db_nodegroup
            )

            if resource_status:
                # Keep the parent cluster's node_count in sync.
                # NOTE(review): assumes 'node_count' is always present on the
                # cluster document; .get() would yield None and fail the
                # subtraction — TODO confirm with the clusters schema.
                node_count = db_cluster.get("node_count")
                new_node_count = node_count - 1
                self.logger.info(f"New Node count: {new_node_count}")
                db_cluster["node_count"] = new_node_count
                self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
                db_nodegroup["state"] = "DELETED"
                self.db.set_one(
                    self.db_collection, {"_id": db_nodegroup["_id"]}, db_nodegroup
                )
                # Remove the document after recording the final DELETED state
                self.db.del_one(self.db_collection, {"_id": db_nodegroup["_id"]})
        self.logger.info(
            f"Nodegroup Delete Exit with resource status: {resource_status}"
        )
        return

    async def check_delete_nodegroup(self, op_id, op_params, content):
        """Poll until the kustomization and the AWS nodegroup resource are gone.

        :param content: the nodegroup DB document
        :return: (bool, str) from common_check_list
        """
        self.logger.info(
            f"check_delete_nodegroup Operation {op_id}. Params: {op_params}."
        )
        db_nodegroup = content
        nodegroup_name = db_nodegroup["git_name"].lower()
        nodegroup_kustomization_name = nodegroup_name
        checkings_list = [
            {
                "item": "kustomization",
                "name": nodegroup_kustomization_name,
                "namespace": "managed-resources",
                "deleted": True,
                "timeout": self._checkloop_kustomization_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
            },
            {
                "item": "nodegroup_aws",
                "name": nodegroup_name,
                "namespace": "",
                "deleted": True,
                "timeout": self._checkloop_resource_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.RESOURCE_DELETED.NODEGROUP",
            },
        ]
        self.logger.info(f"Checking list: {checkings_list}")
        return await self.common_check_list(
            op_id, checkings_list, "nodegroups", db_nodegroup
        )
340
341
garciadeblas72412282024-11-07 12:41:54 +0100342class ClusterLcm(GitOpsLcm):
garciadeblas96b94f52024-07-08 16:18:21 +0200343 db_collection = "clusters"
rshri932105f2024-07-05 15:11:55 +0000344
345 def __init__(self, msg, lcm_tasks, config):
346 """
347 Init, Connect to database, filesystem storage, and messaging
348 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
349 :return: None
350 """
garciadeblas72412282024-11-07 12:41:54 +0100351 super().__init__(msg, lcm_tasks, config)
352 self._workflows = {
353 "create_cluster": {
354 "check_resource_function": self.check_create_cluster,
355 },
garciadeblasb0a42c22024-11-13 16:00:10 +0100356 "register_cluster": {
357 "check_resource_function": self.check_register_cluster,
358 },
359 "update_cluster": {
360 "check_resource_function": self.check_update_cluster,
361 },
garciadeblasad6d1ba2025-01-22 16:02:18 +0100362 "delete_cluster": {
363 "check_resource_function": self.check_delete_cluster,
364 },
garciadeblasf6dc6042026-02-04 17:40:30 +0100365 "deregister_cluster": {
366 "check_resource_function": self.check_deregister_cluster,
367 },
368 "purge_cluster": {
369 "check_resource_function": self.check_purge_cluster,
370 },
garciadeblas72412282024-11-07 12:41:54 +0100371 }
rshri932105f2024-07-05 15:11:55 +0000372 self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
373
    async def create(self, params, order_id):
        """Create a cluster: launch the 'create_cluster' workflow, track its
        resources, retrieve credentials/subnets and register the cluster in
        the k8sclusters collection.

        :param params: dict with at least 'cluster_id' and 'operation_id'
        :param order_id: LCM order identifier, forwarded to the k8scluster
            registration call
        :return: None; all results are persisted in the DB
        """
        self.logger.info("cluster Create Enter")
        # Pre-initialize so the final update_operation_history call is safe
        # even when the workflow never reaches the resource check.
        workflow_status = None
        resource_status = None

        # To get the cluster and op ids
        cluster_id = params["cluster_id"]
        op_id = params["operation_id"]

        # To initialize the operation states
        self.initialize_operation(cluster_id, op_id)

        # To get the cluster
        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})

        # To get the operation params details
        op_params = self.get_operation_params(db_cluster, op_id)

        # To copy the cluster content and decrypting fields to use in workflows
        db_cluster_copy = self.decrypted_copy(db_cluster)
        workflow_content = {
            "cluster": db_cluster_copy,
        }

        # To get the vim account details
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
        workflow_content["vim_account"] = db_vim

        workflow_res, workflow_name, _ = await self.odu.launch_workflow(
            "create_cluster", op_id, op_params, workflow_content
        )
        if not workflow_res:
            # Workflow could not even be launched: record the failure,
            # clean up and bail out early.
            self.logger.error(f"Failed to launch workflow: {workflow_name}")
            db_cluster["state"] = "FAILED_CREATION"
            db_cluster["resourceState"] = "ERROR"
            db_cluster = self.update_operation_history(
                db_cluster, op_id, workflow_status=False, resource_status=None
            )
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
            # Clean items used in the workflow, no matter if the workflow succeeded
            clean_status, clean_msg = await self.odu.clean_items_workflow(
                "create_cluster", op_id, op_params, workflow_content
            )
            self.logger.info(
                f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
            )
            return

        self.logger.info("workflow_name is: {}".format(workflow_name))
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            op_id, workflow_name
        )
        self.logger.info(
            "workflow_status is: {} and workflow_msg is: {}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["state"] = "CREATED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_CREATION"
            db_cluster["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        db_cluster = self.update_operation_history(
            db_cluster, op_id, workflow_status, None
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_cluster", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            # Only poll resources when the workflow itself succeeded.
            resource_status, resource_msg = await self.check_resource_status(
                "create_cluster", op_id, op_params, workflow_content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

        db_cluster["operatingState"] = "IDLE"
        db_cluster = self.update_operation_history(
            db_cluster, op_id, workflow_status, resource_status
        )
        db_cluster["current_operation"] = None

        # Retrieve credentials and subnets and register the cluster in k8sclusters collection
        cluster_creds = None
        # NOTE(review): assumes a k8sclusters record with this name was
        # pre-created elsewhere — get_one would fail otherwise; confirm.
        db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
        if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
            # Retrieve credentials
            result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
            # TODO: manage the case where the credentials are not available
            if result:
                db_cluster["credentials"] = cluster_creds

            # Retrieve subnets: prefer the ones given in the operation params,
            # otherwise (AWS only) discover them from the Crossplane subnets.
            if op_params.get("private_subnet") and op_params.get("public_subnet"):
                db_cluster["private_subnet"] = op_params["private_subnet"]
                db_cluster["public_subnet"] = op_params["public_subnet"]
            else:
                if db_vim["vim_type"] == "aws":
                    generic_object = await self.odu.list_object(
                        api_group="ec2.aws.upbound.io",
                        api_plural="subnets",
                        api_version="v1beta1",
                    )
                    private_subnet = []
                    public_subnet = []
                    for subnet in generic_object:
                        labels = subnet.get("metadata", {}).get("labels", {})
                        status = subnet.get("status", {}).get("atProvider", {})
                        # Extract relevant label values
                        cluster_label = labels.get("cluster")
                        access_label = labels.get("access")
                        subnet_id = status.get("id")
                        # Apply filtering: keep only subnets labelled for this
                        # cluster that already have a provider-assigned id.
                        if cluster_label == db_cluster["name"] and subnet_id:
                            if access_label == "private":
                                private_subnet.append(subnet_id)
                            elif access_label == "public":
                                public_subnet.append(subnet_id)
                    # Update db_cluster
                    db_cluster["private_subnet"] = private_subnet
                    db_cluster["public_subnet"] = public_subnet
            self.logger.info("DB cluster: {}".format(db_cluster))

            # Register the cluster in k8sclusters collection
            db_register["credentials"] = cluster_creds
            # To call the lcm.py for registering the cluster in k8scluster lcm.
            self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
            register = await self.regist.create(db_register, order_id)
            self.logger.debug(f"Register is : {register}")
        else:
            # Cluster did not come up READY/CREATED: flag the registration
            # record as broken.
            db_register["_admin"]["operationalState"] = "ERROR"
            self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)

        # Update db_cluster
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        self.update_default_profile_agekeys(db_cluster_copy)
        self.update_profile_state(db_cluster, workflow_status, resource_status)

        return
528
garciadeblas72412282024-11-07 12:41:54 +0100529 async def check_create_cluster(self, op_id, op_params, content):
garciadeblas7eae6f42024-11-08 10:41:38 +0100530 self.logger.info(
531 f"check_create_cluster Operation {op_id}. Params: {op_params}."
532 )
garciadeblas72412282024-11-07 12:41:54 +0100533 db_cluster = content["cluster"]
534 cluster_name = db_cluster["git_name"].lower()
535 cluster_kustomization_name = cluster_name
536 db_vim_account = content["vim_account"]
537 cloud_type = db_vim_account["vim_type"]
garciadeblas1ca09852025-05-30 11:19:06 +0200538 nodegroup_name = ""
garciadeblas72412282024-11-07 12:41:54 +0100539 if cloud_type == "aws":
garciadeblas1ca09852025-05-30 11:19:06 +0200540 nodegroup_name = f"{cluster_name}-nodegroup"
garciadeblas72412282024-11-07 12:41:54 +0100541 cluster_name = f"{cluster_name}-cluster"
542 elif cloud_type == "gcp":
garciadeblas1ca09852025-05-30 11:19:06 +0200543 nodegroup_name = f"nodepool-{cluster_name}"
garciadeblas72412282024-11-07 12:41:54 +0100544 bootstrap = op_params.get("bootstrap", True)
545 if cloud_type in ("azure", "gcp", "aws"):
546 checkings_list = [
547 {
548 "item": "kustomization",
549 "name": cluster_kustomization_name,
550 "namespace": "managed-resources",
garciadeblas7cf480d2025-01-27 16:53:45 +0100551 "condition": {
552 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
553 "value": "True",
554 },
yshahcb9075f2024-11-22 12:08:57 +0000555 "timeout": 1500,
garciadeblas72412282024-11-07 12:41:54 +0100556 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100557 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
garciadeblas72412282024-11-07 12:41:54 +0100558 },
559 {
560 "item": f"cluster_{cloud_type}",
561 "name": cluster_name,
562 "namespace": "",
garciadeblas7cf480d2025-01-27 16:53:45 +0100563 "condition": {
564 "jsonpath_filter": "status.conditions[?(@.type=='Synced')].status",
565 "value": "True",
566 },
garciadeblas72412282024-11-07 12:41:54 +0100567 "timeout": self._checkloop_resource_timeout,
568 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100569 "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
garciadeblas72412282024-11-07 12:41:54 +0100570 },
571 {
572 "item": f"cluster_{cloud_type}",
573 "name": cluster_name,
574 "namespace": "",
garciadeblas7cf480d2025-01-27 16:53:45 +0100575 "condition": {
576 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
577 "value": "True",
578 },
garciadeblas72412282024-11-07 12:41:54 +0100579 "timeout": self._checkloop_resource_timeout,
580 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100581 "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
garciadeblas72412282024-11-07 12:41:54 +0100582 },
583 {
584 "item": "kustomization",
585 "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
586 "namespace": "managed-resources",
garciadeblas7cf480d2025-01-27 16:53:45 +0100587 "condition": {
588 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
589 "value": "True",
590 },
yshahcb9075f2024-11-22 12:08:57 +0000591 "timeout": self._checkloop_resource_timeout,
garciadeblas72412282024-11-07 12:41:54 +0100592 "enable": bootstrap,
garciadeblas7eae6f42024-11-08 10:41:38 +0100593 "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
garciadeblas72412282024-11-07 12:41:54 +0100594 },
595 ]
596 else:
597 return False, "Not suitable VIM account to check cluster status"
rshrif8911b92025-06-11 18:19:07 +0000598 if cloud_type != "aws":
599 if nodegroup_name:
600 nodegroup_check = {
601 "item": f"nodegroup_{cloud_type}",
602 "name": nodegroup_name,
603 "namespace": "",
604 "condition": {
605 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
606 "value": "True",
607 },
608 "timeout": self._checkloop_resource_timeout,
609 "enable": True,
610 "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEGROUP",
611 }
612 checkings_list.insert(3, nodegroup_check)
yshahcb9075f2024-11-22 12:08:57 +0000613 return await self.common_check_list(
614 op_id, checkings_list, "clusters", db_cluster
615 )
garciadeblas72412282024-11-07 12:41:54 +0100616
garciadeblasd41e9292025-03-11 15:44:25 +0100617 def update_default_profile_agekeys(self, db_cluster):
618 profiles = [
619 "infra_controller_profiles",
620 "infra_config_profiles",
621 "app_profiles",
622 "resource_profiles",
623 ]
624 self.logger.debug("the db_cluster is :{}".format(db_cluster))
625 for profile_type in profiles:
626 profile_id = db_cluster[profile_type]
627 db_collection = self.profile_collection_mapping[profile_type]
628 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
629 db_profile["age_pubkey"] = db_cluster["age_pubkey"]
630 db_profile["age_privkey"] = db_cluster["age_privkey"]
631 self.encrypt_age_keys(db_profile)
632 self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
633
garciadeblas96b94f52024-07-08 16:18:21 +0200634 def update_profile_state(self, db_cluster, workflow_status, resource_status):
rshri932105f2024-07-05 15:11:55 +0000635 profiles = [
636 "infra_controller_profiles",
637 "infra_config_profiles",
638 "app_profiles",
639 "resource_profiles",
640 ]
garciadeblasd41e9292025-03-11 15:44:25 +0100641 self.logger.debug("the db_cluster is :{}".format(db_cluster))
rshri932105f2024-07-05 15:11:55 +0000642 for profile_type in profiles:
garciadeblas96b94f52024-07-08 16:18:21 +0200643 profile_id = db_cluster[profile_type]
rshri948f7de2024-12-02 03:42:35 +0000644 db_collection = self.profile_collection_mapping[profile_type]
rshri932105f2024-07-05 15:11:55 +0000645 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
yshahcb9075f2024-11-22 12:08:57 +0000646 op_id = db_profile["operationHistory"][-1].get("op_id")
garciadeblas96b94f52024-07-08 16:18:21 +0200647 db_profile["state"] = db_cluster["state"]
648 db_profile["resourceState"] = db_cluster["resourceState"]
649 db_profile["operatingState"] = db_cluster["operatingState"]
rshri932105f2024-07-05 15:11:55 +0000650 db_profile = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000651 db_profile, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000652 )
rshri932105f2024-07-05 15:11:55 +0000653 self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
654
rshri948f7de2024-12-02 03:42:35 +0000655 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000656 self.logger.info("cluster delete Enter")
rshri948f7de2024-12-02 03:42:35 +0000657
garciadeblas926ffac2025-02-12 16:45:40 +0100658 try:
659 # To get the cluster and op ids
660 cluster_id = params["cluster_id"]
661 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000662
garciadeblas926ffac2025-02-12 16:45:40 +0100663 # To initialize the operation states
664 self.initialize_operation(cluster_id, op_id)
rshri948f7de2024-12-02 03:42:35 +0000665
garciadeblas926ffac2025-02-12 16:45:40 +0100666 # To get the cluster
667 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
garciadeblas995cbf32024-12-18 12:54:00 +0100668
garciadeblas926ffac2025-02-12 16:45:40 +0100669 # To get the operation params details
670 op_params = self.get_operation_params(db_cluster, op_id)
garciadeblas995cbf32024-12-18 12:54:00 +0100671
garciadeblas926ffac2025-02-12 16:45:40 +0100672 # To copy the cluster content and decrypting fields to use in workflows
673 workflow_content = {
674 "cluster": self.decrypted_copy(db_cluster),
675 }
rshri948f7de2024-12-02 03:42:35 +0000676
garciadeblas926ffac2025-02-12 16:45:40 +0100677 # To get the vim account details
678 db_vim = self.db.get_one(
679 "vim_accounts", {"name": db_cluster["vim_account"]}
680 )
681 workflow_content["vim_account"] = db_vim
682 except Exception as e:
683 self.logger.debug(traceback.format_exc())
684 self.logger.debug(f"Exception: {e}", exc_info=True)
685 raise e
garciadeblasad6d1ba2025-01-22 16:02:18 +0100686
garciadeblas61a4c692025-07-17 13:04:13 +0200687 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100688 "delete_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200689 )
garciadeblas41859ce2025-02-04 16:08:51 +0100690 if not workflow_res:
691 self.logger.error(f"Failed to launch workflow: {workflow_name}")
692 db_cluster["state"] = "FAILED_DELETION"
693 db_cluster["resourceState"] = "ERROR"
694 db_cluster = self.update_operation_history(
695 db_cluster, op_id, workflow_status=False, resource_status=None
696 )
697 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
698 # Clean items used in the workflow, no matter if the workflow succeeded
699 clean_status, clean_msg = await self.odu.clean_items_workflow(
700 "delete_cluster", op_id, op_params, workflow_content
701 )
702 self.logger.info(
703 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
704 )
705 return
rshri932105f2024-07-05 15:11:55 +0000706
garciadeblas26d733c2025-02-03 16:12:43 +0100707 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +0200708 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +0100709 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +0200710 )
rshri932105f2024-07-05 15:11:55 +0000711 self.logger.info(
garciadeblas26d733c2025-02-03 16:12:43 +0100712 "workflow_status is: {} and workflow_msg is: {}".format(
rshri932105f2024-07-05 15:11:55 +0000713 workflow_status, workflow_msg
714 )
715 )
716 if workflow_status:
garciadeblas96b94f52024-07-08 16:18:21 +0200717 db_cluster["state"] = "DELETED"
718 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
rshri932105f2024-07-05 15:11:55 +0000719 else:
garciadeblas96b94f52024-07-08 16:18:21 +0200720 db_cluster["state"] = "FAILED_DELETION"
721 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +0000722 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000723 db_cluster = self.update_operation_history(
724 db_cluster, op_id, workflow_status, None
725 )
garciadeblas96b94f52024-07-08 16:18:21 +0200726 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +0000727
garciadeblas98f9a3d2024-12-10 13:42:47 +0100728 # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
729 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100730 "delete_cluster", op_id, op_params, workflow_content
garciadeblas98f9a3d2024-12-10 13:42:47 +0100731 )
732 self.logger.info(
733 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
734 )
735
rshri932105f2024-07-05 15:11:55 +0000736 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +0100737 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +0100738 "delete_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +0000739 )
740 self.logger.info(
741 "resource_status is :{} and resource_msg is :{}".format(
742 resource_status, resource_msg
743 )
744 )
745 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +0200746 db_cluster["resourceState"] = "READY"
rshri932105f2024-07-05 15:11:55 +0000747 else:
garciadeblas96b94f52024-07-08 16:18:21 +0200748 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +0000749
garciadeblas96b94f52024-07-08 16:18:21 +0200750 db_cluster["operatingState"] = "IDLE"
751 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000752 db_cluster, op_id, workflow_status, resource_status
garciadeblas96b94f52024-07-08 16:18:21 +0200753 )
shahithya70a3fc92024-11-12 11:01:05 +0000754 db_cluster["current_operation"] = None
garciadeblas96b94f52024-07-08 16:18:21 +0200755 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +0000756
yshahb36649f2025-02-28 09:01:51 +0000757 force = params.get("force", False)
758 if force:
759 force_delete_status = self.check_force_delete_and_delete_from_db(
760 cluster_id, workflow_status, resource_status, force
761 )
762 if force_delete_status:
763 return
764
garciadeblas96b94f52024-07-08 16:18:21 +0200765 # To delete it from DB
766 if db_cluster["state"] == "DELETED":
767 self.delete_cluster(db_cluster)
rshri948f7de2024-12-02 03:42:35 +0000768
769 # To delete it from k8scluster collection
770 self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
771
rshri932105f2024-07-05 15:11:55 +0000772 return
773
garciadeblasad6d1ba2025-01-22 16:02:18 +0100774 async def check_delete_cluster(self, op_id, op_params, content):
775 self.logger.info(
776 f"check_delete_cluster Operation {op_id}. Params: {op_params}."
777 )
778 self.logger.debug(f"Content: {content}")
779 db_cluster = content["cluster"]
780 cluster_name = db_cluster["git_name"].lower()
781 cluster_kustomization_name = cluster_name
782 db_vim_account = content["vim_account"]
783 cloud_type = db_vim_account["vim_type"]
784 if cloud_type == "aws":
785 cluster_name = f"{cluster_name}-cluster"
786 if cloud_type in ("azure", "gcp", "aws"):
787 checkings_list = [
788 {
789 "item": "kustomization",
790 "name": cluster_kustomization_name,
791 "namespace": "managed-resources",
792 "deleted": True,
793 "timeout": self._checkloop_kustomization_timeout,
794 "enable": True,
795 "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
796 },
797 {
798 "item": f"cluster_{cloud_type}",
799 "name": cluster_name,
800 "namespace": "",
801 "deleted": True,
802 "timeout": self._checkloop_resource_timeout,
803 "enable": True,
804 "resourceState": "IN_PROGRESS.RESOURCE_DELETED.CLUSTER",
805 },
806 ]
807 else:
808 return False, "Not suitable VIM account to check cluster status"
809 return await self.common_check_list(
810 op_id, checkings_list, "clusters", db_cluster
811 )
812
garciadeblas96b94f52024-07-08 16:18:21 +0200813 def delete_cluster(self, db_cluster):
814 # Actually, item_content is equal to db_cluster
rshri932105f2024-07-05 15:11:55 +0000815 # detach profiles
816 update_dict = None
817 profiles_to_detach = [
818 "infra_controller_profiles",
819 "infra_config_profiles",
820 "app_profiles",
821 "resource_profiles",
822 ]
rshri948f7de2024-12-02 03:42:35 +0000823 """
rshri932105f2024-07-05 15:11:55 +0000824 profiles_collection = {
825 "infra_controller_profiles": "k8sinfra_controller",
826 "infra_config_profiles": "k8sinfra_config",
827 "app_profiles": "k8sapp",
828 "resource_profiles": "k8sresource",
829 }
rshri948f7de2024-12-02 03:42:35 +0000830 """
rshri932105f2024-07-05 15:11:55 +0000831 for profile_type in profiles_to_detach:
garciadeblas96b94f52024-07-08 16:18:21 +0200832 if db_cluster.get(profile_type):
garciadeblas96b94f52024-07-08 16:18:21 +0200833 profile_ids = db_cluster[profile_type]
rshri932105f2024-07-05 15:11:55 +0000834 profile_ids_copy = deepcopy(profile_ids)
rshri932105f2024-07-05 15:11:55 +0000835 for profile_id in profile_ids_copy:
rshri948f7de2024-12-02 03:42:35 +0000836 db_collection = self.profile_collection_mapping[profile_type]
rshri932105f2024-07-05 15:11:55 +0000837 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
garciadeblasc2552852024-10-22 12:39:32 +0200838 self.logger.debug("the db_profile is :{}".format(db_profile))
839 self.logger.debug(
garciadeblas96b94f52024-07-08 16:18:21 +0200840 "the item_content name is :{}".format(db_cluster["name"])
rshri932105f2024-07-05 15:11:55 +0000841 )
garciadeblasc2552852024-10-22 12:39:32 +0200842 self.logger.debug(
rshri932105f2024-07-05 15:11:55 +0000843 "the db_profile name is :{}".format(db_profile["name"])
844 )
garciadeblas96b94f52024-07-08 16:18:21 +0200845 if db_cluster["name"] == db_profile["name"]:
yshah6bad8892025-02-11 12:37:04 +0000846 self.delete_profile_ksu(profile_id, profile_type)
rshri932105f2024-07-05 15:11:55 +0000847 self.db.del_one(db_collection, {"_id": profile_id})
848 else:
rshri932105f2024-07-05 15:11:55 +0000849 profile_ids.remove(profile_id)
850 update_dict = {profile_type: profile_ids}
rshri932105f2024-07-05 15:11:55 +0000851 self.db.set_one(
garciadeblas96b94f52024-07-08 16:18:21 +0200852 "clusters", {"_id": db_cluster["_id"]}, update_dict
rshri932105f2024-07-05 15:11:55 +0000853 )
garciadeblas96b94f52024-07-08 16:18:21 +0200854 self.db.del_one("clusters", {"_id": db_cluster["_id"]})
rshri932105f2024-07-05 15:11:55 +0000855
rshri948f7de2024-12-02 03:42:35 +0000856 async def attach_profile(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000857 self.logger.info("profile attach Enter")
rshri948f7de2024-12-02 03:42:35 +0000858
garciadeblas995cbf32024-12-18 12:54:00 +0100859 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000860 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000861 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000862
863 # To initialize the operation states
864 self.initialize_operation(cluster_id, op_id)
865
garciadeblas995cbf32024-12-18 12:54:00 +0100866 # To get the cluster
867 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
868
869 # To get the operation params details
870 op_params = self.get_operation_params(db_cluster, op_id)
871
872 # To copy the cluster content and decrypting fields to use in workflows
873 workflow_content = {
874 "cluster": self.decrypted_copy(db_cluster),
875 }
rshri948f7de2024-12-02 03:42:35 +0000876
877 # To get the profile details
878 profile_id = params["profile_id"]
879 profile_type = params["profile_type"]
880 profile_collection = self.profile_collection_mapping[profile_type]
881 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
882 db_profile["profile_type"] = profile_type
883 # content["profile"] = db_profile
garciadeblas995cbf32024-12-18 12:54:00 +0100884 workflow_content["profile"] = db_profile
rshri932105f2024-07-05 15:11:55 +0000885
garciadeblas61a4c692025-07-17 13:04:13 +0200886 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100887 "attach_profile_to_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200888 )
garciadeblas41859ce2025-02-04 16:08:51 +0100889 if not workflow_res:
890 self.logger.error(f"Failed to launch workflow: {workflow_name}")
891 db_cluster["resourceState"] = "ERROR"
892 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
893 db_cluster = self.update_operation_history(
894 db_cluster, op_id, workflow_status=False, resource_status=None
895 )
896 return
rshri932105f2024-07-05 15:11:55 +0000897
garciadeblas26d733c2025-02-03 16:12:43 +0100898 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +0200899 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +0100900 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +0200901 )
rshri932105f2024-07-05 15:11:55 +0000902 self.logger.info(
garciadeblas26d733c2025-02-03 16:12:43 +0100903 "workflow_status is: {} and workflow_msg is: {}".format(
rshri932105f2024-07-05 15:11:55 +0000904 workflow_status, workflow_msg
905 )
906 )
907 if workflow_status:
908 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
909 else:
910 db_cluster["resourceState"] = "ERROR"
911 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000912 db_cluster = self.update_operation_history(
913 db_cluster, op_id, workflow_status, None
914 )
rshri932105f2024-07-05 15:11:55 +0000915 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
916
917 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +0100918 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +0100919 "attach_profile_to_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +0000920 )
921 self.logger.info(
922 "resource_status is :{} and resource_msg is :{}".format(
923 resource_status, resource_msg
924 )
925 )
926 if resource_status:
927 db_cluster["resourceState"] = "READY"
928 else:
929 db_cluster["resourceState"] = "ERROR"
930
931 db_cluster["operatingState"] = "IDLE"
932 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000933 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000934 )
rshri932105f2024-07-05 15:11:55 +0000935 profile_list = db_cluster[profile_type]
rshri932105f2024-07-05 15:11:55 +0000936 if resource_status:
rshri932105f2024-07-05 15:11:55 +0000937 profile_list.append(profile_id)
rshri932105f2024-07-05 15:11:55 +0000938 db_cluster[profile_type] = profile_list
shahithya70a3fc92024-11-12 11:01:05 +0000939 db_cluster["current_operation"] = None
rshri932105f2024-07-05 15:11:55 +0000940 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
941
942 return
943
rshri948f7de2024-12-02 03:42:35 +0000944 async def detach_profile(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000945 self.logger.info("profile dettach Enter")
rshri948f7de2024-12-02 03:42:35 +0000946
garciadeblas995cbf32024-12-18 12:54:00 +0100947 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000948 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000949 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000950
951 # To initialize the operation states
952 self.initialize_operation(cluster_id, op_id)
953
garciadeblas995cbf32024-12-18 12:54:00 +0100954 # To get the cluster
955 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
956
957 # To get the operation params details
958 op_params = self.get_operation_params(db_cluster, op_id)
959
960 # To copy the cluster content and decrypting fields to use in workflows
961 workflow_content = {
962 "cluster": self.decrypted_copy(db_cluster),
963 }
rshri948f7de2024-12-02 03:42:35 +0000964
965 # To get the profile details
966 profile_id = params["profile_id"]
967 profile_type = params["profile_type"]
968 profile_collection = self.profile_collection_mapping[profile_type]
969 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
970 db_profile["profile_type"] = profile_type
garciadeblas995cbf32024-12-18 12:54:00 +0100971 workflow_content["profile"] = db_profile
rshri932105f2024-07-05 15:11:55 +0000972
garciadeblas61a4c692025-07-17 13:04:13 +0200973 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100974 "detach_profile_from_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200975 )
garciadeblas41859ce2025-02-04 16:08:51 +0100976 if not workflow_res:
977 self.logger.error(f"Failed to launch workflow: {workflow_name}")
978 db_cluster["resourceState"] = "ERROR"
979 db_cluster = self.update_operation_history(
980 db_cluster, op_id, workflow_status=False, resource_status=None
981 )
982 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
983 return
rshri932105f2024-07-05 15:11:55 +0000984
garciadeblas26d733c2025-02-03 16:12:43 +0100985 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +0200986 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +0100987 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +0200988 )
rshri932105f2024-07-05 15:11:55 +0000989 self.logger.info(
garciadeblas26d733c2025-02-03 16:12:43 +0100990 "workflow_status is: {} and workflow_msg is: {}".format(
rshri932105f2024-07-05 15:11:55 +0000991 workflow_status, workflow_msg
992 )
993 )
994 if workflow_status:
995 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
996 else:
997 db_cluster["resourceState"] = "ERROR"
998 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000999 db_cluster = self.update_operation_history(
1000 db_cluster, op_id, workflow_status, None
1001 )
rshri932105f2024-07-05 15:11:55 +00001002 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1003
1004 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001005 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001006 "detach_profile_from_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +00001007 )
1008 self.logger.info(
1009 "resource_status is :{} and resource_msg is :{}".format(
1010 resource_status, resource_msg
1011 )
1012 )
1013 if resource_status:
1014 db_cluster["resourceState"] = "READY"
1015 else:
1016 db_cluster["resourceState"] = "ERROR"
1017
1018 db_cluster["operatingState"] = "IDLE"
1019 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001020 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +00001021 )
rshri932105f2024-07-05 15:11:55 +00001022 profile_list = db_cluster[profile_type]
1023 self.logger.info("profile list is : {}".format(profile_list))
1024 if resource_status:
rshri932105f2024-07-05 15:11:55 +00001025 profile_list.remove(profile_id)
rshri932105f2024-07-05 15:11:55 +00001026 db_cluster[profile_type] = profile_list
shahithya70a3fc92024-11-12 11:01:05 +00001027 db_cluster["current_operation"] = None
rshri932105f2024-07-05 15:11:55 +00001028 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1029
1030 return
1031
rshri948f7de2024-12-02 03:42:35 +00001032 async def register(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001033 self.logger.info("cluster register enter")
garciadeblas8bdb3d42025-04-04 00:19:13 +02001034 workflow_status = None
1035 resource_status = None
rshri932105f2024-07-05 15:11:55 +00001036
garciadeblas995cbf32024-12-18 12:54:00 +01001037 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +00001038 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +00001039 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +00001040
1041 # To initialize the operation states
1042 self.initialize_operation(cluster_id, op_id)
1043
garciadeblas995cbf32024-12-18 12:54:00 +01001044 # To get the cluster
1045 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
1046
1047 # To get the operation params details
1048 op_params = self.get_operation_params(db_cluster, op_id)
1049
1050 # To copy the cluster content and decrypting fields to use in workflows
garciadeblas8bdb3d42025-04-04 00:19:13 +02001051 db_cluster_copy = self.decrypted_copy(db_cluster)
garciadeblas995cbf32024-12-18 12:54:00 +01001052 workflow_content = {
garciadeblas8bdb3d42025-04-04 00:19:13 +02001053 "cluster": db_cluster_copy,
garciadeblas995cbf32024-12-18 12:54:00 +01001054 }
rshric3564942024-11-12 18:12:38 +00001055
garciadeblas61a4c692025-07-17 13:04:13 +02001056 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001057 "register_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001058 )
garciadeblas41859ce2025-02-04 16:08:51 +01001059 if not workflow_res:
1060 self.logger.error(f"Failed to launch workflow: {workflow_name}")
1061 db_cluster["state"] = "FAILED_CREATION"
1062 db_cluster["resourceState"] = "ERROR"
1063 db_cluster = self.update_operation_history(
1064 db_cluster, op_id, workflow_status=False, resource_status=None
1065 )
1066 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1067 # Clean items used in the workflow, no matter if the workflow succeeded
1068 clean_status, clean_msg = await self.odu.clean_items_workflow(
1069 "register_cluster", op_id, op_params, workflow_content
1070 )
1071 self.logger.info(
1072 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1073 )
1074 return
rshri932105f2024-07-05 15:11:55 +00001075
garciadeblas26d733c2025-02-03 16:12:43 +01001076 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +02001077 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001078 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +02001079 )
rshri932105f2024-07-05 15:11:55 +00001080 self.logger.info(
garciadeblas26d733c2025-02-03 16:12:43 +01001081 "workflow_status is: {} and workflow_msg is: {}".format(
rshri932105f2024-07-05 15:11:55 +00001082 workflow_status, workflow_msg
1083 )
1084 )
1085 if workflow_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001086 db_cluster["state"] = "CREATED"
1087 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
rshri932105f2024-07-05 15:11:55 +00001088 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001089 db_cluster["state"] = "FAILED_CREATION"
1090 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001091 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +00001092 db_cluster = self.update_operation_history(
1093 db_cluster, op_id, workflow_status, None
1094 )
garciadeblas96b94f52024-07-08 16:18:21 +02001095 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +00001096
garciadeblasdde3a312024-09-17 13:25:06 +02001097 # Clean items used in the workflow, no matter if the workflow succeeded
1098 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001099 "register_cluster", op_id, op_params, workflow_content
garciadeblasdde3a312024-09-17 13:25:06 +02001100 )
1101 self.logger.info(
1102 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1103 )
1104
rshri932105f2024-07-05 15:11:55 +00001105 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001106 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001107 "register_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +00001108 )
1109 self.logger.info(
1110 "resource_status is :{} and resource_msg is :{}".format(
1111 resource_status, resource_msg
1112 )
1113 )
1114 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001115 db_cluster["resourceState"] = "READY"
rshri932105f2024-07-05 15:11:55 +00001116 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001117 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001118
garciadeblas96b94f52024-07-08 16:18:21 +02001119 db_cluster["operatingState"] = "IDLE"
1120 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001121 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +00001122 )
shahithya70a3fc92024-11-12 11:01:05 +00001123 db_cluster["current_operation"] = None
garciadeblas96b94f52024-07-08 16:18:21 +02001124 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri948f7de2024-12-02 03:42:35 +00001125
garciadeblas8bdb3d42025-04-04 00:19:13 +02001126 # Update default profile agekeys and state
1127 self.update_default_profile_agekeys(db_cluster_copy)
1128 self.update_profile_state(db_cluster, workflow_status, resource_status)
1129
rshri948f7de2024-12-02 03:42:35 +00001130 db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
1131 db_register["credentials"] = db_cluster["credentials"]
1132 self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
1133
1134 if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
1135 # To call the lcm.py for registering the cluster in k8scluster lcm.
1136 register = await self.regist.create(db_register, order_id)
1137 self.logger.debug(f"Register is : {register}")
1138 else:
1139 db_register["_admin"]["operationalState"] = "ERROR"
1140 self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
1141
rshri932105f2024-07-05 15:11:55 +00001142 return
1143
garciadeblasad6d1ba2025-01-22 16:02:18 +01001144 async def check_register_cluster(self, op_id, op_params, content):
1145 self.logger.info(
1146 f"check_register_cluster Operation {op_id}. Params: {op_params}."
1147 )
1148 # self.logger.debug(f"Content: {content}")
1149 db_cluster = content["cluster"]
1150 cluster_name = db_cluster["git_name"].lower()
1151 cluster_kustomization_name = cluster_name
1152 bootstrap = op_params.get("bootstrap", True)
1153 checkings_list = [
1154 {
1155 "item": "kustomization",
1156 "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
1157 "namespace": "managed-resources",
garciadeblas7cf480d2025-01-27 16:53:45 +01001158 "condition": {
1159 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
1160 "value": "True",
1161 },
garciadeblasad6d1ba2025-01-22 16:02:18 +01001162 "timeout": self._checkloop_kustomization_timeout,
1163 "enable": bootstrap,
1164 "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
1165 },
1166 ]
1167 return await self.common_check_list(
1168 op_id, checkings_list, "clusters", db_cluster
1169 )
1170
garciadeblasf6dc6042026-02-04 17:40:30 +01001171 async def check_deregister_cluster(self, op_id, op_params, content):
1172 self.logger.info(
1173 f"check_deregister_cluster Operation {op_id}. Params: {op_params}."
1174 )
1175 # self.logger.debug(f"Content: {content}")
1176 db_cluster = content["cluster"]
1177 cluster_name = db_cluster["git_name"].lower()
1178 cluster_kustomization_name = cluster_name
1179 checkings_list = [
1180 {
1181 "item": "kustomization",
1182 "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
1183 "namespace": "managed-resources",
1184 "deleted": True,
1185 "timeout": self._checkloop_kustomization_timeout,
1186 "enable": True,
1187 "resourceState": "IN_PROGRESS.CLUSTER_DISCONNECTED",
1188 },
1189 ]
1190 return await self.common_check_list(
1191 op_id, checkings_list, "clusters", db_cluster
1192 )
1193
1194 async def check_purge_cluster(self, op_id, op_params, content):
1195 self.logger.info(f"check_purge_cluster Operation {op_id}. Params: {op_params}.")
1196 # self.logger.debug(f"Content: {content}")
1197 db_cluster = content["cluster"]
1198 cluster_name = db_cluster["git_name"].lower()
1199 cluster_kustomization_name = cluster_name
1200 checkings_list = [
1201 {
1202 "item": "kustomization",
1203 "name": cluster_kustomization_name,
1204 "namespace": "managed-resources",
1205 "deleted": True,
1206 "timeout": self._checkloop_kustomization_timeout,
1207 "enable": True,
1208 "resourceState": "IN_PROGRESS.CLUSTER_DEREGISTERED",
1209 },
1210 ]
1211 return await self.common_check_list(
1212 op_id, checkings_list, "clusters", db_cluster
1213 )
1214
rshri948f7de2024-12-02 03:42:35 +00001215 async def deregister(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001216 self.logger.info("cluster deregister enter")
1217
garciadeblas995cbf32024-12-18 12:54:00 +01001218 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +00001219 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +00001220 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +00001221
1222 # To initialize the operation states
1223 self.initialize_operation(cluster_id, op_id)
1224
garciadeblas995cbf32024-12-18 12:54:00 +01001225 # To get the cluster
1226 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
1227
1228 # To get the operation params details
1229 op_params = self.get_operation_params(db_cluster, op_id)
1230
1231 # To copy the cluster content and decrypting fields to use in workflows
1232 workflow_content = {
1233 "cluster": self.decrypted_copy(db_cluster),
1234 }
rshri932105f2024-07-05 15:11:55 +00001235
garciadeblas61a4c692025-07-17 13:04:13 +02001236 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001237 "deregister_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001238 )
garciadeblas41859ce2025-02-04 16:08:51 +01001239 if not workflow_res:
1240 self.logger.error(f"Failed to launch workflow: {workflow_name}")
1241 db_cluster["state"] = "FAILED_DELETION"
1242 db_cluster["resourceState"] = "ERROR"
1243 db_cluster = self.update_operation_history(
1244 db_cluster, op_id, workflow_status=False, resource_status=None
1245 )
1246 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
garciadeblas41859ce2025-02-04 16:08:51 +01001247 return
rshri932105f2024-07-05 15:11:55 +00001248
garciadeblas26d733c2025-02-03 16:12:43 +01001249 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +02001250 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001251 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +02001252 )
rshri932105f2024-07-05 15:11:55 +00001253 self.logger.info(
garciadeblas26d733c2025-02-03 16:12:43 +01001254 "workflow_status is: {} and workflow_msg is: {}".format(
rshri932105f2024-07-05 15:11:55 +00001255 workflow_status, workflow_msg
1256 )
1257 )
1258 if workflow_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001259 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
rshri932105f2024-07-05 15:11:55 +00001260 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001261 db_cluster["state"] = "FAILED_DELETION"
1262 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001263 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +00001264 db_cluster = self.update_operation_history(
1265 db_cluster, op_id, workflow_status, None
1266 )
garciadeblas96b94f52024-07-08 16:18:21 +02001267 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +00001268
1269 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001270 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001271 "deregister_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +00001272 )
1273 self.logger.info(
1274 "resource_status is :{} and resource_msg is :{}".format(
1275 resource_status, resource_msg
1276 )
1277 )
1278 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001279 db_cluster["resourceState"] = "READY"
rshri932105f2024-07-05 15:11:55 +00001280 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001281 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001282
garciadeblas96b94f52024-07-08 16:18:21 +02001283 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001284 db_cluster, op_id, workflow_status, resource_status
garciadeblas96b94f52024-07-08 16:18:21 +02001285 )
1286 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +00001287
garciadeblasf6dc6042026-02-04 17:40:30 +01001288 # Clean items used in the workflow, no matter if the workflow succeeded
garciadeblas93380452025-02-05 09:32:52 +01001289 clean_status, clean_msg = await self.odu.clean_items_workflow(
1290 "deregister_cluster", op_id, op_params, workflow_content
1291 )
1292 self.logger.info(
1293 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1294 )
garciadeblasf6dc6042026-02-04 17:40:30 +01001295 if not workflow_status or not resource_status:
1296 return
1297
1298 # Now, we launch the workflow to purge the cluster
1299 # Initialize the operation again
1300 self.initialize_operation(cluster_id, op_id)
1301 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
1302 "purge_cluster", op_id, op_params, workflow_content
1303 )
1304 if not workflow_res:
1305 self.logger.error(f"Failed to launch workflow: {workflow_name}")
1306 db_cluster["state"] = "FAILED_DELETION"
1307 db_cluster["resourceState"] = "ERROR"
1308 db_cluster = self.update_operation_history(
1309 db_cluster, op_id, workflow_status=False, resource_status=None
1310 )
1311 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1312 # Clean items used in the workflow, no matter if the workflow succeeded
1313 clean_status, clean_msg = await self.odu.clean_items_workflow(
1314 "purge_cluster", op_id, op_params, workflow_content
1315 )
1316 self.logger.info(
1317 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1318 )
1319 return
1320
1321 self.logger.info("workflow_name is: {}".format(workflow_name))
1322 workflow_status, workflow_msg = await self.odu.check_workflow_status(
1323 op_id, workflow_name
1324 )
1325 self.logger.info(
1326 "workflow_status is: {} and workflow_msg is: {}".format(
1327 workflow_status, workflow_msg
1328 )
1329 )
1330 if workflow_status:
1331 db_cluster["state"] = "DELETED"
1332 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
1333 else:
1334 db_cluster["state"] = "FAILED_DELETION"
1335 db_cluster["resourceState"] = "ERROR"
1336 # has to call update_operation_history return content
1337 db_cluster = self.update_operation_history(
1338 db_cluster, op_id, workflow_status, None
1339 )
1340 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1341
1342 # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
1343 clean_status, clean_msg = await self.odu.clean_items_workflow(
1344 "purge_cluster", op_id, op_params, workflow_content
1345 )
1346 self.logger.info(
1347 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1348 )
1349
1350 if workflow_status:
1351 resource_status, resource_msg = await self.check_resource_status(
1352 "purge_cluster", op_id, op_params, workflow_content
1353 )
1354 self.logger.info(
1355 "resource_status is :{} and resource_msg is :{}".format(
1356 resource_status, resource_msg
1357 )
1358 )
1359 if resource_status:
1360 db_cluster["resourceState"] = "READY"
1361 else:
1362 db_cluster["resourceState"] = "ERROR"
1363
1364 db_cluster["operatingState"] = "IDLE"
1365 db_cluster = self.update_operation_history(
1366 db_cluster, op_id, workflow_status, resource_status
1367 )
1368 db_cluster["current_operation"] = None
1369 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1370
1371 force = params.get("force", False)
1372 if force:
1373 force_delete_status = self.check_force_delete_and_delete_from_db(
1374 cluster_id, workflow_status, resource_status, force
1375 )
1376 if force_delete_status:
1377 return
1378
1379 # To delete it from DB
1380 if db_cluster["state"] == "DELETED":
1381 self.delete_cluster(db_cluster)
1382
1383 # To delete it from k8scluster collection
1384 self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
1385
garciadeblas93380452025-02-05 09:32:52 +01001386 return
rshri932105f2024-07-05 15:11:55 +00001387
rshri948f7de2024-12-02 03:42:35 +00001388 async def get_creds(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001389 self.logger.info("Cluster get creds Enter")
rshri948f7de2024-12-02 03:42:35 +00001390 cluster_id = params["cluster_id"]
1391 op_id = params["operation_id"]
1392 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
garciadeblas96b94f52024-07-08 16:18:21 +02001393 result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
1394 if result:
1395 db_cluster["credentials"] = cluster_creds
yshahd940c652024-10-17 06:11:12 +00001396 op_len = 0
1397 for operations in db_cluster["operationHistory"]:
1398 if operations["op_id"] == op_id:
1399 db_cluster["operationHistory"][op_len]["result"] = result
1400 db_cluster["operationHistory"][op_len]["endDate"] = time()
1401 op_len += 1
shahithya70a3fc92024-11-12 11:01:05 +00001402 db_cluster["current_operation"] = None
yshahd940c652024-10-17 06:11:12 +00001403 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri948f7de2024-12-02 03:42:35 +00001404 self.logger.info("Cluster Get Creds Exit")
yshah771dea82024-07-05 15:11:49 +00001405 return
1406
rshri948f7de2024-12-02 03:42:35 +00001407 async def update(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001408 self.logger.info("Cluster update Enter")
rshri948f7de2024-12-02 03:42:35 +00001409 # To get the cluster details
1410 cluster_id = params["cluster_id"]
1411 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
1412
1413 # To get the operation params details
1414 op_id = params["operation_id"]
1415 op_params = self.get_operation_params(db_cluster, op_id)
yshah771dea82024-07-05 15:11:49 +00001416
garciadeblas995cbf32024-12-18 12:54:00 +01001417 # To copy the cluster content and decrypting fields to use in workflows
1418 workflow_content = {
1419 "cluster": self.decrypted_copy(db_cluster),
1420 }
rshric3564942024-11-12 18:12:38 +00001421
1422 # vim account details
1423 db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
garciadeblas995cbf32024-12-18 12:54:00 +01001424 workflow_content["vim_account"] = db_vim
rshric3564942024-11-12 18:12:38 +00001425
garciadeblas61a4c692025-07-17 13:04:13 +02001426 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001427 "update_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001428 )
garciadeblas41859ce2025-02-04 16:08:51 +01001429 if not workflow_res:
1430 self.logger.error(f"Failed to launch workflow: {workflow_name}")
1431 db_cluster["resourceState"] = "ERROR"
1432 db_cluster = self.update_operation_history(
1433 db_cluster, op_id, workflow_status=False, resource_status=None
1434 )
1435 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1436 # Clean items used in the workflow, no matter if the workflow succeeded
1437 clean_status, clean_msg = await self.odu.clean_items_workflow(
1438 "update_cluster", op_id, op_params, workflow_content
1439 )
1440 self.logger.info(
1441 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1442 )
1443 return
garciadeblas26d733c2025-02-03 16:12:43 +01001444 self.logger.info("workflow_name is: {}".format(workflow_name))
garciadeblas96b94f52024-07-08 16:18:21 +02001445 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001446 op_id, workflow_name
garciadeblas96b94f52024-07-08 16:18:21 +02001447 )
1448 self.logger.info(
1449 "Workflow Status: {} Workflow Message: {}".format(
1450 workflow_status, workflow_msg
yshah771dea82024-07-05 15:11:49 +00001451 )
garciadeblas96b94f52024-07-08 16:18:21 +02001452 )
1453
1454 if workflow_status:
1455 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
1456 else:
1457 db_cluster["resourceState"] = "ERROR"
1458
yshahcb9075f2024-11-22 12:08:57 +00001459 db_cluster = self.update_operation_history(
1460 db_cluster, op_id, workflow_status, None
1461 )
garciadeblas96b94f52024-07-08 16:18:21 +02001462 # self.logger.info("Db content: {}".format(db_content))
1463 # self.db.set_one(self.db_collection, {"_id": _id}, db_cluster)
1464 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1465
garciadeblas28bff0f2024-09-16 12:53:07 +02001466 # Clean items used in the workflow, no matter if the workflow succeeded
1467 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001468 "update_cluster", op_id, op_params, workflow_content
garciadeblas28bff0f2024-09-16 12:53:07 +02001469 )
1470 self.logger.info(
1471 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1472 )
garciadeblas96b94f52024-07-08 16:18:21 +02001473 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001474 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001475 "update_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001476 )
1477 self.logger.info(
1478 "Resource Status: {} Resource Message: {}".format(
1479 resource_status, resource_msg
1480 )
1481 )
yshah771dea82024-07-05 15:11:49 +00001482
1483 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001484 db_cluster["resourceState"] = "READY"
yshah771dea82024-07-05 15:11:49 +00001485 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001486 db_cluster["resourceState"] = "ERROR"
yshah771dea82024-07-05 15:11:49 +00001487
yshah0defcd52024-11-18 07:41:35 +00001488 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001489 db_cluster, op_id, workflow_status, resource_status
yshah0defcd52024-11-18 07:41:35 +00001490 )
1491
garciadeblas96b94f52024-07-08 16:18:21 +02001492 db_cluster["operatingState"] = "IDLE"
garciadeblas96b94f52024-07-08 16:18:21 +02001493 # self.logger.info("db_cluster: {}".format(db_cluster))
garciadeblas7cf480d2025-01-27 16:53:45 +01001494 # TODO: verify condition
garciadeblas96b94f52024-07-08 16:18:21 +02001495 # For the moment, if the workflow completed successfully, then we update the db accordingly.
1496 if workflow_status:
1497 if "k8s_version" in op_params:
1498 db_cluster["k8s_version"] = op_params["k8s_version"]
yshah0defcd52024-11-18 07:41:35 +00001499 if "node_count" in op_params:
garciadeblas96b94f52024-07-08 16:18:21 +02001500 db_cluster["node_count"] = op_params["node_count"]
yshah0defcd52024-11-18 07:41:35 +00001501 if "node_size" in op_params:
1502 db_cluster["node_count"] = op_params["node_size"]
garciadeblas96b94f52024-07-08 16:18:21 +02001503 # self.db.set_one(self.db_collection, {"_id": _id}, db_content)
shahithya70a3fc92024-11-12 11:01:05 +00001504 db_cluster["current_operation"] = None
garciadeblas96b94f52024-07-08 16:18:21 +02001505 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
yshah771dea82024-07-05 15:11:49 +00001506 return
1507
garciadeblasad6d1ba2025-01-22 16:02:18 +01001508 async def check_update_cluster(self, op_id, op_params, content):
1509 self.logger.info(
1510 f"check_update_cluster Operation {op_id}. Params: {op_params}."
1511 )
1512 self.logger.debug(f"Content: {content}")
garciadeblasd7d8bde2025-01-27 18:31:06 +01001513 # return await self.check_dummy_operation(op_id, op_params, content)
1514 db_cluster = content["cluster"]
1515 cluster_name = db_cluster["git_name"].lower()
1516 cluster_kustomization_name = cluster_name
1517 db_vim_account = content["vim_account"]
1518 cloud_type = db_vim_account["vim_type"]
1519 if cloud_type == "aws":
1520 cluster_name = f"{cluster_name}-cluster"
1521 if cloud_type in ("azure", "gcp", "aws"):
1522 checkings_list = [
1523 {
1524 "item": "kustomization",
1525 "name": cluster_kustomization_name,
1526 "namespace": "managed-resources",
1527 "condition": {
1528 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
1529 "value": "True",
1530 },
1531 "timeout": self._checkloop_kustomization_timeout,
1532 "enable": True,
1533 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
1534 },
1535 ]
1536 else:
1537 return False, "Not suitable VIM account to check cluster status"
1538 # Scale operation
1539 if "node_count" in op_params:
garciadeblas1ca09852025-05-30 11:19:06 +02001540 if cloud_type in ("azure", "gcp"):
1541 checkings_list.append(
1542 {
1543 "item": f"cluster_{cloud_type}",
1544 "name": cluster_name,
1545 "namespace": "",
1546 "condition": {
1547 "jsonpath_filter": "status.atProvider.defaultNodePool[0].nodeCount",
1548 "value": f"{op_params['node_count']}",
1549 },
1550 "timeout": self._checkloop_resource_timeout * 3,
1551 "enable": True,
1552 "resourceState": "IN_PROGRESS.RESOURCE_READY.NODE_COUNT.CLUSTER",
1553 }
1554 )
1555 elif cloud_type == "aws":
1556 checkings_list.append(
1557 {
1558 "item": f"nodegroup_{cloud_type}",
1559 "name": f"{cluster_name}-nodegroup",
1560 "namespace": "",
1561 "condition": {
1562 "jsonpath_filter": "status.atProvider.scalingConfig[0].desiredSize",
1563 "value": f"{op_params['node_count']}",
1564 },
1565 "timeout": self._checkloop_resource_timeout * 3,
1566 "enable": True,
1567 "resourceState": "IN_PROGRESS.RESOURCE_READY.NODE_COUNT.CLUSTER",
1568 }
1569 )
1570
garciadeblasd7d8bde2025-01-27 18:31:06 +01001571 # Upgrade operation
1572 if "k8s_version" in op_params:
1573 checkings_list.append(
1574 {
1575 "item": f"cluster_{cloud_type}",
1576 "name": cluster_name,
1577 "namespace": "",
1578 "condition": {
1579 "jsonpath_filter": "status.atProvider.defaultNodePool[0].orchestratorVersion",
1580 "value": op_params["k8s_version"],
1581 },
1582 "timeout": self._checkloop_resource_timeout * 2,
1583 "enable": True,
1584 "resourceState": "IN_PROGRESS.RESOURCE_READY.K8S_VERSION.CLUSTER",
1585 }
1586 )
1587 return await self.common_check_list(
1588 op_id, checkings_list, "clusters", db_cluster
1589 )
garciadeblasad6d1ba2025-01-22 16:02:18 +01001590
yshah771dea82024-07-05 15:11:49 +00001591
garciadeblas72412282024-11-07 12:41:54 +01001592class CloudCredentialsLcm(GitOpsLcm):
yshah771dea82024-07-05 15:11:49 +00001593 db_collection = "vim_accounts"
1594
1595 def __init__(self, msg, lcm_tasks, config):
1596 """
1597 Init, Connect to database, filesystem storage, and messaging
1598 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1599 :return: None
1600 """
garciadeblas72412282024-11-07 12:41:54 +01001601 super().__init__(msg, lcm_tasks, config)
yshah771dea82024-07-05 15:11:49 +00001602
yshah564ec9c2024-11-29 07:33:32 +00001603 async def add(self, params, order_id):
yshah771dea82024-07-05 15:11:49 +00001604 self.logger.info("Cloud Credentials create")
yshah564ec9c2024-11-29 07:33:32 +00001605 vim_id = params["_id"]
1606 op_id = vim_id
1607 op_params = params
1608 db_content = self.db.get_one(self.db_collection, {"_id": vim_id})
1609 vim_config = db_content.get("config", {})
1610 self.db.encrypt_decrypt_fields(
1611 vim_config.get("credentials"),
1612 "decrypt",
1613 ["password", "secret"],
1614 schema_version=db_content["schema_version"],
1615 salt=vim_id,
1616 )
1617
garciadeblas61a4c692025-07-17 13:04:13 +02001618 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001619 "create_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001620 )
1621
1622 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001623 op_id, workflow_name
yshah771dea82024-07-05 15:11:49 +00001624 )
1625
1626 self.logger.info(
1627 "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
1628 )
1629
garciadeblas28bff0f2024-09-16 12:53:07 +02001630 # Clean items used in the workflow, no matter if the workflow succeeded
1631 clean_status, clean_msg = await self.odu.clean_items_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001632 "create_cloud_credentials", op_id, op_params, db_content
garciadeblas28bff0f2024-09-16 12:53:07 +02001633 )
1634 self.logger.info(
1635 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1636 )
1637
yshah771dea82024-07-05 15:11:49 +00001638 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001639 resource_status, resource_msg = await self.check_resource_status(
yshah564ec9c2024-11-29 07:33:32 +00001640 "create_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001641 )
1642 self.logger.info(
1643 "Resource Status: {} Resource Message: {}".format(
1644 resource_status, resource_msg
1645 )
1646 )
garciadeblas15b8a302024-09-23 12:40:13 +02001647
yshah564ec9c2024-11-29 07:33:32 +00001648 db_content["_admin"]["operationalState"] = "ENABLED"
1649 for operation in db_content["_admin"]["operations"]:
garciadeblas15b8a302024-09-23 12:40:13 +02001650 if operation["lcmOperationType"] == "create":
1651 operation["operationState"] = "ENABLED"
yshah564ec9c2024-11-29 07:33:32 +00001652 self.logger.info("Content : {}".format(db_content))
1653 self.db.set_one("vim_accounts", {"_id": db_content["_id"]}, db_content)
yshah771dea82024-07-05 15:11:49 +00001654 return
1655
yshah564ec9c2024-11-29 07:33:32 +00001656 async def edit(self, params, order_id):
1657 self.logger.info("Cloud Credentials Update")
1658 vim_id = params["_id"]
1659 op_id = vim_id
1660 op_params = params
1661 db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
1662 vim_config = db_content.get("config", {})
1663 self.db.encrypt_decrypt_fields(
1664 vim_config.get("credentials"),
1665 "decrypt",
1666 ["password", "secret"],
1667 schema_version=db_content["schema_version"],
1668 salt=vim_id,
1669 )
1670
garciadeblas61a4c692025-07-17 13:04:13 +02001671 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001672 "update_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001673 )
1674 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001675 op_id, workflow_name
yshah771dea82024-07-05 15:11:49 +00001676 )
1677 self.logger.info(
1678 "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
1679 )
1680
garciadeblas28bff0f2024-09-16 12:53:07 +02001681 # Clean items used in the workflow, no matter if the workflow succeeded
1682 clean_status, clean_msg = await self.odu.clean_items_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001683 "update_cloud_credentials", op_id, op_params, db_content
garciadeblas28bff0f2024-09-16 12:53:07 +02001684 )
1685 self.logger.info(
1686 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1687 )
1688
yshah771dea82024-07-05 15:11:49 +00001689 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001690 resource_status, resource_msg = await self.check_resource_status(
yshah564ec9c2024-11-29 07:33:32 +00001691 "update_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001692 )
1693 self.logger.info(
1694 "Resource Status: {} Resource Message: {}".format(
1695 resource_status, resource_msg
1696 )
1697 )
1698 return
1699
yshah564ec9c2024-11-29 07:33:32 +00001700 async def remove(self, params, order_id):
1701 self.logger.info("Cloud Credentials remove")
1702 vim_id = params["_id"]
1703 op_id = vim_id
1704 op_params = params
1705 db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
1706
garciadeblas61a4c692025-07-17 13:04:13 +02001707 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001708 "delete_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001709 )
1710 workflow_status, workflow_msg = await self.odu.check_workflow_status(
garciadeblasc89134b2025-02-05 16:36:17 +01001711 op_id, workflow_name
yshah771dea82024-07-05 15:11:49 +00001712 )
1713 self.logger.info(
1714 "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
1715 )
1716
1717 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001718 resource_status, resource_msg = await self.check_resource_status(
yshah564ec9c2024-11-29 07:33:32 +00001719 "delete_cloud_credentials", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001720 )
1721 self.logger.info(
1722 "Resource Status: {} Resource Message: {}".format(
1723 resource_status, resource_msg
1724 )
1725 )
yshah564ec9c2024-11-29 07:33:32 +00001726 self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
yshah771dea82024-07-05 15:11:49 +00001727 return
1728
rshri932105f2024-07-05 15:11:55 +00001729
garciadeblas72412282024-11-07 12:41:54 +01001730class K8sAppLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001731 db_collection = "k8sapp"
1732
rshri932105f2024-07-05 15:11:55 +00001733 def __init__(self, msg, lcm_tasks, config):
1734 """
1735 Init, Connect to database, filesystem storage, and messaging
1736 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1737 :return: None
1738 """
garciadeblas72412282024-11-07 12:41:54 +01001739 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001740
rshri948f7de2024-12-02 03:42:35 +00001741 async def create(self, params, order_id):
garciadeblas61a4c692025-07-17 13:04:13 +02001742 self.logger.info("App Profile Create Enter")
rshri932105f2024-07-05 15:11:55 +00001743
rshri948f7de2024-12-02 03:42:35 +00001744 op_id = params["operation_id"]
1745 profile_id = params["profile_id"]
1746
1747 # To initialize the operation states
1748 self.initialize_operation(profile_id, op_id)
1749
1750 content = self.db.get_one("k8sapp", {"_id": profile_id})
1751 content["profile_type"] = "applications"
1752 op_params = self.get_operation_params(content, op_id)
1753 self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
1754
garciadeblas61a4c692025-07-17 13:04:13 +02001755 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001756 "create_profile", op_id, op_params, content
1757 )
garciadeblas26d733c2025-02-03 16:12:43 +01001758 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001759
garciadeblas33b36e72025-01-17 12:49:19 +01001760 workflow_status = await self.check_workflow_and_update_db(
1761 op_id, workflow_name, content
1762 )
rshri932105f2024-07-05 15:11:55 +00001763
1764 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001765 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001766 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001767 )
yshah564ec9c2024-11-29 07:33:32 +00001768 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
garciadeblas61a4c692025-07-17 13:04:13 +02001769 self.logger.info(
1770 f"App Profile Create Exit with resource status: {resource_status}"
1771 )
rshri932105f2024-07-05 15:11:55 +00001772 return
1773
rshri948f7de2024-12-02 03:42:35 +00001774 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001775 self.logger.info("App delete Enter")
rshri932105f2024-07-05 15:11:55 +00001776
rshri948f7de2024-12-02 03:42:35 +00001777 op_id = params["operation_id"]
1778 profile_id = params["profile_id"]
1779
1780 # To initialize the operation states
1781 self.initialize_operation(profile_id, op_id)
1782
1783 content = self.db.get_one("k8sapp", {"_id": profile_id})
1784 op_params = self.get_operation_params(content, op_id)
1785
garciadeblas61a4c692025-07-17 13:04:13 +02001786 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001787 "delete_profile", op_id, op_params, content
1788 )
garciadeblas26d733c2025-02-03 16:12:43 +01001789 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001790
garciadeblas33b36e72025-01-17 12:49:19 +01001791 workflow_status = await self.check_workflow_and_update_db(
1792 op_id, workflow_name, content
1793 )
rshri932105f2024-07-05 15:11:55 +00001794
1795 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001796 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001797 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001798 )
rshri932105f2024-07-05 15:11:55 +00001799
yshahb36649f2025-02-28 09:01:51 +00001800 force = params.get("force", False)
1801 if force:
1802 force_delete_status = self.check_force_delete_and_delete_from_db(
1803 profile_id, workflow_status, resource_status, force
1804 )
1805 if force_delete_status:
1806 return
1807
1808 self.logger.info(f"Resource status: {resource_status}")
yshah564ec9c2024-11-29 07:33:32 +00001809 if resource_status:
1810 content["state"] = "DELETED"
yshah6bad8892025-02-11 12:37:04 +00001811 profile_type = self.profile_type_mapping[content["profile_type"]]
1812 self.delete_profile_ksu(profile_id, profile_type)
yshah564ec9c2024-11-29 07:33:32 +00001813 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1814 self.db.del_one(self.db_collection, {"_id": content["_id"]})
garciadeblas61a4c692025-07-17 13:04:13 +02001815 self.logger.info(
1816 f"App Profile Delete Exit with resource status: {resource_status}"
1817 )
rshri932105f2024-07-05 15:11:55 +00001818 return
1819
1820
garciadeblas72412282024-11-07 12:41:54 +01001821class K8sResourceLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001822 db_collection = "k8sresource"
1823
rshri932105f2024-07-05 15:11:55 +00001824 def __init__(self, msg, lcm_tasks, config):
1825 """
1826 Init, Connect to database, filesystem storage, and messaging
1827 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1828 :return: None
1829 """
garciadeblas72412282024-11-07 12:41:54 +01001830 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001831
rshri948f7de2024-12-02 03:42:35 +00001832 async def create(self, params, order_id):
garciadeblas61a4c692025-07-17 13:04:13 +02001833 self.logger.info("Resource Profile Create Enter")
rshri932105f2024-07-05 15:11:55 +00001834
rshri948f7de2024-12-02 03:42:35 +00001835 op_id = params["operation_id"]
1836 profile_id = params["profile_id"]
1837
1838 # To initialize the operation states
1839 self.initialize_operation(profile_id, op_id)
1840
1841 content = self.db.get_one("k8sresource", {"_id": profile_id})
1842 content["profile_type"] = "managed-resources"
1843 op_params = self.get_operation_params(content, op_id)
1844 self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
1845
garciadeblas61a4c692025-07-17 13:04:13 +02001846 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001847 "create_profile", op_id, op_params, content
1848 )
garciadeblas26d733c2025-02-03 16:12:43 +01001849 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001850
garciadeblas33b36e72025-01-17 12:49:19 +01001851 workflow_status = await self.check_workflow_and_update_db(
1852 op_id, workflow_name, content
1853 )
rshri932105f2024-07-05 15:11:55 +00001854
1855 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001856 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001857 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001858 )
yshah564ec9c2024-11-29 07:33:32 +00001859 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1860 self.logger.info(
1861 f"Resource Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00001862 )
rshri932105f2024-07-05 15:11:55 +00001863 return
1864
rshri948f7de2024-12-02 03:42:35 +00001865 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001866 self.logger.info("Resource delete Enter")
rshri948f7de2024-12-02 03:42:35 +00001867
1868 op_id = params["operation_id"]
1869 profile_id = params["profile_id"]
1870
1871 # To initialize the operation states
1872 self.initialize_operation(profile_id, op_id)
1873
1874 content = self.db.get_one("k8sresource", {"_id": profile_id})
1875 op_params = self.get_operation_params(content, op_id)
rshri932105f2024-07-05 15:11:55 +00001876
garciadeblas61a4c692025-07-17 13:04:13 +02001877 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001878 "delete_profile", op_id, op_params, content
1879 )
garciadeblas26d733c2025-02-03 16:12:43 +01001880 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001881
garciadeblas33b36e72025-01-17 12:49:19 +01001882 workflow_status = await self.check_workflow_and_update_db(
1883 op_id, workflow_name, content
1884 )
rshri932105f2024-07-05 15:11:55 +00001885
1886 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001887 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001888 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001889 )
rshri932105f2024-07-05 15:11:55 +00001890
yshahb36649f2025-02-28 09:01:51 +00001891 force = params.get("force", False)
1892 if force:
1893 force_delete_status = self.check_force_delete_and_delete_from_db(
1894 profile_id, workflow_status, resource_status, force
1895 )
1896 if force_delete_status:
1897 return
1898
yshah564ec9c2024-11-29 07:33:32 +00001899 if resource_status:
1900 content["state"] = "DELETED"
yshah6bad8892025-02-11 12:37:04 +00001901 profile_type = self.profile_type_mapping[content["profile_type"]]
1902 self.delete_profile_ksu(profile_id, profile_type)
yshah564ec9c2024-11-29 07:33:32 +00001903 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1904 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1905 self.logger.info(
1906 f"Resource Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02001907 )
rshri932105f2024-07-05 15:11:55 +00001908 return
1909
1910
garciadeblas72412282024-11-07 12:41:54 +01001911class K8sInfraControllerLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001912 db_collection = "k8sinfra_controller"
1913
rshri932105f2024-07-05 15:11:55 +00001914 def __init__(self, msg, lcm_tasks, config):
1915 """
1916 Init, Connect to database, filesystem storage, and messaging
1917 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1918 :return: None
1919 """
garciadeblas72412282024-11-07 12:41:54 +01001920 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001921
rshri948f7de2024-12-02 03:42:35 +00001922 async def create(self, params, order_id):
garciadeblas61a4c692025-07-17 13:04:13 +02001923 self.logger.info("Infra controller Profile Create Enter")
rshri932105f2024-07-05 15:11:55 +00001924
rshri948f7de2024-12-02 03:42:35 +00001925 op_id = params["operation_id"]
1926 profile_id = params["profile_id"]
1927
1928 # To initialize the operation states
1929 self.initialize_operation(profile_id, op_id)
1930
1931 content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
1932 content["profile_type"] = "infra-controllers"
1933 op_params = self.get_operation_params(content, op_id)
1934 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
1935
garciadeblas61a4c692025-07-17 13:04:13 +02001936 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001937 "create_profile", op_id, op_params, content
1938 )
garciadeblas26d733c2025-02-03 16:12:43 +01001939 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001940
garciadeblas33b36e72025-01-17 12:49:19 +01001941 workflow_status = await self.check_workflow_and_update_db(
1942 op_id, workflow_name, content
1943 )
rshri932105f2024-07-05 15:11:55 +00001944
1945 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001946 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001947 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001948 )
yshah564ec9c2024-11-29 07:33:32 +00001949 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1950 self.logger.info(
1951 f"Infra Controller Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00001952 )
rshri932105f2024-07-05 15:11:55 +00001953 return
1954
rshri948f7de2024-12-02 03:42:35 +00001955 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001956 self.logger.info("Infra controller delete Enter")
rshri932105f2024-07-05 15:11:55 +00001957
rshri948f7de2024-12-02 03:42:35 +00001958 op_id = params["operation_id"]
1959 profile_id = params["profile_id"]
1960
1961 # To initialize the operation states
1962 self.initialize_operation(profile_id, op_id)
1963
1964 content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
1965 op_params = self.get_operation_params(content, op_id)
1966
garciadeblas61a4c692025-07-17 13:04:13 +02001967 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001968 "delete_profile", op_id, op_params, content
1969 )
garciadeblas26d733c2025-02-03 16:12:43 +01001970 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001971
garciadeblas33b36e72025-01-17 12:49:19 +01001972 workflow_status = await self.check_workflow_and_update_db(
1973 op_id, workflow_name, content
1974 )
rshri932105f2024-07-05 15:11:55 +00001975
1976 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01001977 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001978 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001979 )
rshri932105f2024-07-05 15:11:55 +00001980
yshahb36649f2025-02-28 09:01:51 +00001981 force = params.get("force", False)
1982 if force:
1983 force_delete_status = self.check_force_delete_and_delete_from_db(
1984 profile_id, workflow_status, resource_status, force
1985 )
1986 if force_delete_status:
1987 return
1988
yshah564ec9c2024-11-29 07:33:32 +00001989 if resource_status:
1990 content["state"] = "DELETED"
yshah6bad8892025-02-11 12:37:04 +00001991 profile_type = self.profile_type_mapping[content["profile_type"]]
1992 self.delete_profile_ksu(profile_id, profile_type)
yshah564ec9c2024-11-29 07:33:32 +00001993 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1994 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1995 self.logger.info(
1996 f"Infra Controller Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02001997 )
rshri932105f2024-07-05 15:11:55 +00001998 return
1999
2000
garciadeblas72412282024-11-07 12:41:54 +01002001class K8sInfraConfigLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00002002 db_collection = "k8sinfra_config"
2003
rshri932105f2024-07-05 15:11:55 +00002004 def __init__(self, msg, lcm_tasks, config):
2005 """
2006 Init, Connect to database, filesystem storage, and messaging
2007 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
2008 :return: None
2009 """
garciadeblas72412282024-11-07 12:41:54 +01002010 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00002011
rshri948f7de2024-12-02 03:42:35 +00002012 async def create(self, params, order_id):
garciadeblas61a4c692025-07-17 13:04:13 +02002013 self.logger.info("Infra config Profile Create Enter")
rshri932105f2024-07-05 15:11:55 +00002014
rshri948f7de2024-12-02 03:42:35 +00002015 op_id = params["operation_id"]
2016 profile_id = params["profile_id"]
2017
2018 # To initialize the operation states
2019 self.initialize_operation(profile_id, op_id)
2020
2021 content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
2022 content["profile_type"] = "infra-configs"
2023 op_params = self.get_operation_params(content, op_id)
2024 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
2025
garciadeblas61a4c692025-07-17 13:04:13 +02002026 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02002027 "create_profile", op_id, op_params, content
2028 )
garciadeblas26d733c2025-02-03 16:12:43 +01002029 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00002030
garciadeblas33b36e72025-01-17 12:49:19 +01002031 workflow_status = await self.check_workflow_and_update_db(
2032 op_id, workflow_name, content
2033 )
rshri932105f2024-07-05 15:11:55 +00002034
2035 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002036 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002037 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00002038 )
yshah564ec9c2024-11-29 07:33:32 +00002039 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
2040 self.logger.info(
2041 f"Infra Config Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00002042 )
rshri932105f2024-07-05 15:11:55 +00002043 return
2044
rshri948f7de2024-12-02 03:42:35 +00002045 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00002046 self.logger.info("Infra config delete Enter")
2047
rshri948f7de2024-12-02 03:42:35 +00002048 op_id = params["operation_id"]
2049 profile_id = params["profile_id"]
2050
2051 # To initialize the operation states
2052 self.initialize_operation(profile_id, op_id)
2053
2054 content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
2055 op_params = self.get_operation_params(content, op_id)
2056
garciadeblas61a4c692025-07-17 13:04:13 +02002057 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02002058 "delete_profile", op_id, op_params, content
2059 )
garciadeblas26d733c2025-02-03 16:12:43 +01002060 self.logger.info("workflow_name is: {}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00002061
garciadeblas33b36e72025-01-17 12:49:19 +01002062 workflow_status = await self.check_workflow_and_update_db(
2063 op_id, workflow_name, content
2064 )
yshah564ec9c2024-11-29 07:33:32 +00002065
rshri932105f2024-07-05 15:11:55 +00002066 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002067 resource_status, content = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00002068 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00002069 )
yshah564ec9c2024-11-29 07:33:32 +00002070
yshahb36649f2025-02-28 09:01:51 +00002071 force = params.get("force", False)
2072 if force:
2073 force_delete_status = self.check_force_delete_and_delete_from_db(
2074 profile_id, workflow_status, resource_status, force
2075 )
2076 if force_delete_status:
2077 return
2078
rshri932105f2024-07-05 15:11:55 +00002079 if resource_status:
yshah564ec9c2024-11-29 07:33:32 +00002080 content["state"] = "DELETED"
yshah6bad8892025-02-11 12:37:04 +00002081 profile_type = self.profile_type_mapping[content["profile_type"]]
2082 self.delete_profile_ksu(profile_id, profile_type)
yshah564ec9c2024-11-29 07:33:32 +00002083 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
2084 self.db.del_one(self.db_collection, {"_id": content["_id"]})
2085 self.logger.info(
2086 f"Infra Config Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02002087 )
rshri932105f2024-07-05 15:11:55 +00002088
rshri932105f2024-07-05 15:11:55 +00002089 return
yshah771dea82024-07-05 15:11:49 +00002090
2091
garciadeblas72412282024-11-07 12:41:54 +01002092class OkaLcm(GitOpsLcm):
yshah771dea82024-07-05 15:11:49 +00002093 db_collection = "okas"
2094
2095 def __init__(self, msg, lcm_tasks, config):
2096 """
2097 Init, Connect to database, filesystem storage, and messaging
2098 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
2099 :return: None
2100 """
garciadeblas72412282024-11-07 12:41:54 +01002101 super().__init__(msg, lcm_tasks, config)
yshah771dea82024-07-05 15:11:49 +00002102
yshah564ec9c2024-11-29 07:33:32 +00002103 async def create(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002104 self.logger.info("OKA Create Enter")
yshah564ec9c2024-11-29 07:33:32 +00002105 op_id = params["operation_id"]
2106 oka_id = params["oka_id"]
2107 self.initialize_operation(oka_id, op_id)
2108 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
2109 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00002110
garciadeblas61a4c692025-07-17 13:04:13 +02002111 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02002112 "create_oka", op_id, op_params, db_content
2113 )
yshah564ec9c2024-11-29 07:33:32 +00002114
garciadeblas33b36e72025-01-17 12:49:19 +01002115 workflow_status = await self.check_workflow_and_update_db(
2116 op_id, workflow_name, db_content
2117 )
yshah771dea82024-07-05 15:11:49 +00002118
2119 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002120 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002121 "create_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00002122 )
garciadeblas96b94f52024-07-08 16:18:21 +02002123 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
garciadeblasb23d2dc2025-02-21 10:15:49 +01002124
2125 # Clean items used in the workflow, no matter if the workflow succeeded
2126 clean_status, clean_msg = await self.odu.clean_items_workflow(
2127 "create_oka", op_id, op_params, db_content
2128 )
2129 self.logger.info(
2130 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2131 )
yshah564ec9c2024-11-29 07:33:32 +00002132 self.logger.info(f"OKA Create Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002133 return
2134
yshah564ec9c2024-11-29 07:33:32 +00002135 async def edit(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002136 self.logger.info("OKA Edit Enter")
yshah564ec9c2024-11-29 07:33:32 +00002137 op_id = params["operation_id"]
2138 oka_id = params["oka_id"]
2139 self.initialize_operation(oka_id, op_id)
2140 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
2141 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00002142
garciadeblas61a4c692025-07-17 13:04:13 +02002143 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002144 "update_oka", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02002145 )
garciadeblas33b36e72025-01-17 12:49:19 +01002146 workflow_status = await self.check_workflow_and_update_db(
2147 op_id, workflow_name, db_content
2148 )
yshah771dea82024-07-05 15:11:49 +00002149
2150 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002151 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002152 "update_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00002153 )
garciadeblas96b94f52024-07-08 16:18:21 +02002154 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
garciadeblasb23d2dc2025-02-21 10:15:49 +01002155 # Clean items used in the workflow, no matter if the workflow succeeded
2156 clean_status, clean_msg = await self.odu.clean_items_workflow(
2157 "update_oka", op_id, op_params, db_content
2158 )
2159 self.logger.info(
2160 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2161 )
yshah564ec9c2024-11-29 07:33:32 +00002162 self.logger.info(f"OKA Update Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002163 return
2164
yshah564ec9c2024-11-29 07:33:32 +00002165 async def delete(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002166 self.logger.info("OKA delete Enter")
yshah564ec9c2024-11-29 07:33:32 +00002167 op_id = params["operation_id"]
2168 oka_id = params["oka_id"]
2169 self.initialize_operation(oka_id, op_id)
2170 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
2171 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00002172
garciadeblas61a4c692025-07-17 13:04:13 +02002173 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002174 "delete_oka", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02002175 )
garciadeblas33b36e72025-01-17 12:49:19 +01002176 workflow_status = await self.check_workflow_and_update_db(
2177 op_id, workflow_name, db_content
2178 )
yshah771dea82024-07-05 15:11:49 +00002179
2180 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002181 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002182 "delete_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00002183 )
yshah771dea82024-07-05 15:11:49 +00002184
yshahb36649f2025-02-28 09:01:51 +00002185 force = params.get("force", False)
2186 if force:
2187 force_delete_status = self.check_force_delete_and_delete_from_db(
2188 oka_id, workflow_status, resource_status, force
2189 )
2190 if force_delete_status:
2191 return
2192
yshah564ec9c2024-11-29 07:33:32 +00002193 if resource_status:
2194 db_content["state"] == "DELETED"
2195 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
garciadeblas96b94f52024-07-08 16:18:21 +02002196 self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
garciadeblasb23d2dc2025-02-21 10:15:49 +01002197 # Clean items used in the workflow, no matter if the workflow succeeded
2198 clean_status, clean_msg = await self.odu.clean_items_workflow(
2199 "delete_oka", op_id, op_params, db_content
2200 )
2201 self.logger.info(
2202 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2203 )
yshah564ec9c2024-11-29 07:33:32 +00002204 self.logger.info(f"OKA Delete Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002205 return
2206
2207
garciadeblas72412282024-11-07 12:41:54 +01002208class KsuLcm(GitOpsLcm):
yshah771dea82024-07-05 15:11:49 +00002209 db_collection = "ksus"
2210
2211 def __init__(self, msg, lcm_tasks, config):
2212 """
2213 Init, Connect to database, filesystem storage, and messaging
2214 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
2215 :return: None
2216 """
garciadeblas72412282024-11-07 12:41:54 +01002217 super().__init__(msg, lcm_tasks, config)
garciadeblasad6d1ba2025-01-22 16:02:18 +01002218 self._workflows = {
2219 "create_ksus": {
2220 "check_resource_function": self.check_create_ksus,
2221 },
2222 "delete_ksus": {
2223 "check_resource_function": self.check_delete_ksus,
2224 },
2225 }
2226
2227 def get_dbclusters_from_profile(self, profile_id, profile_type):
2228 cluster_list = []
2229 db_clusters = self.db.get_list("clusters")
2230 self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
2231 for db_cluster in db_clusters:
2232 if profile_id in db_cluster.get(profile_type, []):
2233 self.logger.info(
2234 f"Profile {profile_id} found in cluster {db_cluster['name']}"
2235 )
2236 cluster_list.append(db_cluster)
2237 return cluster_list
yshah771dea82024-07-05 15:11:49 +00002238
yshah564ec9c2024-11-29 07:33:32 +00002239 async def create(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002240 self.logger.info("ksu Create Enter")
yshah564ec9c2024-11-29 07:33:32 +00002241 db_content = []
2242 op_params = []
2243 op_id = params["operation_id"]
2244 for ksu_id in params["ksus_list"]:
2245 self.logger.info("Ksu ID: {}".format(ksu_id))
2246 self.initialize_operation(ksu_id, op_id)
2247 db_ksu = self.db.get_one(self.db_collection, {"_id": ksu_id})
2248 self.logger.info("Db KSU: {}".format(db_ksu))
2249 db_content.append(db_ksu)
2250 ksu_params = {}
2251 ksu_params = self.get_operation_params(db_ksu, op_id)
2252 self.logger.info("Operation Params: {}".format(ksu_params))
2253 # Update ksu_params["profile"] with profile name and age-pubkey
2254 profile_type = ksu_params["profile"]["profile_type"]
2255 profile_id = ksu_params["profile"]["_id"]
2256 profile_collection = self.profile_collection_mapping[profile_type]
2257 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
garciadeblasd41e9292025-03-11 15:44:25 +01002258 # db_profile is decrypted inline
2259 # No need to use decrypted_copy because db_profile won't be updated.
2260 self.decrypt_age_keys(db_profile)
yshah564ec9c2024-11-29 07:33:32 +00002261 ksu_params["profile"]["name"] = db_profile["name"]
2262 ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
2263 # Update ksu_params["oka"] with sw_catalog_path (when missing)
garciadeblas9f7c9c52025-01-17 01:06:05 +01002264 # TODO: remove this in favor of doing it in ODU workflow
yshah564ec9c2024-11-29 07:33:32 +00002265 for oka in ksu_params["oka"]:
2266 if "sw_catalog_path" not in oka:
2267 oka_id = oka["_id"]
2268 db_oka = self.db.get_one("okas", {"_id": oka_id})
yshah2f39b8a2024-12-19 11:06:24 +00002269 oka_type = MAP_PROFILE[
2270 db_oka.get("profile_type", "infra_controller_profiles")
2271 ]
garciadeblas9f7c9c52025-01-17 01:06:05 +01002272 oka[
2273 "sw_catalog_path"
garciadeblas29f8bcf2025-01-24 14:24:41 +01002274 ] = f"{oka_type}/{db_oka['git_name'].lower()}/templates"
yshah564ec9c2024-11-29 07:33:32 +00002275 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00002276
garciadeblasad6d1ba2025-01-22 16:02:18 +01002277 # A single workflow is launched for all KSUs
garciadeblas61a4c692025-07-17 13:04:13 +02002278 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002279 "create_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02002280 )
garciadeblasad6d1ba2025-01-22 16:02:18 +01002281 # Update workflow status in all KSUs
2282 wf_status_list = []
yshah564ec9c2024-11-29 07:33:32 +00002283 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas33b36e72025-01-17 12:49:19 +01002284 workflow_status = await self.check_workflow_and_update_db(
2285 op_id, workflow_name, db_ksu
2286 )
garciadeblasad6d1ba2025-01-22 16:02:18 +01002287 wf_status_list.append(workflow_status)
2288 # Update resource status in all KSUs
2289 # TODO: Is an operation correct if n KSUs are right and 1 is not OK?
2290 res_status_list = []
2291 for db_ksu, ksu_params, wf_status in zip(db_content, op_params, wf_status_list):
2292 if wf_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002293 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00002294 "create_ksus", op_id, ksu_params, db_ksu
2295 )
garciadeblasad6d1ba2025-01-22 16:02:18 +01002296 else:
2297 resource_status = False
2298 res_status_list.append(resource_status)
garciadeblas96b94f52024-07-08 16:18:21 +02002299 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
2300
garciadeblasd8429852024-10-17 15:30:30 +02002301 # Clean items used in the workflow, no matter if the workflow succeeded
2302 clean_status, clean_msg = await self.odu.clean_items_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002303 "create_ksus", op_id, op_params, db_content
garciadeblasd8429852024-10-17 15:30:30 +02002304 )
2305 self.logger.info(
2306 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2307 )
garciadeblasad6d1ba2025-01-22 16:02:18 +01002308 self.logger.info(f"KSU Create EXIT with Resource Status {res_status_list}")
yshah771dea82024-07-05 15:11:49 +00002309 return
2310
yshah564ec9c2024-11-29 07:33:32 +00002311 async def edit(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002312 self.logger.info("ksu edit Enter")
yshah564ec9c2024-11-29 07:33:32 +00002313 db_content = []
2314 op_params = []
2315 op_id = params["operation_id"]
2316 for ksu_id in params["ksus_list"]:
2317 self.initialize_operation(ksu_id, op_id)
2318 db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
2319 db_content.append(db_ksu)
2320 ksu_params = {}
2321 ksu_params = self.get_operation_params(db_ksu, op_id)
2322 # Update ksu_params["profile"] with profile name and age-pubkey
2323 profile_type = ksu_params["profile"]["profile_type"]
2324 profile_id = ksu_params["profile"]["_id"]
2325 profile_collection = self.profile_collection_mapping[profile_type]
2326 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
garciadeblasd41e9292025-03-11 15:44:25 +01002327 # db_profile is decrypted inline
2328 # No need to use decrypted_copy because db_profile won't be updated.
2329 self.decrypt_age_keys(db_profile)
yshah564ec9c2024-11-29 07:33:32 +00002330 ksu_params["profile"]["name"] = db_profile["name"]
2331 ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
2332 # Update ksu_params["oka"] with sw_catalog_path (when missing)
garciadeblas9f7c9c52025-01-17 01:06:05 +01002333 # TODO: remove this in favor of doing it in ODU workflow
yshah564ec9c2024-11-29 07:33:32 +00002334 for oka in ksu_params["oka"]:
2335 if "sw_catalog_path" not in oka:
2336 oka_id = oka["_id"]
2337 db_oka = self.db.get_one("okas", {"_id": oka_id})
yshah2f39b8a2024-12-19 11:06:24 +00002338 oka_type = MAP_PROFILE[
2339 db_oka.get("profile_type", "infra_controller_profiles")
2340 ]
garciadeblas9f7c9c52025-01-17 01:06:05 +01002341 oka[
2342 "sw_catalog_path"
2343 ] = f"{oka_type}/{db_oka['git_name']}/templates"
yshah564ec9c2024-11-29 07:33:32 +00002344 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00002345
garciadeblas61a4c692025-07-17 13:04:13 +02002346 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002347 "update_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02002348 )
yshah771dea82024-07-05 15:11:49 +00002349
yshah564ec9c2024-11-29 07:33:32 +00002350 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas33b36e72025-01-17 12:49:19 +01002351 workflow_status = await self.check_workflow_and_update_db(
2352 op_id, workflow_name, db_ksu
2353 )
yshah564ec9c2024-11-29 07:33:32 +00002354
garciadeblas96b94f52024-07-08 16:18:21 +02002355 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002356 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00002357 "update_ksus", op_id, ksu_params, db_ksu
garciadeblas96b94f52024-07-08 16:18:21 +02002358 )
garciadeblas96b94f52024-07-08 16:18:21 +02002359 db_ksu["name"] = ksu_params["name"]
2360 db_ksu["description"] = ksu_params["description"]
2361 db_ksu["profile"]["profile_type"] = ksu_params["profile"][
2362 "profile_type"
2363 ]
2364 db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
2365 db_ksu["oka"] = ksu_params["oka"]
2366 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
2367
yshah564ec9c2024-11-29 07:33:32 +00002368 # Clean items used in the workflow, no matter if the workflow succeeded
2369 clean_status, clean_msg = await self.odu.clean_items_workflow(
2370 "create_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02002371 )
2372 self.logger.info(
yshah564ec9c2024-11-29 07:33:32 +00002373 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
garciadeblas96b94f52024-07-08 16:18:21 +02002374 )
yshah564ec9c2024-11-29 07:33:32 +00002375 self.logger.info(f"KSU Update EXIT with Resource Status {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002376 return
2377
yshah564ec9c2024-11-29 07:33:32 +00002378 async def delete(self, params, order_id):
2379 self.logger.info("ksu delete Enter")
2380 db_content = []
2381 op_params = []
2382 op_id = params["operation_id"]
2383 for ksu_id in params["ksus_list"]:
2384 self.initialize_operation(ksu_id, op_id)
2385 db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
2386 db_content.append(db_ksu)
2387 ksu_params = {}
2388 ksu_params["profile"] = {}
2389 ksu_params["profile"]["profile_type"] = db_ksu["profile"]["profile_type"]
2390 ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
garciadeblasd41e9292025-03-11 15:44:25 +01002391 # Update ksu_params["profile"] with profile name
yshah564ec9c2024-11-29 07:33:32 +00002392 profile_type = ksu_params["profile"]["profile_type"]
2393 profile_id = ksu_params["profile"]["_id"]
2394 profile_collection = self.profile_collection_mapping[profile_type]
2395 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
2396 ksu_params["profile"]["name"] = db_profile["name"]
yshah564ec9c2024-11-29 07:33:32 +00002397 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00002398
garciadeblas61a4c692025-07-17 13:04:13 +02002399 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00002400 "delete_ksus", op_id, op_params, db_content
2401 )
2402
2403 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas33b36e72025-01-17 12:49:19 +01002404 workflow_status = await self.check_workflow_and_update_db(
2405 op_id, workflow_name, db_ksu
2406 )
yshah564ec9c2024-11-29 07:33:32 +00002407
2408 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002409 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00002410 "delete_ksus", op_id, ksu_params, db_ksu
2411 )
2412
yshahb36649f2025-02-28 09:01:51 +00002413 force = params.get("force", False)
2414 if force:
2415 force_delete_status = self.check_force_delete_and_delete_from_db(
2416 db_ksu["_id"], workflow_status, resource_status, force
2417 )
2418 if force_delete_status:
2419 return
2420
yshah564ec9c2024-11-29 07:33:32 +00002421 if resource_status:
2422 db_ksu["state"] == "DELETED"
yshah5e109152025-05-19 12:29:01 +00002423 self.delete_ksu_dependency(db_ksu["_id"], db_ksu)
yshah564ec9c2024-11-29 07:33:32 +00002424 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
2425 self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
2426
2427 self.logger.info(f"KSU Delete Exit with resource status: {resource_status}")
2428 return
2429
2430 async def clone(self, params, order_id):
2431 self.logger.info("ksu clone Enter")
2432 op_id = params["operation_id"]
2433 ksus_id = params["ksus_list"][0]
2434 self.initialize_operation(ksus_id, op_id)
2435 db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
2436 op_params = self.get_operation_params(db_content, op_id)
garciadeblas61a4c692025-07-17 13:04:13 +02002437 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02002438 "clone_ksus", op_id, op_params, db_content
2439 )
yshah564ec9c2024-11-29 07:33:32 +00002440
garciadeblas33b36e72025-01-17 12:49:19 +01002441 workflow_status = await self.check_workflow_and_update_db(
2442 op_id, workflow_name, db_content
2443 )
yshah771dea82024-07-05 15:11:49 +00002444
2445 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002446 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002447 "clone_ksus", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00002448 )
garciadeblas96b94f52024-07-08 16:18:21 +02002449 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00002450
2451 self.logger.info(f"KSU Clone Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002452 return
2453
yshah564ec9c2024-11-29 07:33:32 +00002454 async def move(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02002455 self.logger.info("ksu move Enter")
yshah564ec9c2024-11-29 07:33:32 +00002456 op_id = params["operation_id"]
2457 ksus_id = params["ksus_list"][0]
2458 self.initialize_operation(ksus_id, op_id)
2459 db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
2460 op_params = self.get_operation_params(db_content, op_id)
garciadeblas61a4c692025-07-17 13:04:13 +02002461 workflow_res, workflow_name, _ = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02002462 "move_ksus", op_id, op_params, db_content
2463 )
yshah564ec9c2024-11-29 07:33:32 +00002464
garciadeblas33b36e72025-01-17 12:49:19 +01002465 workflow_status = await self.check_workflow_and_update_db(
2466 op_id, workflow_name, db_content
2467 )
yshah771dea82024-07-05 15:11:49 +00002468
2469 if workflow_status:
garciadeblas33b36e72025-01-17 12:49:19 +01002470 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02002471 "move_ksus", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00002472 )
garciadeblas96b94f52024-07-08 16:18:21 +02002473 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00002474
2475 self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00002476 return
garciadeblasad6d1ba2025-01-22 16:02:18 +01002477
2478 async def check_create_ksus(self, op_id, op_params, content):
2479 self.logger.info(f"check_create_ksus Operation {op_id}. Params: {op_params}.")
2480 self.logger.debug(f"Content: {content}")
2481 db_ksu = content
2482 kustomization_name = db_ksu["git_name"].lower()
2483 oka_list = op_params["oka"]
2484 oka_item = oka_list[0]
2485 oka_params = oka_item.get("transformation", {})
garciadeblas167dde32025-02-14 00:44:58 +01002486 kustomization_ns = oka_params.get("kustomization_namespace", "flux-system")
garciadeblasad6d1ba2025-01-22 16:02:18 +01002487 profile_id = op_params.get("profile", {}).get("_id")
2488 profile_type = op_params.get("profile", {}).get("profile_type")
2489 self.logger.info(
2490 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2491 )
2492 dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
2493 if not dbcluster_list:
2494 self.logger.info(f"No clusters found for profile {profile_id}.")
2495 for db_cluster in dbcluster_list:
2496 try:
2497 self.logger.info(
garciadeblasae238482025-02-03 08:44:19 +01002498 f"Checking status of KSU {db_ksu['name']} in cluster {db_cluster['name']}."
garciadeblasad6d1ba2025-01-22 16:02:18 +01002499 )
2500 cluster_kubectl = self.cluster_kubectl(db_cluster)
2501 checkings_list = [
2502 {
2503 "item": "kustomization",
2504 "name": kustomization_name,
garciadeblas167dde32025-02-14 00:44:58 +01002505 "namespace": kustomization_ns,
garciadeblas7cf480d2025-01-27 16:53:45 +01002506 "condition": {
2507 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
2508 "value": "True",
2509 },
garciadeblasad6d1ba2025-01-22 16:02:18 +01002510 "timeout": self._checkloop_kustomization_timeout,
2511 "enable": True,
2512 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
2513 },
2514 ]
2515 self.logger.info(
2516 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2517 )
2518 result, message = await self.common_check_list(
garciadeblas6d8acf32025-02-06 13:34:37 +01002519 op_id, checkings_list, "ksus", db_ksu, kubectl_obj=cluster_kubectl
garciadeblasad6d1ba2025-01-22 16:02:18 +01002520 )
2521 if not result:
2522 return False, message
2523 except Exception as e:
2524 self.logger.error(
2525 f"Error checking KSU in cluster {db_cluster['name']}."
2526 )
2527 self.logger.error(e)
2528 return False, f"Error checking KSU in cluster {db_cluster['name']}."
2529 return True, "OK"
2530
2531 async def check_delete_ksus(self, op_id, op_params, content):
2532 self.logger.info(f"check_delete_ksus Operation {op_id}. Params: {op_params}.")
2533 self.logger.debug(f"Content: {content}")
2534 db_ksu = content
2535 kustomization_name = db_ksu["git_name"].lower()
2536 oka_list = db_ksu["oka"]
2537 oka_item = oka_list[0]
2538 oka_params = oka_item.get("transformation", {})
garciadeblas167dde32025-02-14 00:44:58 +01002539 kustomization_ns = oka_params.get("kustomization_namespace", "flux-system")
garciadeblasad6d1ba2025-01-22 16:02:18 +01002540 profile_id = op_params.get("profile", {}).get("_id")
2541 profile_type = op_params.get("profile", {}).get("profile_type")
2542 self.logger.info(
2543 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2544 )
2545 dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
2546 if not dbcluster_list:
2547 self.logger.info(f"No clusters found for profile {profile_id}.")
2548 for db_cluster in dbcluster_list:
2549 try:
2550 self.logger.info(
2551 f"Checking status of KSU in cluster {db_cluster['name']}."
2552 )
2553 cluster_kubectl = self.cluster_kubectl(db_cluster)
2554 checkings_list = [
2555 {
2556 "item": "kustomization",
2557 "name": kustomization_name,
garciadeblas167dde32025-02-14 00:44:58 +01002558 "namespace": kustomization_ns,
garciadeblasad6d1ba2025-01-22 16:02:18 +01002559 "deleted": True,
2560 "timeout": self._checkloop_kustomization_timeout,
2561 "enable": True,
2562 "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
2563 },
2564 ]
2565 self.logger.info(
2566 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2567 )
2568 result, message = await self.common_check_list(
garciadeblas6d8acf32025-02-06 13:34:37 +01002569 op_id, checkings_list, "ksus", db_ksu, kubectl_obj=cluster_kubectl
garciadeblasad6d1ba2025-01-22 16:02:18 +01002570 )
2571 if not result:
2572 return False, message
2573 except Exception as e:
2574 self.logger.error(
2575 f"Error checking KSU in cluster {db_cluster['name']}."
2576 )
2577 self.logger.error(e)
2578 return False, f"Error checking KSU in cluster {db_cluster['name']}."
2579 return True, "OK"
garciadeblas61a4c692025-07-17 13:04:13 +02002580
2581
2582class AppInstanceLcm(GitOpsLcm):
2583 db_collection = "appinstances"
2584
2585 def __init__(self, msg, lcm_tasks, config):
2586 """
2587 Init, Connect to database, filesystem storage, and messaging
2588 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
2589 :return: None
2590 """
2591 super().__init__(msg, lcm_tasks, config)
2592 self._workflows = {
2593 "create_app": {
2594 "check_resource_function": self.check_create_app,
2595 },
2596 "update_app": {
2597 "check_resource_function": self.check_update_app,
2598 },
2599 "delete_app": {
2600 "check_resource_function": self.check_delete_app,
2601 },
2602 }
2603
2604 def get_dbclusters_from_profile(self, profile_id, profile_type):
2605 cluster_list = []
2606 db_clusters = self.db.get_list("clusters")
2607 self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
2608 for db_cluster in db_clusters:
2609 if profile_id in db_cluster.get(profile_type, []):
2610 self.logger.info(
2611 f"Profile {profile_id} found in cluster {db_cluster['name']}"
2612 )
2613 cluster_list.append(db_cluster)
2614 return cluster_list
2615
2616 def update_app_dependency(self, app_id, db_app):
2617 self.logger.info(f"Updating AppInstance dependencies for AppInstance {app_id}")
2618 oka_id = db_app.get("oka")
2619 if not oka_id:
2620 self.logger.info(f"No OKA associated with AppInstance {app_id}")
2621 return
2622
2623 used_oka = []
2624 all_apps = self.db.get_list(self.db_collection, {})
2625 for app in all_apps:
2626 if app["_id"] != app_id:
2627 app_oka_id = app["oka"]
2628 if app_oka_id not in used_oka:
2629 used_oka.append(app_oka_id)
2630 self.logger.info(f"Used OKA: {used_oka}")
2631
2632 if oka_id not in used_oka:
2633 self.db.set_one(
2634 "okas", {"_id": oka_id}, {"_admin.usageState": "NOT_IN_USE"}
2635 )
2636 return
2637
2638 async def generic_operation(self, params, order_id, operation_name):
2639 self.logger.info(f"Generic operation. Operation name: {operation_name}")
2640 # self.logger.debug(f"Params: {params}")
2641 try:
2642 op_id = params["operation_id"]
2643 app_id = params["appinstance"]
2644 self.initialize_operation(app_id, op_id)
2645 db_app = self.db.get_one(self.db_collection, {"_id": app_id})
2646 # self.logger.debug("Db App: {}".format(db_app))
2647
2648 # Initialize workflow_content with a copy of the db_app, decrypting fields to use in workflows
2649 db_app_copy = self.decrypted_copy(db_app)
2650 workflow_content = {
2651 "app": db_app_copy,
2652 }
2653
2654 # Update workflow_content with profile info
2655 profile_type = db_app["profile_type"]
2656 profile_id = db_app["profile"]
2657 profile_collection = self.profile_collection_mapping[profile_type]
2658 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
2659 # db_profile is decrypted inline
2660 # No need to use decrypted_copy because db_profile won't be updated.
2661 self.decrypt_age_keys(db_profile)
2662 workflow_content["profile"] = db_profile
2663
2664 op_params = self.get_operation_params(db_app, op_id)
2665 if not op_params:
2666 op_params = {}
2667 self.logger.debug("Operation Params: {}".format(op_params))
2668
2669 # Get SW catalog path from op_params or from DB
2670 aux_dict = {}
2671 if operation_name == "create_app":
2672 aux_dict = op_params
2673 else:
2674 aux_dict = db_app
2675 sw_catalog_path = ""
2676 if "sw_catalog_path" in aux_dict:
2677 sw_catalog_path = aux_dict.get("sw_catalog_path", "")
2678 elif "oka" in aux_dict:
2679 oka_id = aux_dict["oka"]
2680 db_oka = self.db.get_one("okas", {"_id": oka_id})
2681 oka_type = MAP_PROFILE[
2682 db_oka.get("profile_type", "infra_controller_profiles")
2683 ]
2684 sw_catalog_path = f"{oka_type}/{db_oka['git_name'].lower()}"
2685 else:
2686 self.logger.error("SW Catalog path could not be determined.")
2687 raise LcmException("SW Catalog path could not be determined.")
2688 self.logger.debug(f"SW Catalog path: {sw_catalog_path}")
2689
2690 # Get model from Git repo
2691 # Clone the SW catalog repo
2692 repodir = self.cloneGitRepo(
2693 repo_url=self._full_repo_sw_catalogs_url, branch="main"
2694 )
2695 model_file_path = os.path.join(repodir, sw_catalog_path, "model.yaml")
2696 if not os.path.exists(model_file_path):
2697 self.logger.error(f"Model file not found at path: {model_file_path}")
2698 raise LcmException(f"Model file not found at path: {model_file_path}")
2699 # Store the model content in workflow_content
2700 with open(model_file_path) as model_file:
2701 workflow_content["model"] = yaml.safe_load(model_file.read())
2702
2703 # A single workflow is launched for the App operation
2704 self.logger.debug("Launching workflow {}".format(operation_name))
2705 (
2706 workflow_res,
2707 workflow_name,
2708 workflow_resources,
2709 ) = await self.odu.launch_workflow(
2710 operation_name, op_id, op_params, workflow_content
2711 )
2712
2713 if not workflow_res:
2714 self.logger.error(f"Failed to launch workflow: {workflow_name}")
2715 if operation_name == "create_app":
2716 db_app["state"] = "FAILED_CREATION"
2717 elif operation_name == "delete_app":
2718 db_app["state"] = "FAILED_DELETION"
2719 db_app["resourceState"] = "ERROR"
2720 db_app = self.update_operation_history(
2721 db_app, op_id, workflow_status=False, resource_status=None
2722 )
2723 self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)
2724 # Clean items used in the workflow, no matter if the workflow succeeded
2725 clean_status, clean_msg = await self.odu.clean_items_workflow(
2726 operation_name, op_id, op_params, workflow_content
2727 )
2728 self.logger.info(
2729 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2730 )
2731 return
2732
2733 # Update resources created in workflow
2734 db_app["app_model"] = workflow_resources.get("app_model", {})
2735
2736 # Update workflow status in App
2737 workflow_status = await self.check_workflow_and_update_db(
2738 op_id, workflow_name, db_app
2739 )
2740 # Update resource status in DB
2741 if workflow_status:
2742 resource_status, db_app = await self.check_resource_and_update_db(
2743 operation_name, op_id, op_params, db_app
2744 )
2745 else:
2746 resource_status = False
2747 self.db.set_one(self.db_collection, {"_id": db_app["_id"]}, db_app)
2748
2749 # Clean items used in the workflow, no matter if the workflow succeeded
2750 clean_status, clean_msg = await self.odu.clean_items_workflow(
2751 operation_name, op_id, op_params, workflow_content
2752 )
2753 self.logger.info(
2754 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
2755 )
2756
2757 if operation_name == "delete_app":
2758 force = params.get("force", False)
2759 if force:
2760 force_delete_status = self.check_force_delete_and_delete_from_db(
2761 db_app["_id"], workflow_status, resource_status, force
2762 )
2763 if force_delete_status:
2764 return
2765 if resource_status:
2766 db_app["state"] == "DELETED"
2767 self.update_app_dependency(db_app["_id"], db_app)
2768 self.db.del_one(self.db_collection, {"_id": db_app["_id"]})
2769
2770 self.logger.info(
2771 f"Generic app operation Exit {operation_name} with resource Status {resource_status}"
2772 )
2773 return
2774 except Exception as e:
2775 self.logger.debug(traceback.format_exc())
2776 self.logger.debug(f"Exception: {e}", exc_info=True)
2777 return
2778
2779 async def create(self, params, order_id):
2780 self.logger.info("App Create Enter")
2781 return await self.generic_operation(params, order_id, "create_app")
2782
2783 async def update(self, params, order_id):
2784 self.logger.info("App Edit Enter")
2785 return await self.generic_operation(params, order_id, "update_app")
2786
2787 async def delete(self, params, order_id):
2788 self.logger.info("App Delete Enter")
2789 return await self.generic_operation(params, order_id, "delete_app")
2790
2791 async def check_appinstance(self, op_id, op_params, content, deleted=False):
2792 self.logger.info(
2793 f"check_app_instance Operation {op_id}. Params: {op_params}. Deleted: {deleted}"
2794 )
2795 self.logger.debug(f"Content: {content}")
2796 db_app = content
2797 profile_id = db_app["profile"]
2798 profile_type = db_app["profile_type"]
2799 app_name = db_app["name"]
2800 self.logger.info(
2801 f"Checking status of AppInstance {app_name} for profile {profile_id}."
2802 )
2803
2804 # TODO: read app_model and get kustomization name and namespace
2805 # app_model = db_app.get("app_model", {})
2806 kustomization_list = [
2807 {
2808 "name": f"jenkins-{app_name}",
2809 "namespace": "flux-system",
2810 }
2811 ]
2812 checkings_list = []
2813 if deleted:
2814 for kustomization in kustomization_list:
2815 checkings_list.append(
2816 {
2817 "item": "kustomization",
2818 "name": kustomization["name"].lower(),
2819 "namespace": kustomization["namespace"],
2820 "deleted": True,
2821 "timeout": self._checkloop_kustomization_timeout,
2822 "enable": True,
2823 "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
2824 }
2825 )
2826 else:
2827 for kustomization in kustomization_list:
2828 checkings_list.append(
2829 {
2830 "item": "kustomization",
2831 "name": kustomization["name"].lower(),
2832 "namespace": kustomization["namespace"],
2833 "condition": {
2834 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
2835 "value": "True",
2836 },
2837 "timeout": self._checkloop_kustomization_timeout,
2838 "enable": True,
2839 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
2840 }
2841 )
2842
2843 dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
2844 if not dbcluster_list:
2845 self.logger.info(f"No clusters found for profile {profile_id}.")
2846 for db_cluster in dbcluster_list:
2847 try:
2848 self.logger.info(
2849 f"Checking status of AppInstance {app_name} in cluster {db_cluster['name']}."
2850 )
2851 cluster_kubectl = self.cluster_kubectl(db_cluster)
2852 result, message = await self.common_check_list(
2853 op_id,
2854 checkings_list,
2855 self.db_collection,
2856 db_app,
2857 kubectl_obj=cluster_kubectl,
2858 )
2859 if not result:
2860 return False, message
2861 except Exception as e:
2862 self.logger.error(
2863 f"Error checking AppInstance in cluster {db_cluster['name']}."
2864 )
2865 self.logger.error(e)
2866 return (
2867 False,
2868 f"Error checking AppInstance in cluster {db_cluster['name']}.",
2869 )
2870 return True, "OK"
2871
2872 async def check_create_app(self, op_id, op_params, content):
2873 self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.")
2874 # self.logger.debug(f"Content: {content}")
2875 return await self.check_appinstance(op_id, op_params, content)
2876
2877 async def check_update_app(self, op_id, op_params, content):
2878 self.logger.info(f"check_update_app Operation {op_id}. Params: {op_params}.")
2879 # self.logger.debug(f"Content: {content}")
2880 return await self.check_appinstance(op_id, op_params, content)
2881
2882 async def check_delete_app(self, op_id, op_params, content):
2883 self.logger.info(f"check_delete_app Operation {op_id}. Params: {op_params}.")
2884 # self.logger.debug(f"Content: {content}")
2885 return await self.check_appinstance(op_id, op_params, content, deleted=True)