blob: 5d3cca48e772eadd34afd00d8f4f8ce20591fefa [file] [log] [blame]
rshri932105f2024-07-05 15:11:55 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = (
17 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
18 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
19)
20
rshric3564942024-11-12 18:12:38 +000021import copy
rshri932105f2024-07-05 15:11:55 +000022import logging
yshahd940c652024-10-17 06:11:12 +000023from time import time
garciadeblas72412282024-11-07 12:41:54 +010024import traceback
rshri932105f2024-07-05 15:11:55 +000025from osm_lcm.lcm_utils import LcmBase
26from copy import deepcopy
27from osm_lcm import odu_workflows
28from osm_lcm import vim_sdn
rshri948f7de2024-12-02 03:42:35 +000029from osm_lcm.data_utils.list_utils import find_in_list
garciadeblasbc96f382025-01-22 16:02:18 +010030from osm_lcm.n2vc.kubectl import Kubectl
31import yaml
rshri932105f2024-07-05 15:11:55 +000032
# Maps the profile reference fields of a cluster document to short names
# used by the GitOps machinery (not consumed in this chunk — presumably the
# ODU workflow/repo layer; verify against the caller).
# NOTE(review): "resource_profiles" maps to "managed_resources" (underscore)
# while the other values are hyphenated — confirm the asymmetry is intentional.
MAP_PROFILE = {
    "infra_controller_profiles": "infra-controllers",
    "infra_config_profiles": "infra-configs",
    "resource_profiles": "managed_resources",
    "app_profiles": "apps",
}
39
rshri932105f2024-07-05 15:11:55 +000040
class GitOpsLcm(LcmBase):
    """Common base for LCM classes of GitOps-managed items.

    Centralizes the machinery shared by the GitOps LCM subclasses
    (clusters, profiles, ...): bookkeeping of operations inside an item's
    ``operationHistory``, checking ODU workflow results, polling resource
    readiness, decrypting stored secrets for workflows and building a
    ``Kubectl`` handle from stored cluster credentials.
    """

    # Mongo collection holding the items handled by this class;
    # subclasses override it (e.g. ClusterLcm uses "clusters").
    db_collection = "gitops"
    # Class-level defaults for the last known statuses.
    workflow_status = None
    resource_status = None

    # Maps the profile reference fields of a cluster document to the Mongo
    # collections where the profile documents live.
    profile_collection_mapping = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "resource_profiles": "k8sresource",
        "app_profiles": "k8sapp",
    }

    def __init__(self, msg, lcm_tasks, config):
        """Connect to the task registry and the ODU workflow engine.

        :param msg: message bus instance
        :param lcm_tasks: registry of LCM tasks shared across LCM classes
        :param config: two-level configuration dictionary
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        # Readiness-loop timeouts, in seconds.
        self._checkloop_kustomization_timeout = 900
        self._checkloop_resource_timeout = 900
        # Per-workflow hooks ({workflow_name: {"check_resource_function": fn}});
        # populated by subclasses.
        self._workflows = {}
        super().__init__(msg, self.logger)

    async def check_dummy_operation(self, op_id, op_params, content):
        """Fallback resource check used when a workflow declares none: always OK."""
        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
        return True, "OK"

    def initialize_operation(self, item_id, op_id):
        """Mark operation ``op_id`` of item ``item_id`` as started in the DB.

        Resets the operation's workflow/resource/operation states to their
        initial values and records it as the item's current operation.
        NOTE: raises TypeError if ``op_id`` is not present in the item's
        operationHistory (the lookup below yields None).
        """
        db_item = self.db.get_one(self.db_collection, {"_id": item_id})
        operation = next(
            (op for op in db_item.get("operationHistory", []) if op["op_id"] == op_id),
            None,
        )
        operation["workflowState"] = "PROCESSING"
        operation["resourceState"] = "NOT_READY"
        operation["operationState"] = "IN_PROGRESS"
        operation["gitOperationInfo"] = None
        db_item["current_operation"] = operation["op_id"]
        self.db.set_one(self.db_collection, {"_id": item_id}, db_item)

    def get_operation_params(self, item, operation_id):
        """Return the operationParams of ``operation_id`` in ``item``.

        Returns {} when the operation has no params; raises AttributeError
        if the operation id is not found in the history.
        """
        operation_history = item.get("operationHistory", [])
        operation = find_in_list(
            operation_history, lambda op: op["op_id"] == operation_id
        )
        return operation.get("operationParams", {})

    def get_operation_type(self, item, operation_id):
        """Return the operationType ("create", "delete", ...) of ``operation_id``."""
        operation_history = item.get("operationHistory", [])
        operation = find_in_list(
            operation_history, lambda op: op["op_id"] == operation_id
        )
        return operation.get("operationType", {})

    def update_state_operation_history(
        self, content, op_id, workflow_state=None, resource_state=None
    ):
        """Set raw workflow/resource state strings on operation ``op_id``.

        Only the states whose new value is not None are touched.  ``content``
        is modified in place and returned; it is NOT persisted here — the
        caller is responsible for writing it back to the DB.
        """
        self.logger.info(
            f"Update state of operation {op_id} in Operation History in DB"
        )
        self.logger.info(
            f"Workflow state: {workflow_state}. Resource state: {resource_state}"
        )
        self.logger.debug(f"Content: {content}")

        for op_num, operation in enumerate(content["operationHistory"]):
            self.logger.debug("Operations: {}".format(operation))
            if operation["op_id"] == op_id:
                self.logger.debug("Found operation number: {}".format(op_num))
                if workflow_state is not None:
                    operation["workflowState"] = workflow_state

                if resource_state is not None:
                    operation["resourceState"] = resource_state
                break
        self.logger.debug("content: {}".format(content))

        return content

    def update_operation_history(
        self, content, op_id, workflow_status=None, resource_status=None, op_end=True
    ):
        """Record the boolean outcome of operation ``op_id`` in the history.

        ``workflow_status``/``resource_status`` are booleans (or None to
        skip): they are mapped to workflowState/resourceState, the overall
        operationState and ``result``.  When ``op_end`` is True the endDate
        is stamped with the current time.  ``content`` is modified in place
        and returned; the caller persists it.
        """
        self.logger.info(
            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
        )
        self.logger.debug(f"Content: {content}")

        for op_num, operation in enumerate(content["operationHistory"]):
            self.logger.debug("Operations: {}".format(operation))
            if operation["op_id"] == op_id:
                self.logger.debug("Found operation number: {}".format(op_num))
                if workflow_status is not None:
                    if workflow_status:
                        operation["workflowState"] = "COMPLETED"
                        operation["result"] = True
                    else:
                        operation["workflowState"] = "ERROR"
                        operation["operationState"] = "FAILED"
                        operation["result"] = False

                if resource_status is not None:
                    if resource_status:
                        operation["resourceState"] = "READY"
                        operation["operationState"] = "COMPLETED"
                        operation["result"] = True
                    else:
                        operation["resourceState"] = "NOT_READY"
                        operation["operationState"] = "FAILED"
                        operation["result"] = False

                if op_end:
                    operation["endDate"] = time()
                break
        self.logger.debug("content: {}".format(content))

        return content

    async def check_workflow_and_update_db(self, op_id, workflow_name, db_content):
        """Check an ODU workflow result and persist the derived item state.

        Maps the (operation type, workflow success) pair to the item's
        ``state``/``resourceState``, updates the operation history and
        writes the item back to ``self.db_collection``.

        :return: the boolean workflow status
        """
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )
        operation_type = self.get_operation_type(db_content, op_id)
        if operation_type == "create" and workflow_status:
            db_content["state"] = "CREATED"
        elif operation_type == "create" and not workflow_status:
            db_content["state"] = "FAILED_CREATION"
        elif operation_type == "delete" and workflow_status:
            db_content["state"] = "DELETED"
        elif operation_type == "delete" and not workflow_status:
            db_content["state"] = "FAILED_DELETION"

        if workflow_status:
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(
            db_content, op_id, workflow_status, None
        )
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return workflow_status

    async def check_resource_and_update_db(
        self, resource_name, op_id, op_params, db_content
    ):
        """Poll resource readiness and reflect the result in ``db_content``.

        Intended to be called only after a successful workflow, hence the
        hard-coded ``workflow_status = True`` passed to the history update.
        ``db_content`` is updated in place but NOT persisted here.

        :return: (resource_status, db_content)
        """
        workflow_status = True

        resource_status, resource_msg = await self.check_resource_status(
            resource_name, op_id, op_params, db_content
        )
        self.logger.info(
            "Resource Status: {} Resource Message: {}".format(
                resource_status, resource_msg
            )
        )

        if resource_status:
            db_content["resourceState"] = "READY"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(
            db_content, op_id, workflow_status, resource_status
        )
        db_content["operatingState"] = "IDLE"
        db_content["current_operation"] = None
        return resource_status, db_content

    async def common_check_list(
        self, op_id, checkings_list, db_collection, db_item, kubectl=None
    ):
        """Run the ODU readiness loop for every enabled check in the list.

        After each successful check the item's resourceState is advanced to
        the check's ``resourceState`` and persisted.  Returns (True, "OK")
        when all enabled checks pass; on the first failure returns
        (False, <message>).  Unexpected exceptions are caught at this
        boundary and reported as a failure instead of being propagated.
        """
        try:
            for checking in checkings_list:
                if checking["enable"]:
                    status, message = await self.odu.readiness_loop(
                        item=checking["item"],
                        name=checking["name"],
                        namespace=checking["namespace"],
                        condition=checking.get("condition"),
                        deleted=checking.get("deleted", False),
                        timeout=checking["timeout"],
                        kubectl=kubectl,
                    )
                    if not status:
                        error_message = "Resources not ready: "
                        error_message += checking.get("error_message", "")
                        return status, f"{error_message}: {message}"
                    else:
                        db_item["resourceState"] = checking["resourceState"]
                        db_item = self.update_state_operation_history(
                            db_item, op_id, None, checking["resourceState"]
                        )
                        self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
        except Exception as e:
            self.logger.debug(traceback.format_exc())
            self.logger.debug(f"Exception: {e}", exc_info=True)
            return False, f"Unexpected exception: {e}"
        return True, "OK"

    async def check_resource_status(self, key, op_id, op_params, content):
        """Dispatch to the workflow-specific resource check for ``key``.

        Falls back to :meth:`check_dummy_operation` (always OK) when the
        workflow registered no ``check_resource_function``.
        """
        self.logger.info(
            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
        )
        self.logger.debug(f"Check resource status. Content: {content}")
        check_resource_function = self._workflows.get(key, {}).get(
            "check_resource_function"
        )
        self.logger.info("check_resource function : {}".format(check_resource_function))
        if check_resource_function:
            return await check_resource_function(op_id, op_params, content)
        else:
            return await self.check_dummy_operation(op_id, op_params, content)

    def decrypted_copy(self, content, fields=None):
        """Return a deep copy of ``content`` with the given fields decrypted.

        The copy is intended to be handed to ODU workflows so that the
        document stored in the DB keeps its fields encrypted.

        :param content: DB document; its "_id" is used as decryption salt
        :param fields: field names to decrypt; defaults to the age key pair.
            (None default instead of a mutable list default — avoids the
            shared-mutable-default pitfall.)
        """
        if fields is None:
            fields = ["age_pubkey", "age_privkey"]
        content_copy = copy.deepcopy(content)

        # decrypting the key
        self.db.encrypt_decrypt_fields(
            content_copy,
            "decrypt",
            fields,
            schema_version="1.11",
            salt=content_copy["_id"],
        )
        return content_copy

    def cluster_kubectl(self, db_cluster):
        """Build a Kubectl handle from the cluster's stored kubeconfig.

        NOTE(review): the kubeconfig is written to a predictable path under
        /tmp and is not cleaned up; consider tempfile.NamedTemporaryFile
        with restrictive permissions for hardening.
        """
        cluster_kubeconfig = db_cluster["credentials"]
        kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
        with open(kubeconfig_path, "w") as kubeconfig_file:
            yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
        return Kubectl(config_file=kubeconfig_path)
282
garciadeblas72412282024-11-07 12:41:54 +0100283
284class ClusterLcm(GitOpsLcm):
garciadeblas96b94f52024-07-08 16:18:21 +0200285 db_collection = "clusters"
rshri932105f2024-07-05 15:11:55 +0000286
287 def __init__(self, msg, lcm_tasks, config):
288 """
289 Init, Connect to database, filesystem storage, and messaging
290 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
291 :return: None
292 """
garciadeblas72412282024-11-07 12:41:54 +0100293 super().__init__(msg, lcm_tasks, config)
294 self._workflows = {
295 "create_cluster": {
296 "check_resource_function": self.check_create_cluster,
297 },
garciadeblasb0a42c22024-11-13 16:00:10 +0100298 "register_cluster": {
299 "check_resource_function": self.check_register_cluster,
300 },
301 "update_cluster": {
302 "check_resource_function": self.check_update_cluster,
303 },
garciadeblasbc96f382025-01-22 16:02:18 +0100304 "delete_cluster": {
305 "check_resource_function": self.check_delete_cluster,
306 },
garciadeblas72412282024-11-07 12:41:54 +0100307 }
rshri932105f2024-07-05 15:11:55 +0000308 self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
309
    async def create(self, params, order_id):
        """Create a K8s cluster through the ODU 'create_cluster' workflow.

        Launches the workflow, records workflow/resource progress in the
        'clusters' collection, propagates the final state to the attached
        profiles and registers the resulting cluster in 'k8sclusters'.

        :param params: dict with at least 'cluster_id' and 'operation_id'
        :param order_id: order identifier, forwarded to the legacy
            K8s-cluster registration (self.regist)
        :return: None
        """
        self.logger.info("cluster Create Enter")

        # To get the cluster and op ids
        cluster_id = params["cluster_id"]
        op_id = params["operation_id"]

        # To initialize the operation states
        self.initialize_operation(cluster_id, op_id)

        # To get the cluster
        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})

        # To get the operation params details
        op_params = self.get_operation_params(db_cluster, op_id)

        # To copy the cluster content and decrypting fields to use in workflows
        workflow_content = {
            "cluster": self.decrypted_copy(db_cluster),
        }

        # To get the vim account details
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
        workflow_content["vim_account"] = db_vim

        _, workflow_name = await self.odu.launch_workflow(
            "create_cluster", op_id, op_params, workflow_content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        # Workflow outcome decides the cluster state before the resource check.
        if workflow_status:
            db_cluster["state"] = "CREATED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_CREATION"
            db_cluster["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        db_cluster = self.update_operation_history(
            db_cluster, op_id, workflow_status, None
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_cluster", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        # Resource polling, credential retrieval and registration only make
        # sense when the workflow itself succeeded (resource_status is only
        # defined inside this branch).
        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "create_cluster", op_id, op_params, workflow_content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, op_id, workflow_status, resource_status
            )
            db_cluster["current_operation"] = None

            # Retrieve credentials
            cluster_creds = None
            if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
                result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
                # TODO: manage the case where the credentials are not available
                if result:
                    db_cluster["credentials"] = cluster_creds

            # Update db_cluster
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
            self.update_profile_state(db_cluster, workflow_status, resource_status)

            # Register the cluster in k8sclusters collection
            db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
            if cluster_creds:
                db_register["credentials"] = cluster_creds
                # To call the lcm.py for registering the cluster in k8scluster lcm.
                self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
                register = await self.regist.create(db_register, order_id)
                self.logger.debug(f"Register is : {register}")
            else:
                # Credentials missing: mark the registration entry as errored
                # but still try to fetch and store whatever is available.
                db_register["_admin"]["operationalState"] = "ERROR"
                result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
                # To call the lcm.py for registering the cluster in k8scluster lcm.
                db_register["credentials"] = cluster_creds
                self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)

        return
416
garciadeblas72412282024-11-07 12:41:54 +0100417 async def check_create_cluster(self, op_id, op_params, content):
garciadeblas7eae6f42024-11-08 10:41:38 +0100418 self.logger.info(
419 f"check_create_cluster Operation {op_id}. Params: {op_params}."
420 )
421 # self.logger.debug(f"Content: {content}")
garciadeblas72412282024-11-07 12:41:54 +0100422 db_cluster = content["cluster"]
423 cluster_name = db_cluster["git_name"].lower()
424 cluster_kustomization_name = cluster_name
425 db_vim_account = content["vim_account"]
426 cloud_type = db_vim_account["vim_type"]
427 nodepool_name = ""
428 if cloud_type == "aws":
429 nodepool_name = f"{cluster_name}-nodegroup"
430 cluster_name = f"{cluster_name}-cluster"
431 elif cloud_type == "gcp":
432 nodepool_name = f"nodepool-{cluster_name}"
433 bootstrap = op_params.get("bootstrap", True)
434 if cloud_type in ("azure", "gcp", "aws"):
435 checkings_list = [
436 {
437 "item": "kustomization",
438 "name": cluster_kustomization_name,
439 "namespace": "managed-resources",
garciadeblas6c82c352025-01-27 16:53:45 +0100440 "condition": {
441 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
442 "value": "True",
443 },
yshahcb9075f2024-11-22 12:08:57 +0000444 "timeout": 1500,
garciadeblas72412282024-11-07 12:41:54 +0100445 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100446 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
garciadeblas72412282024-11-07 12:41:54 +0100447 },
448 {
449 "item": f"cluster_{cloud_type}",
450 "name": cluster_name,
451 "namespace": "",
garciadeblas6c82c352025-01-27 16:53:45 +0100452 "condition": {
453 "jsonpath_filter": "status.conditions[?(@.type=='Synced')].status",
454 "value": "True",
455 },
garciadeblas72412282024-11-07 12:41:54 +0100456 "timeout": self._checkloop_resource_timeout,
457 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100458 "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
garciadeblas72412282024-11-07 12:41:54 +0100459 },
460 {
461 "item": f"cluster_{cloud_type}",
462 "name": cluster_name,
463 "namespace": "",
garciadeblas6c82c352025-01-27 16:53:45 +0100464 "condition": {
465 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
466 "value": "True",
467 },
garciadeblas72412282024-11-07 12:41:54 +0100468 "timeout": self._checkloop_resource_timeout,
469 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100470 "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
garciadeblas72412282024-11-07 12:41:54 +0100471 },
472 {
473 "item": "kustomization",
474 "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
475 "namespace": "managed-resources",
garciadeblas6c82c352025-01-27 16:53:45 +0100476 "condition": {
477 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
478 "value": "True",
479 },
yshahcb9075f2024-11-22 12:08:57 +0000480 "timeout": self._checkloop_resource_timeout,
garciadeblas72412282024-11-07 12:41:54 +0100481 "enable": bootstrap,
garciadeblas7eae6f42024-11-08 10:41:38 +0100482 "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
garciadeblas72412282024-11-07 12:41:54 +0100483 },
484 ]
485 else:
486 return False, "Not suitable VIM account to check cluster status"
487 if nodepool_name:
488 nodepool_check = {
489 "item": f"nodepool_{cloud_type}",
490 "name": nodepool_name,
491 "namespace": "",
garciadeblas6c82c352025-01-27 16:53:45 +0100492 "condition": {
493 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
494 "value": "True",
495 },
garciadeblas72412282024-11-07 12:41:54 +0100496 "timeout": self._checkloop_resource_timeout,
497 "enable": True,
garciadeblas7eae6f42024-11-08 10:41:38 +0100498 "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL",
garciadeblas72412282024-11-07 12:41:54 +0100499 }
500 checkings_list.insert(3, nodepool_check)
yshahcb9075f2024-11-22 12:08:57 +0000501 return await self.common_check_list(
502 op_id, checkings_list, "clusters", db_cluster
503 )
garciadeblas72412282024-11-07 12:41:54 +0100504
garciadeblas96b94f52024-07-08 16:18:21 +0200505 def update_profile_state(self, db_cluster, workflow_status, resource_status):
rshri932105f2024-07-05 15:11:55 +0000506 profiles = [
507 "infra_controller_profiles",
508 "infra_config_profiles",
509 "app_profiles",
510 "resource_profiles",
511 ]
rshri948f7de2024-12-02 03:42:35 +0000512 """
rshri932105f2024-07-05 15:11:55 +0000513 profiles_collection = {
514 "infra_controller_profiles": "k8sinfra_controller",
515 "infra_config_profiles": "k8sinfra_config",
516 "app_profiles": "k8sapp",
517 "resource_profiles": "k8sresource",
518 }
rshri948f7de2024-12-02 03:42:35 +0000519 """
Your Name86149632024-11-14 16:17:16 +0000520 self.logger.info("the db_cluster is :{}".format(db_cluster))
rshri932105f2024-07-05 15:11:55 +0000521 for profile_type in profiles:
garciadeblas96b94f52024-07-08 16:18:21 +0200522 profile_id = db_cluster[profile_type]
rshri948f7de2024-12-02 03:42:35 +0000523 # db_collection = profiles_collection[profile_type]
524 db_collection = self.profile_collection_mapping[profile_type]
rshri932105f2024-07-05 15:11:55 +0000525 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
yshahcb9075f2024-11-22 12:08:57 +0000526 op_id = db_profile["operationHistory"][-1].get("op_id")
garciadeblas96b94f52024-07-08 16:18:21 +0200527 db_profile["state"] = db_cluster["state"]
528 db_profile["resourceState"] = db_cluster["resourceState"]
529 db_profile["operatingState"] = db_cluster["operatingState"]
rshric3564942024-11-12 18:12:38 +0000530 db_profile["age_pubkey"] = db_cluster["age_pubkey"]
Your Name86149632024-11-14 16:17:16 +0000531 db_profile["age_privkey"] = db_cluster["age_privkey"]
rshri932105f2024-07-05 15:11:55 +0000532 db_profile = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000533 db_profile, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000534 )
rshri932105f2024-07-05 15:11:55 +0000535 self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
536
    async def delete(self, params, order_id):
        """Delete a cluster through the ODU 'delete_cluster' workflow.

        Registered (not OSM-created) clusters are redirected to deregister()
        (temporary workaround, see TODO below).  On success the cluster
        document and its k8sclusters registration entry are removed.

        :param params: dict with at least 'cluster_id' and 'operation_id'
        :param order_id: order identifier, forwarded to deregister()
        :return: None
        """
        self.logger.info("cluster delete Enter")

        # To get the cluster and op ids
        cluster_id = params["cluster_id"]
        op_id = params["operation_id"]

        # To initialize the operation states
        self.initialize_operation(cluster_id, op_id)

        # To get the cluster
        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})

        # To get the operation params details
        op_params = self.get_operation_params(db_cluster, op_id)

        # To copy the cluster content and decrypting fields to use in workflows
        workflow_content = {
            "cluster": self.decrypted_copy(db_cluster),
        }

        # To get the vim account details
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
        workflow_content["vim_account"] = db_vim

        # TODO: workaround until NBI rejects cluster deletion requests for registered clusters
        # This if clause will be removed
        if db_cluster["created"] == "false":
            return await self.deregister(params, order_id)

        _, workflow_name = await self.odu.launch_workflow(
            "delete_cluster", op_id, op_params, workflow_content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        # Workflow outcome decides the cluster state before the resource check.
        if workflow_status:
            db_cluster["state"] = "DELETED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_DELETION"
            db_cluster["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        db_cluster = self.update_operation_history(
            db_cluster, op_id, workflow_status, None
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "delete_cluster", op_id, op_params, workflow_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        # Resource polling and DB cleanup only when the workflow succeeded
        # (resource_status is only defined inside this branch).
        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_cluster", op_id, op_params, workflow_content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, op_id, workflow_status, resource_status
            )
            db_cluster["current_operation"] = None
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

            # To delete it from DB
            if db_cluster["state"] == "DELETED":
                self.delete_cluster(db_cluster)

                # To delete it from k8scluster collection
                self.db.del_one("k8sclusters", {"name": db_cluster["name"]})

        return
629
garciadeblasbc96f382025-01-22 16:02:18 +0100630 async def check_delete_cluster(self, op_id, op_params, content):
631 self.logger.info(
632 f"check_delete_cluster Operation {op_id}. Params: {op_params}."
633 )
634 self.logger.debug(f"Content: {content}")
635 db_cluster = content["cluster"]
636 cluster_name = db_cluster["git_name"].lower()
637 cluster_kustomization_name = cluster_name
638 db_vim_account = content["vim_account"]
639 cloud_type = db_vim_account["vim_type"]
640 if cloud_type == "aws":
641 cluster_name = f"{cluster_name}-cluster"
642 if cloud_type in ("azure", "gcp", "aws"):
643 checkings_list = [
644 {
645 "item": "kustomization",
646 "name": cluster_kustomization_name,
647 "namespace": "managed-resources",
648 "deleted": True,
649 "timeout": self._checkloop_kustomization_timeout,
650 "enable": True,
651 "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
652 },
653 {
654 "item": f"cluster_{cloud_type}",
655 "name": cluster_name,
656 "namespace": "",
657 "deleted": True,
658 "timeout": self._checkloop_resource_timeout,
659 "enable": True,
660 "resourceState": "IN_PROGRESS.RESOURCE_DELETED.CLUSTER",
661 },
662 ]
663 else:
664 return False, "Not suitable VIM account to check cluster status"
665 return await self.common_check_list(
666 op_id, checkings_list, "clusters", db_cluster
667 )
668
garciadeblas96b94f52024-07-08 16:18:21 +0200669 def delete_cluster(self, db_cluster):
670 # Actually, item_content is equal to db_cluster
671 # item_content = self.db.get_one("clusters", {"_id": db_cluster["_id"]})
672 # self.logger.debug("item_content is : {}".format(item_content))
rshri932105f2024-07-05 15:11:55 +0000673
rshri932105f2024-07-05 15:11:55 +0000674 # detach profiles
675 update_dict = None
676 profiles_to_detach = [
677 "infra_controller_profiles",
678 "infra_config_profiles",
679 "app_profiles",
680 "resource_profiles",
681 ]
rshri948f7de2024-12-02 03:42:35 +0000682 """
rshri932105f2024-07-05 15:11:55 +0000683 profiles_collection = {
684 "infra_controller_profiles": "k8sinfra_controller",
685 "infra_config_profiles": "k8sinfra_config",
686 "app_profiles": "k8sapp",
687 "resource_profiles": "k8sresource",
688 }
rshri948f7de2024-12-02 03:42:35 +0000689 """
rshri932105f2024-07-05 15:11:55 +0000690 for profile_type in profiles_to_detach:
garciadeblas96b94f52024-07-08 16:18:21 +0200691 if db_cluster.get(profile_type):
garciadeblas96b94f52024-07-08 16:18:21 +0200692 profile_ids = db_cluster[profile_type]
rshri932105f2024-07-05 15:11:55 +0000693 profile_ids_copy = deepcopy(profile_ids)
rshri932105f2024-07-05 15:11:55 +0000694 for profile_id in profile_ids_copy:
rshri948f7de2024-12-02 03:42:35 +0000695 # db_collection = profiles_collection[profile_type]
696 db_collection = self.profile_collection_mapping[profile_type]
rshri932105f2024-07-05 15:11:55 +0000697 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
garciadeblasc2552852024-10-22 12:39:32 +0200698 self.logger.debug("the db_profile is :{}".format(db_profile))
699 self.logger.debug(
garciadeblas96b94f52024-07-08 16:18:21 +0200700 "the item_content name is :{}".format(db_cluster["name"])
rshri932105f2024-07-05 15:11:55 +0000701 )
garciadeblasc2552852024-10-22 12:39:32 +0200702 self.logger.debug(
rshri932105f2024-07-05 15:11:55 +0000703 "the db_profile name is :{}".format(db_profile["name"])
704 )
garciadeblas96b94f52024-07-08 16:18:21 +0200705 if db_cluster["name"] == db_profile["name"]:
rshri932105f2024-07-05 15:11:55 +0000706 self.db.del_one(db_collection, {"_id": profile_id})
707 else:
rshri932105f2024-07-05 15:11:55 +0000708 profile_ids.remove(profile_id)
709 update_dict = {profile_type: profile_ids}
rshri932105f2024-07-05 15:11:55 +0000710 self.db.set_one(
garciadeblas96b94f52024-07-08 16:18:21 +0200711 "clusters", {"_id": db_cluster["_id"]}, update_dict
rshri932105f2024-07-05 15:11:55 +0000712 )
garciadeblas96b94f52024-07-08 16:18:21 +0200713 self.db.del_one("clusters", {"_id": db_cluster["_id"]})
rshri932105f2024-07-05 15:11:55 +0000714
rshri948f7de2024-12-02 03:42:35 +0000715 async def attach_profile(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000716 self.logger.info("profile attach Enter")
rshri948f7de2024-12-02 03:42:35 +0000717
garciadeblas995cbf32024-12-18 12:54:00 +0100718 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000719 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000720 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000721
722 # To initialize the operation states
723 self.initialize_operation(cluster_id, op_id)
724
garciadeblas995cbf32024-12-18 12:54:00 +0100725 # To get the cluster
726 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
727
728 # To get the operation params details
729 op_params = self.get_operation_params(db_cluster, op_id)
730
731 # To copy the cluster content and decrypting fields to use in workflows
732 workflow_content = {
733 "cluster": self.decrypted_copy(db_cluster),
734 }
rshri948f7de2024-12-02 03:42:35 +0000735
736 # To get the profile details
737 profile_id = params["profile_id"]
738 profile_type = params["profile_type"]
739 profile_collection = self.profile_collection_mapping[profile_type]
740 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
741 db_profile["profile_type"] = profile_type
742 # content["profile"] = db_profile
garciadeblas995cbf32024-12-18 12:54:00 +0100743 workflow_content["profile"] = db_profile
rshri932105f2024-07-05 15:11:55 +0000744
garciadeblasadb81e82024-11-08 01:11:46 +0100745 _, workflow_name = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100746 "attach_profile_to_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200747 )
rshri932105f2024-07-05 15:11:55 +0000748 self.logger.info("workflow_name is :{}".format(workflow_name))
749
garciadeblas96b94f52024-07-08 16:18:21 +0200750 workflow_status, workflow_msg = await self.odu.check_workflow_status(
751 workflow_name
752 )
rshri932105f2024-07-05 15:11:55 +0000753 self.logger.info(
754 "workflow_status is :{} and workflow_msg is :{}".format(
755 workflow_status, workflow_msg
756 )
757 )
758 if workflow_status:
759 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
760 else:
761 db_cluster["resourceState"] = "ERROR"
762 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000763 db_cluster = self.update_operation_history(
764 db_cluster, op_id, workflow_status, None
765 )
rshri932105f2024-07-05 15:11:55 +0000766 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
767
768 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +0100769 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +0100770 "attach_profile_to_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +0000771 )
772 self.logger.info(
773 "resource_status is :{} and resource_msg is :{}".format(
774 resource_status, resource_msg
775 )
776 )
777 if resource_status:
778 db_cluster["resourceState"] = "READY"
779 else:
780 db_cluster["resourceState"] = "ERROR"
781
782 db_cluster["operatingState"] = "IDLE"
783 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000784 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000785 )
rshri932105f2024-07-05 15:11:55 +0000786 profile_list = db_cluster[profile_type]
rshri932105f2024-07-05 15:11:55 +0000787 if resource_status:
rshri932105f2024-07-05 15:11:55 +0000788 profile_list.append(profile_id)
rshri932105f2024-07-05 15:11:55 +0000789 db_cluster[profile_type] = profile_list
shahithya70a3fc92024-11-12 11:01:05 +0000790 db_cluster["current_operation"] = None
rshri932105f2024-07-05 15:11:55 +0000791 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
792
793 return
794
rshri948f7de2024-12-02 03:42:35 +0000795 async def detach_profile(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000796 self.logger.info("profile dettach Enter")
rshri948f7de2024-12-02 03:42:35 +0000797
garciadeblas995cbf32024-12-18 12:54:00 +0100798 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000799 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000800 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000801
802 # To initialize the operation states
803 self.initialize_operation(cluster_id, op_id)
804
garciadeblas995cbf32024-12-18 12:54:00 +0100805 # To get the cluster
806 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
807
808 # To get the operation params details
809 op_params = self.get_operation_params(db_cluster, op_id)
810
811 # To copy the cluster content and decrypting fields to use in workflows
812 workflow_content = {
813 "cluster": self.decrypted_copy(db_cluster),
814 }
rshri948f7de2024-12-02 03:42:35 +0000815
816 # To get the profile details
817 profile_id = params["profile_id"]
818 profile_type = params["profile_type"]
819 profile_collection = self.profile_collection_mapping[profile_type]
820 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
821 db_profile["profile_type"] = profile_type
822 # content["profile"] = db_profile
garciadeblas995cbf32024-12-18 12:54:00 +0100823 workflow_content["profile"] = db_profile
rshri932105f2024-07-05 15:11:55 +0000824
garciadeblasadb81e82024-11-08 01:11:46 +0100825 _, workflow_name = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100826 "detach_profile_from_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200827 )
rshri932105f2024-07-05 15:11:55 +0000828 self.logger.info("workflow_name is :{}".format(workflow_name))
829
garciadeblas96b94f52024-07-08 16:18:21 +0200830 workflow_status, workflow_msg = await self.odu.check_workflow_status(
831 workflow_name
832 )
rshri932105f2024-07-05 15:11:55 +0000833 self.logger.info(
834 "workflow_status is :{} and workflow_msg is :{}".format(
835 workflow_status, workflow_msg
836 )
837 )
838 if workflow_status:
839 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
840 else:
841 db_cluster["resourceState"] = "ERROR"
842 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000843 db_cluster = self.update_operation_history(
844 db_cluster, op_id, workflow_status, None
845 )
rshri932105f2024-07-05 15:11:55 +0000846 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
847
848 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +0100849 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +0100850 "detach_profile_from_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +0000851 )
852 self.logger.info(
853 "resource_status is :{} and resource_msg is :{}".format(
854 resource_status, resource_msg
855 )
856 )
857 if resource_status:
858 db_cluster["resourceState"] = "READY"
859 else:
860 db_cluster["resourceState"] = "ERROR"
861
862 db_cluster["operatingState"] = "IDLE"
863 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000864 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000865 )
rshri932105f2024-07-05 15:11:55 +0000866 profile_list = db_cluster[profile_type]
867 self.logger.info("profile list is : {}".format(profile_list))
868 if resource_status:
rshri932105f2024-07-05 15:11:55 +0000869 profile_list.remove(profile_id)
rshri932105f2024-07-05 15:11:55 +0000870 db_cluster[profile_type] = profile_list
shahithya70a3fc92024-11-12 11:01:05 +0000871 db_cluster["current_operation"] = None
rshri932105f2024-07-05 15:11:55 +0000872 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
873
874 return
875
rshri948f7de2024-12-02 03:42:35 +0000876 async def register(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000877 self.logger.info("cluster register enter")
878
garciadeblas995cbf32024-12-18 12:54:00 +0100879 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000880 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000881 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000882
883 # To initialize the operation states
884 self.initialize_operation(cluster_id, op_id)
885
garciadeblas995cbf32024-12-18 12:54:00 +0100886 # To get the cluster
887 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
888
889 # To get the operation params details
890 op_params = self.get_operation_params(db_cluster, op_id)
891
892 # To copy the cluster content and decrypting fields to use in workflows
893 workflow_content = {
894 "cluster": self.decrypted_copy(db_cluster),
895 }
rshric3564942024-11-12 18:12:38 +0000896
garciadeblasadb81e82024-11-08 01:11:46 +0100897 _, workflow_name = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100898 "register_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +0200899 )
rshri932105f2024-07-05 15:11:55 +0000900 self.logger.info("workflow_name is :{}".format(workflow_name))
901
garciadeblas96b94f52024-07-08 16:18:21 +0200902 workflow_status, workflow_msg = await self.odu.check_workflow_status(
903 workflow_name
904 )
rshri932105f2024-07-05 15:11:55 +0000905 self.logger.info(
906 "workflow_status is :{} and workflow_msg is :{}".format(
907 workflow_status, workflow_msg
908 )
909 )
910 if workflow_status:
garciadeblas96b94f52024-07-08 16:18:21 +0200911 db_cluster["state"] = "CREATED"
912 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
rshri932105f2024-07-05 15:11:55 +0000913 else:
garciadeblas96b94f52024-07-08 16:18:21 +0200914 db_cluster["state"] = "FAILED_CREATION"
915 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +0000916 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +0000917 db_cluster = self.update_operation_history(
918 db_cluster, op_id, workflow_status, None
919 )
garciadeblas96b94f52024-07-08 16:18:21 +0200920 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +0000921
garciadeblasdde3a312024-09-17 13:25:06 +0200922 # Clean items used in the workflow, no matter if the workflow succeeded
923 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +0100924 "register_cluster", op_id, op_params, workflow_content
garciadeblasdde3a312024-09-17 13:25:06 +0200925 )
926 self.logger.info(
927 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
928 )
929
rshri932105f2024-07-05 15:11:55 +0000930 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +0100931 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +0100932 "register_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +0000933 )
934 self.logger.info(
935 "resource_status is :{} and resource_msg is :{}".format(
936 resource_status, resource_msg
937 )
938 )
939 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +0200940 db_cluster["resourceState"] = "READY"
rshri932105f2024-07-05 15:11:55 +0000941 else:
garciadeblas96b94f52024-07-08 16:18:21 +0200942 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +0000943
garciadeblas96b94f52024-07-08 16:18:21 +0200944 db_cluster["operatingState"] = "IDLE"
945 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +0000946 db_cluster, op_id, workflow_status, resource_status
rshri932105f2024-07-05 15:11:55 +0000947 )
shahithya70a3fc92024-11-12 11:01:05 +0000948 db_cluster["current_operation"] = None
garciadeblas96b94f52024-07-08 16:18:21 +0200949 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri948f7de2024-12-02 03:42:35 +0000950
951 db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
952 db_register["credentials"] = db_cluster["credentials"]
953 self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
954
955 if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
956 # To call the lcm.py for registering the cluster in k8scluster lcm.
957 register = await self.regist.create(db_register, order_id)
958 self.logger.debug(f"Register is : {register}")
959 else:
960 db_register["_admin"]["operationalState"] = "ERROR"
961 self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
962
rshri932105f2024-07-05 15:11:55 +0000963 return
964
garciadeblasbc96f382025-01-22 16:02:18 +0100965 async def check_register_cluster(self, op_id, op_params, content):
966 self.logger.info(
967 f"check_register_cluster Operation {op_id}. Params: {op_params}."
968 )
969 # self.logger.debug(f"Content: {content}")
970 db_cluster = content["cluster"]
971 cluster_name = db_cluster["git_name"].lower()
972 cluster_kustomization_name = cluster_name
973 bootstrap = op_params.get("bootstrap", True)
974 checkings_list = [
975 {
976 "item": "kustomization",
977 "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
978 "namespace": "managed-resources",
garciadeblas6c82c352025-01-27 16:53:45 +0100979 "condition": {
980 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
981 "value": "True",
982 },
garciadeblasbc96f382025-01-22 16:02:18 +0100983 "timeout": self._checkloop_kustomization_timeout,
984 "enable": bootstrap,
985 "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
986 },
987 ]
988 return await self.common_check_list(
989 op_id, checkings_list, "clusters", db_cluster
990 )
991
rshri948f7de2024-12-02 03:42:35 +0000992 async def deregister(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +0000993 self.logger.info("cluster deregister enter")
994
garciadeblas995cbf32024-12-18 12:54:00 +0100995 # To get the cluster and op ids
rshri948f7de2024-12-02 03:42:35 +0000996 cluster_id = params["cluster_id"]
rshri948f7de2024-12-02 03:42:35 +0000997 op_id = params["operation_id"]
rshri948f7de2024-12-02 03:42:35 +0000998
999 # To initialize the operation states
1000 self.initialize_operation(cluster_id, op_id)
1001
garciadeblas995cbf32024-12-18 12:54:00 +01001002 # To get the cluster
1003 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
1004
1005 # To get the operation params details
1006 op_params = self.get_operation_params(db_cluster, op_id)
1007
1008 # To copy the cluster content and decrypting fields to use in workflows
1009 workflow_content = {
1010 "cluster": self.decrypted_copy(db_cluster),
1011 }
rshri932105f2024-07-05 15:11:55 +00001012
garciadeblasadb81e82024-11-08 01:11:46 +01001013 _, workflow_name = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001014 "deregister_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001015 )
rshri932105f2024-07-05 15:11:55 +00001016 self.logger.info("workflow_name is :{}".format(workflow_name))
1017
garciadeblas96b94f52024-07-08 16:18:21 +02001018 workflow_status, workflow_msg = await self.odu.check_workflow_status(
1019 workflow_name
1020 )
rshri932105f2024-07-05 15:11:55 +00001021 self.logger.info(
1022 "workflow_status is :{} and workflow_msg is :{}".format(
1023 workflow_status, workflow_msg
1024 )
1025 )
1026 if workflow_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001027 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
rshri932105f2024-07-05 15:11:55 +00001028 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001029 db_cluster["state"] = "FAILED_DELETION"
1030 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001031 # has to call update_operation_history return content
yshahcb9075f2024-11-22 12:08:57 +00001032 db_cluster = self.update_operation_history(
1033 db_cluster, op_id, workflow_status, None
1034 )
garciadeblas96b94f52024-07-08 16:18:21 +02001035 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +00001036
garciadeblas91bb2c42024-11-12 11:17:12 +01001037 # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
1038 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001039 "deregister_cluster", op_id, op_params, workflow_content
garciadeblas91bb2c42024-11-12 11:17:12 +01001040 )
1041 self.logger.info(
1042 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1043 )
1044
rshri932105f2024-07-05 15:11:55 +00001045 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001046 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001047 "deregister_cluster", op_id, op_params, workflow_content
rshri932105f2024-07-05 15:11:55 +00001048 )
1049 self.logger.info(
1050 "resource_status is :{} and resource_msg is :{}".format(
1051 resource_status, resource_msg
1052 )
1053 )
1054 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001055 db_cluster["resourceState"] = "READY"
rshri932105f2024-07-05 15:11:55 +00001056 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001057 db_cluster["resourceState"] = "ERROR"
rshri932105f2024-07-05 15:11:55 +00001058
garciadeblas96b94f52024-07-08 16:18:21 +02001059 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001060 db_cluster, op_id, workflow_status, resource_status
garciadeblas96b94f52024-07-08 16:18:21 +02001061 )
garciadeblas6b2112c2024-12-20 10:35:13 +01001062 # TODO: workaround until NBI rejects cluster deletion requests for registered clusters
1063 # Setting created flag to true avoids infinite loops when deregistering a cluster
1064 db_cluster["created"] = "true"
garciadeblas96b94f52024-07-08 16:18:21 +02001065 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri932105f2024-07-05 15:11:55 +00001066
garciadeblas98f9a3d2024-12-10 13:42:47 +01001067 return await self.delete(params, order_id)
rshri932105f2024-07-05 15:11:55 +00001068
rshri948f7de2024-12-02 03:42:35 +00001069 async def get_creds(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001070 self.logger.info("Cluster get creds Enter")
rshri948f7de2024-12-02 03:42:35 +00001071 cluster_id = params["cluster_id"]
1072 op_id = params["operation_id"]
1073 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
garciadeblas96b94f52024-07-08 16:18:21 +02001074 result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
1075 if result:
1076 db_cluster["credentials"] = cluster_creds
yshahd940c652024-10-17 06:11:12 +00001077 op_len = 0
1078 for operations in db_cluster["operationHistory"]:
1079 if operations["op_id"] == op_id:
1080 db_cluster["operationHistory"][op_len]["result"] = result
1081 db_cluster["operationHistory"][op_len]["endDate"] = time()
1082 op_len += 1
shahithya70a3fc92024-11-12 11:01:05 +00001083 db_cluster["current_operation"] = None
yshahd940c652024-10-17 06:11:12 +00001084 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
rshri948f7de2024-12-02 03:42:35 +00001085 self.logger.info("Cluster Get Creds Exit")
yshah771dea82024-07-05 15:11:49 +00001086 return
1087
rshri948f7de2024-12-02 03:42:35 +00001088 async def update(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001089 self.logger.info("Cluster update Enter")
rshri948f7de2024-12-02 03:42:35 +00001090 # To get the cluster details
1091 cluster_id = params["cluster_id"]
1092 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
1093
1094 # To get the operation params details
1095 op_id = params["operation_id"]
1096 op_params = self.get_operation_params(db_cluster, op_id)
yshah771dea82024-07-05 15:11:49 +00001097
garciadeblas995cbf32024-12-18 12:54:00 +01001098 # To copy the cluster content and decrypting fields to use in workflows
1099 workflow_content = {
1100 "cluster": self.decrypted_copy(db_cluster),
1101 }
rshric3564942024-11-12 18:12:38 +00001102
1103 # vim account details
1104 db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
garciadeblas995cbf32024-12-18 12:54:00 +01001105 workflow_content["vim_account"] = db_vim
rshric3564942024-11-12 18:12:38 +00001106
garciadeblasadb81e82024-11-08 01:11:46 +01001107 _, workflow_name = await self.odu.launch_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001108 "update_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001109 )
1110 workflow_status, workflow_msg = await self.odu.check_workflow_status(
1111 workflow_name
1112 )
1113 self.logger.info(
1114 "Workflow Status: {} Workflow Message: {}".format(
1115 workflow_status, workflow_msg
yshah771dea82024-07-05 15:11:49 +00001116 )
garciadeblas96b94f52024-07-08 16:18:21 +02001117 )
1118
1119 if workflow_status:
1120 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
1121 else:
1122 db_cluster["resourceState"] = "ERROR"
1123
yshahcb9075f2024-11-22 12:08:57 +00001124 db_cluster = self.update_operation_history(
1125 db_cluster, op_id, workflow_status, None
1126 )
garciadeblas96b94f52024-07-08 16:18:21 +02001127 # self.logger.info("Db content: {}".format(db_content))
1128 # self.db.set_one(self.db_collection, {"_id": _id}, db_cluster)
1129 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
1130
garciadeblas28bff0f2024-09-16 12:53:07 +02001131 # Clean items used in the workflow, no matter if the workflow succeeded
1132 clean_status, clean_msg = await self.odu.clean_items_workflow(
garciadeblas995cbf32024-12-18 12:54:00 +01001133 "update_cluster", op_id, op_params, workflow_content
garciadeblas28bff0f2024-09-16 12:53:07 +02001134 )
1135 self.logger.info(
1136 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1137 )
garciadeblas96b94f52024-07-08 16:18:21 +02001138 if workflow_status:
garciadeblas72412282024-11-07 12:41:54 +01001139 resource_status, resource_msg = await self.check_resource_status(
garciadeblas995cbf32024-12-18 12:54:00 +01001140 "update_cluster", op_id, op_params, workflow_content
garciadeblas96b94f52024-07-08 16:18:21 +02001141 )
1142 self.logger.info(
1143 "Resource Status: {} Resource Message: {}".format(
1144 resource_status, resource_msg
1145 )
1146 )
yshah771dea82024-07-05 15:11:49 +00001147
1148 if resource_status:
garciadeblas96b94f52024-07-08 16:18:21 +02001149 db_cluster["resourceState"] = "READY"
yshah771dea82024-07-05 15:11:49 +00001150 else:
garciadeblas96b94f52024-07-08 16:18:21 +02001151 db_cluster["resourceState"] = "ERROR"
yshah771dea82024-07-05 15:11:49 +00001152
yshah0defcd52024-11-18 07:41:35 +00001153 db_cluster = self.update_operation_history(
yshahcb9075f2024-11-22 12:08:57 +00001154 db_cluster, op_id, workflow_status, resource_status
yshah0defcd52024-11-18 07:41:35 +00001155 )
1156
garciadeblas96b94f52024-07-08 16:18:21 +02001157 db_cluster["operatingState"] = "IDLE"
garciadeblas96b94f52024-07-08 16:18:21 +02001158 # self.logger.info("db_cluster: {}".format(db_cluster))
garciadeblas6c82c352025-01-27 16:53:45 +01001159 # TODO: verify condition
garciadeblas96b94f52024-07-08 16:18:21 +02001160 # For the moment, if the workflow completed successfully, then we update the db accordingly.
1161 if workflow_status:
1162 if "k8s_version" in op_params:
1163 db_cluster["k8s_version"] = op_params["k8s_version"]
yshah0defcd52024-11-18 07:41:35 +00001164 if "node_count" in op_params:
garciadeblas96b94f52024-07-08 16:18:21 +02001165 db_cluster["node_count"] = op_params["node_count"]
yshah0defcd52024-11-18 07:41:35 +00001166 if "node_size" in op_params:
1167 db_cluster["node_count"] = op_params["node_size"]
garciadeblas96b94f52024-07-08 16:18:21 +02001168 # self.db.set_one(self.db_collection, {"_id": _id}, db_content)
shahithya70a3fc92024-11-12 11:01:05 +00001169 db_cluster["current_operation"] = None
garciadeblas96b94f52024-07-08 16:18:21 +02001170 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
yshah771dea82024-07-05 15:11:49 +00001171 return
1172
garciadeblasbc96f382025-01-22 16:02:18 +01001173 async def check_update_cluster(self, op_id, op_params, content):
1174 self.logger.info(
1175 f"check_update_cluster Operation {op_id}. Params: {op_params}."
1176 )
1177 self.logger.debug(f"Content: {content}")
1178 return await self.check_dummy_operation(op_id, op_params, content)
1179 # db_cluster = content["cluster"]
1180 # cluster_id = db_cluster["_id"]
1181 # cluster_kubectl = self.cluster_kubectl(db_cluster)
1182 # cluster_name = db_cluster["git_name"].lower()
1183 # cluster_kustomization_name = cluster_name
1184 # checkings_list = [
1185 # {
1186 # "item": "kustomization",
1187 # "name": cluster_kustomization_name,
1188 # "namespace": "managed-resources",
garciadeblas6c82c352025-01-27 16:53:45 +01001189 # "condition": {
1190 # "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
1191 # "value": "True",
1192 # },
garciadeblasbc96f382025-01-22 16:02:18 +01001193 # "timeout": self._checkloop_kustomization_timeout,
1194 # "enable": True,
1195 # "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
1196 # },
1197 # ]
1198 # return await self.common_check_list(
1199 # op_id, checkings_list, "clusters", db_cluster, cluster_kubectl
1200 # )
1201
yshah771dea82024-07-05 15:11:49 +00001202
class CloudCredentialsLcm(GitOpsLcm):
    """LCM handler for cloud credentials stored as VIM accounts.

    Each operation (add/edit/remove) launches the corresponding ODU
    workflow ("create/update/delete_cloud_credentials"), checks its
    result and updates the "vim_accounts" collection accordingly.
    """

    # Cloud credentials live in the standard VIM accounts collection.
    db_collection = "vim_accounts"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def add(self, params, order_id):
        """Create cloud credentials via the "create_cloud_credentials" workflow.

        :param params: VIM account document content; its "_id" is used both
            as the VIM id and as the operation id.
        :param order_id: identifier of the triggering order (not used here).
        """
        self.logger.info("Cloud Credentials create")
        vim_id = params["_id"]
        # The VIM id doubles as the operation id for credential workflows.
        op_id = vim_id
        op_params = params
        db_content = self.db.get_one(self.db_collection, {"_id": vim_id})
        vim_config = db_content.get("config", {})
        # Decrypt stored secrets in-place so the workflow gets usable values.
        self.db.encrypt_decrypt_fields(
            vim_config.get("credentials"),
            "decrypt",
            ["password", "secret"],
            schema_version=db_content["schema_version"],
            salt=vim_id,
        )

        _, workflow_name = await self.odu.launch_workflow(
            "create_cloud_credentials", op_id, op_params, db_content
        )

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )

        self.logger.info(
            "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_cloud_credentials", op_id, op_params, db_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "create_cloud_credentials", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

        # NOTE(review): the operational state is set to ENABLED independently
        # of the workflow/resource result — confirm this is intended.
        db_content["_admin"]["operationalState"] = "ENABLED"
        for operation in db_content["_admin"]["operations"]:
            if operation["lcmOperationType"] == "create":
                operation["operationState"] = "ENABLED"
        self.logger.info("Content : {}".format(db_content))
        self.db.set_one("vim_accounts", {"_id": db_content["_id"]}, db_content)
        return

    async def edit(self, params, order_id):
        """Update cloud credentials via the "update_cloud_credentials" workflow.

        :param params: VIM account document content; its "_id" is used both
            as the VIM id and as the operation id.
        :param order_id: identifier of the triggering order (not used here).
        """
        self.logger.info("Cloud Credentials Update")
        vim_id = params["_id"]
        # The VIM id doubles as the operation id for credential workflows.
        op_id = vim_id
        op_params = params
        db_content = self.db.get_one("vim_accounts", {"_id": vim_id})
        vim_config = db_content.get("config", {})
        # Decrypt stored secrets in-place so the workflow gets usable values.
        self.db.encrypt_decrypt_fields(
            vim_config.get("credentials"),
            "decrypt",
            ["password", "secret"],
            schema_version=db_content["schema_version"],
            salt=vim_id,
        )

        _, workflow_name = await self.odu.launch_workflow(
            "update_cloud_credentials", op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "update_cloud_credentials", op_id, op_params, db_content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "update_cloud_credentials", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
        return

    async def remove(self, params, order_id):
        """Delete cloud credentials via the "delete_cloud_credentials" workflow
        and remove the VIM account document from the DB.

        :param params: VIM account document content; its "_id" is used both
            as the VIM id and as the operation id.
        :param order_id: identifier of the triggering order (not used here).
        """
        self.logger.info("Cloud Credentials remove")
        vim_id = params["_id"]
        # The VIM id doubles as the operation id for credential workflows.
        op_id = vim_id
        op_params = params
        db_content = self.db.get_one("vim_accounts", {"_id": vim_id})

        _, workflow_name = await self.odu.launch_workflow(
            "delete_cloud_credentials", op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_cloud_credentials", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
        # NOTE(review): the document is deleted independently of the
        # workflow/resource result — confirm this is intended.
        self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
        return
1339
rshri932105f2024-07-05 15:11:55 +00001340
garciadeblas72412282024-11-07 12:41:54 +01001341class K8sAppLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001342 db_collection = "k8sapp"
1343
rshri932105f2024-07-05 15:11:55 +00001344 def __init__(self, msg, lcm_tasks, config):
1345 """
1346 Init, Connect to database, filesystem storage, and messaging
1347 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1348 :return: None
1349 """
garciadeblas72412282024-11-07 12:41:54 +01001350 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001351
rshri948f7de2024-12-02 03:42:35 +00001352 async def create(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001353 self.logger.info("App Create Enter")
1354
rshri948f7de2024-12-02 03:42:35 +00001355 op_id = params["operation_id"]
1356 profile_id = params["profile_id"]
1357
1358 # To initialize the operation states
1359 self.initialize_operation(profile_id, op_id)
1360
1361 content = self.db.get_one("k8sapp", {"_id": profile_id})
1362 content["profile_type"] = "applications"
1363 op_params = self.get_operation_params(content, op_id)
1364 self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
1365
garciadeblasadb81e82024-11-08 01:11:46 +01001366 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001367 "create_profile", op_id, op_params, content
1368 )
rshri932105f2024-07-05 15:11:55 +00001369 self.logger.info("workflow_name is :{}".format(workflow_name))
1370
garciadeblas713e1962025-01-17 12:49:19 +01001371 workflow_status = await self.check_workflow_and_update_db(
1372 op_id, workflow_name, content
1373 )
rshri932105f2024-07-05 15:11:55 +00001374
1375 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001376 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001377 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001378 )
yshah564ec9c2024-11-29 07:33:32 +00001379 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1380 self.logger.info(f"App Create Exit with resource status: {resource_status}")
rshri932105f2024-07-05 15:11:55 +00001381 return
1382
rshri948f7de2024-12-02 03:42:35 +00001383 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001384 self.logger.info("App delete Enter")
rshri932105f2024-07-05 15:11:55 +00001385
rshri948f7de2024-12-02 03:42:35 +00001386 op_id = params["operation_id"]
1387 profile_id = params["profile_id"]
1388
1389 # To initialize the operation states
1390 self.initialize_operation(profile_id, op_id)
1391
1392 content = self.db.get_one("k8sapp", {"_id": profile_id})
1393 op_params = self.get_operation_params(content, op_id)
1394
garciadeblasadb81e82024-11-08 01:11:46 +01001395 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001396 "delete_profile", op_id, op_params, content
1397 )
rshri932105f2024-07-05 15:11:55 +00001398 self.logger.info("workflow_name is :{}".format(workflow_name))
1399
garciadeblas713e1962025-01-17 12:49:19 +01001400 workflow_status = await self.check_workflow_and_update_db(
1401 op_id, workflow_name, content
1402 )
rshri932105f2024-07-05 15:11:55 +00001403
1404 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001405 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001406 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001407 )
rshri932105f2024-07-05 15:11:55 +00001408
yshah564ec9c2024-11-29 07:33:32 +00001409 if resource_status:
1410 content["state"] = "DELETED"
1411 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1412 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1413 self.logger.info(f"App Delete Exit with resource status: {resource_status}")
rshri932105f2024-07-05 15:11:55 +00001414 return
1415
1416
garciadeblas72412282024-11-07 12:41:54 +01001417class K8sResourceLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001418 db_collection = "k8sresource"
1419
rshri932105f2024-07-05 15:11:55 +00001420 def __init__(self, msg, lcm_tasks, config):
1421 """
1422 Init, Connect to database, filesystem storage, and messaging
1423 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1424 :return: None
1425 """
garciadeblas72412282024-11-07 12:41:54 +01001426 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001427
rshri948f7de2024-12-02 03:42:35 +00001428 async def create(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001429 self.logger.info("Resource Create Enter")
1430
rshri948f7de2024-12-02 03:42:35 +00001431 op_id = params["operation_id"]
1432 profile_id = params["profile_id"]
1433
1434 # To initialize the operation states
1435 self.initialize_operation(profile_id, op_id)
1436
1437 content = self.db.get_one("k8sresource", {"_id": profile_id})
1438 content["profile_type"] = "managed-resources"
1439 op_params = self.get_operation_params(content, op_id)
1440 self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
1441
garciadeblasadb81e82024-11-08 01:11:46 +01001442 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001443 "create_profile", op_id, op_params, content
1444 )
rshri932105f2024-07-05 15:11:55 +00001445 self.logger.info("workflow_name is :{}".format(workflow_name))
1446
garciadeblas713e1962025-01-17 12:49:19 +01001447 workflow_status = await self.check_workflow_and_update_db(
1448 op_id, workflow_name, content
1449 )
rshri932105f2024-07-05 15:11:55 +00001450
1451 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001452 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001453 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001454 )
yshah564ec9c2024-11-29 07:33:32 +00001455 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1456 self.logger.info(
1457 f"Resource Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00001458 )
rshri932105f2024-07-05 15:11:55 +00001459 return
1460
rshri948f7de2024-12-02 03:42:35 +00001461 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001462 self.logger.info("Resource delete Enter")
rshri948f7de2024-12-02 03:42:35 +00001463
1464 op_id = params["operation_id"]
1465 profile_id = params["profile_id"]
1466
1467 # To initialize the operation states
1468 self.initialize_operation(profile_id, op_id)
1469
1470 content = self.db.get_one("k8sresource", {"_id": profile_id})
1471 op_params = self.get_operation_params(content, op_id)
rshri932105f2024-07-05 15:11:55 +00001472
garciadeblasadb81e82024-11-08 01:11:46 +01001473 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001474 "delete_profile", op_id, op_params, content
1475 )
rshri932105f2024-07-05 15:11:55 +00001476 self.logger.info("workflow_name is :{}".format(workflow_name))
1477
garciadeblas713e1962025-01-17 12:49:19 +01001478 workflow_status = await self.check_workflow_and_update_db(
1479 op_id, workflow_name, content
1480 )
rshri932105f2024-07-05 15:11:55 +00001481
1482 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001483 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001484 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001485 )
rshri932105f2024-07-05 15:11:55 +00001486
yshah564ec9c2024-11-29 07:33:32 +00001487 if resource_status:
1488 content["state"] = "DELETED"
1489 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1490 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1491 self.logger.info(
1492 f"Resource Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02001493 )
rshri932105f2024-07-05 15:11:55 +00001494 return
1495
1496
garciadeblas72412282024-11-07 12:41:54 +01001497class K8sInfraControllerLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001498 db_collection = "k8sinfra_controller"
1499
rshri932105f2024-07-05 15:11:55 +00001500 def __init__(self, msg, lcm_tasks, config):
1501 """
1502 Init, Connect to database, filesystem storage, and messaging
1503 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1504 :return: None
1505 """
garciadeblas72412282024-11-07 12:41:54 +01001506 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001507
rshri948f7de2024-12-02 03:42:35 +00001508 async def create(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001509 self.logger.info("Infra controller Create Enter")
1510
rshri948f7de2024-12-02 03:42:35 +00001511 op_id = params["operation_id"]
1512 profile_id = params["profile_id"]
1513
1514 # To initialize the operation states
1515 self.initialize_operation(profile_id, op_id)
1516
1517 content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
1518 content["profile_type"] = "infra-controllers"
1519 op_params = self.get_operation_params(content, op_id)
1520 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
1521
garciadeblasadb81e82024-11-08 01:11:46 +01001522 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001523 "create_profile", op_id, op_params, content
1524 )
rshri932105f2024-07-05 15:11:55 +00001525 self.logger.info("workflow_name is :{}".format(workflow_name))
1526
garciadeblas713e1962025-01-17 12:49:19 +01001527 workflow_status = await self.check_workflow_and_update_db(
1528 op_id, workflow_name, content
1529 )
rshri932105f2024-07-05 15:11:55 +00001530
1531 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001532 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001533 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001534 )
yshah564ec9c2024-11-29 07:33:32 +00001535 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1536 self.logger.info(
1537 f"Infra Controller Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00001538 )
rshri932105f2024-07-05 15:11:55 +00001539 return
1540
rshri948f7de2024-12-02 03:42:35 +00001541 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001542 self.logger.info("Infra controller delete Enter")
rshri932105f2024-07-05 15:11:55 +00001543
rshri948f7de2024-12-02 03:42:35 +00001544 op_id = params["operation_id"]
1545 profile_id = params["profile_id"]
1546
1547 # To initialize the operation states
1548 self.initialize_operation(profile_id, op_id)
1549
1550 content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
1551 op_params = self.get_operation_params(content, op_id)
1552
garciadeblasadb81e82024-11-08 01:11:46 +01001553 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001554 "delete_profile", op_id, op_params, content
1555 )
rshri932105f2024-07-05 15:11:55 +00001556 self.logger.info("workflow_name is :{}".format(workflow_name))
1557
garciadeblas713e1962025-01-17 12:49:19 +01001558 workflow_status = await self.check_workflow_and_update_db(
1559 op_id, workflow_name, content
1560 )
rshri932105f2024-07-05 15:11:55 +00001561
1562 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001563 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001564 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001565 )
rshri932105f2024-07-05 15:11:55 +00001566
yshah564ec9c2024-11-29 07:33:32 +00001567 if resource_status:
1568 content["state"] = "DELETED"
1569 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1570 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1571 self.logger.info(
1572 f"Infra Controller Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02001573 )
rshri932105f2024-07-05 15:11:55 +00001574 return
1575
1576
garciadeblas72412282024-11-07 12:41:54 +01001577class K8sInfraConfigLcm(GitOpsLcm):
rshri948f7de2024-12-02 03:42:35 +00001578 db_collection = "k8sinfra_config"
1579
rshri932105f2024-07-05 15:11:55 +00001580 def __init__(self, msg, lcm_tasks, config):
1581 """
1582 Init, Connect to database, filesystem storage, and messaging
1583 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1584 :return: None
1585 """
garciadeblas72412282024-11-07 12:41:54 +01001586 super().__init__(msg, lcm_tasks, config)
rshri932105f2024-07-05 15:11:55 +00001587
rshri948f7de2024-12-02 03:42:35 +00001588 async def create(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001589 self.logger.info("Infra config Create Enter")
1590
rshri948f7de2024-12-02 03:42:35 +00001591 op_id = params["operation_id"]
1592 profile_id = params["profile_id"]
1593
1594 # To initialize the operation states
1595 self.initialize_operation(profile_id, op_id)
1596
1597 content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
1598 content["profile_type"] = "infra-configs"
1599 op_params = self.get_operation_params(content, op_id)
1600 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
1601
garciadeblasadb81e82024-11-08 01:11:46 +01001602 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001603 "create_profile", op_id, op_params, content
1604 )
rshri932105f2024-07-05 15:11:55 +00001605 self.logger.info("workflow_name is :{}".format(workflow_name))
1606
garciadeblas713e1962025-01-17 12:49:19 +01001607 workflow_status = await self.check_workflow_and_update_db(
1608 op_id, workflow_name, content
1609 )
rshri932105f2024-07-05 15:11:55 +00001610
1611 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001612 resource_status, content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001613 "create_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001614 )
yshah564ec9c2024-11-29 07:33:32 +00001615 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1616 self.logger.info(
1617 f"Infra Config Create Exit with resource status: {resource_status}"
rshri932105f2024-07-05 15:11:55 +00001618 )
rshri932105f2024-07-05 15:11:55 +00001619 return
1620
rshri948f7de2024-12-02 03:42:35 +00001621 async def delete(self, params, order_id):
rshri932105f2024-07-05 15:11:55 +00001622 self.logger.info("Infra config delete Enter")
1623
rshri948f7de2024-12-02 03:42:35 +00001624 op_id = params["operation_id"]
1625 profile_id = params["profile_id"]
1626
1627 # To initialize the operation states
1628 self.initialize_operation(profile_id, op_id)
1629
1630 content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
1631 op_params = self.get_operation_params(content, op_id)
1632
garciadeblasadb81e82024-11-08 01:11:46 +01001633 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001634 "delete_profile", op_id, op_params, content
1635 )
rshri932105f2024-07-05 15:11:55 +00001636 self.logger.info("workflow_name is :{}".format(workflow_name))
rshri932105f2024-07-05 15:11:55 +00001637
garciadeblas713e1962025-01-17 12:49:19 +01001638 workflow_status = await self.check_workflow_and_update_db(
1639 op_id, workflow_name, content
1640 )
yshah564ec9c2024-11-29 07:33:32 +00001641
rshri932105f2024-07-05 15:11:55 +00001642 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001643 resource_status, content = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00001644 "delete_profile", op_id, op_params, content
rshri932105f2024-07-05 15:11:55 +00001645 )
yshah564ec9c2024-11-29 07:33:32 +00001646
rshri932105f2024-07-05 15:11:55 +00001647 if resource_status:
yshah564ec9c2024-11-29 07:33:32 +00001648 content["state"] = "DELETED"
1649 self.db.set_one(self.db_collection, {"_id": content["_id"]}, content)
1650 self.db.del_one(self.db_collection, {"_id": content["_id"]})
1651 self.logger.info(
1652 f"Infra Config Delete Exit with resource status: {resource_status}"
garciadeblas96b94f52024-07-08 16:18:21 +02001653 )
rshri932105f2024-07-05 15:11:55 +00001654
rshri932105f2024-07-05 15:11:55 +00001655 return
yshah771dea82024-07-05 15:11:49 +00001656
1657
garciadeblas72412282024-11-07 12:41:54 +01001658class OkaLcm(GitOpsLcm):
yshah771dea82024-07-05 15:11:49 +00001659 db_collection = "okas"
1660
1661 def __init__(self, msg, lcm_tasks, config):
1662 """
1663 Init, Connect to database, filesystem storage, and messaging
1664 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1665 :return: None
1666 """
garciadeblas72412282024-11-07 12:41:54 +01001667 super().__init__(msg, lcm_tasks, config)
yshah771dea82024-07-05 15:11:49 +00001668
yshah564ec9c2024-11-29 07:33:32 +00001669 async def create(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001670 self.logger.info("OKA Create Enter")
yshah564ec9c2024-11-29 07:33:32 +00001671 op_id = params["operation_id"]
1672 oka_id = params["oka_id"]
1673 self.initialize_operation(oka_id, op_id)
1674 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
1675 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00001676
garciadeblasadb81e82024-11-08 01:11:46 +01001677 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001678 "create_oka", op_id, op_params, db_content
1679 )
yshah564ec9c2024-11-29 07:33:32 +00001680
garciadeblas713e1962025-01-17 12:49:19 +01001681 workflow_status = await self.check_workflow_and_update_db(
1682 op_id, workflow_name, db_content
1683 )
yshah771dea82024-07-05 15:11:49 +00001684
1685 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001686 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001687 "create_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001688 )
garciadeblas96b94f52024-07-08 16:18:21 +02001689 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00001690 self.logger.info(f"OKA Create Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001691 return
1692
yshah564ec9c2024-11-29 07:33:32 +00001693 async def edit(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001694 self.logger.info("OKA Edit Enter")
yshah564ec9c2024-11-29 07:33:32 +00001695 op_id = params["operation_id"]
1696 oka_id = params["oka_id"]
1697 self.initialize_operation(oka_id, op_id)
1698 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
1699 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00001700
garciadeblasadb81e82024-11-08 01:11:46 +01001701 _, workflow_name = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001702 "update_oka", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02001703 )
garciadeblas713e1962025-01-17 12:49:19 +01001704 workflow_status = await self.check_workflow_and_update_db(
1705 op_id, workflow_name, db_content
1706 )
yshah771dea82024-07-05 15:11:49 +00001707
1708 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001709 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001710 "update_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001711 )
garciadeblas96b94f52024-07-08 16:18:21 +02001712 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00001713 self.logger.info(f"OKA Update Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001714 return
1715
yshah564ec9c2024-11-29 07:33:32 +00001716 async def delete(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001717 self.logger.info("OKA delete Enter")
yshah564ec9c2024-11-29 07:33:32 +00001718 op_id = params["operation_id"]
1719 oka_id = params["oka_id"]
1720 self.initialize_operation(oka_id, op_id)
1721 db_content = self.db.get_one(self.db_collection, {"_id": oka_id})
1722 op_params = self.get_operation_params(db_content, op_id)
yshah771dea82024-07-05 15:11:49 +00001723
garciadeblasadb81e82024-11-08 01:11:46 +01001724 _, workflow_name = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001725 "delete_oka", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02001726 )
garciadeblas713e1962025-01-17 12:49:19 +01001727 workflow_status = await self.check_workflow_and_update_db(
1728 op_id, workflow_name, db_content
1729 )
yshah771dea82024-07-05 15:11:49 +00001730
1731 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001732 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001733 "delete_oka", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001734 )
yshah771dea82024-07-05 15:11:49 +00001735
yshah564ec9c2024-11-29 07:33:32 +00001736 if resource_status:
1737 db_content["state"] == "DELETED"
1738 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
garciadeblas96b94f52024-07-08 16:18:21 +02001739 self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
yshah564ec9c2024-11-29 07:33:32 +00001740 self.logger.info(f"OKA Delete Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001741 return
1742
1743
garciadeblas72412282024-11-07 12:41:54 +01001744class KsuLcm(GitOpsLcm):
yshah771dea82024-07-05 15:11:49 +00001745 db_collection = "ksus"
1746
1747 def __init__(self, msg, lcm_tasks, config):
1748 """
1749 Init, Connect to database, filesystem storage, and messaging
1750 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1751 :return: None
1752 """
garciadeblas72412282024-11-07 12:41:54 +01001753 super().__init__(msg, lcm_tasks, config)
garciadeblasbc96f382025-01-22 16:02:18 +01001754 self._workflows = {
1755 "create_ksus": {
1756 "check_resource_function": self.check_create_ksus,
1757 },
1758 "delete_ksus": {
1759 "check_resource_function": self.check_delete_ksus,
1760 },
1761 }
1762
1763 def get_dbclusters_from_profile(self, profile_id, profile_type):
1764 cluster_list = []
1765 db_clusters = self.db.get_list("clusters")
1766 self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
1767 for db_cluster in db_clusters:
1768 if profile_id in db_cluster.get(profile_type, []):
1769 self.logger.info(
1770 f"Profile {profile_id} found in cluster {db_cluster['name']}"
1771 )
1772 cluster_list.append(db_cluster)
1773 return cluster_list
yshah771dea82024-07-05 15:11:49 +00001774
yshah564ec9c2024-11-29 07:33:32 +00001775 async def create(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001776 self.logger.info("ksu Create Enter")
yshah564ec9c2024-11-29 07:33:32 +00001777 db_content = []
1778 op_params = []
1779 op_id = params["operation_id"]
1780 for ksu_id in params["ksus_list"]:
1781 self.logger.info("Ksu ID: {}".format(ksu_id))
1782 self.initialize_operation(ksu_id, op_id)
1783 db_ksu = self.db.get_one(self.db_collection, {"_id": ksu_id})
1784 self.logger.info("Db KSU: {}".format(db_ksu))
1785 db_content.append(db_ksu)
1786 ksu_params = {}
1787 ksu_params = self.get_operation_params(db_ksu, op_id)
1788 self.logger.info("Operation Params: {}".format(ksu_params))
1789 # Update ksu_params["profile"] with profile name and age-pubkey
1790 profile_type = ksu_params["profile"]["profile_type"]
1791 profile_id = ksu_params["profile"]["_id"]
1792 profile_collection = self.profile_collection_mapping[profile_type]
1793 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
1794 ksu_params["profile"]["name"] = db_profile["name"]
1795 ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
1796 # Update ksu_params["oka"] with sw_catalog_path (when missing)
garciadeblas8c9c5442025-01-17 01:06:05 +01001797 # TODO: remove this in favor of doing it in ODU workflow
yshah564ec9c2024-11-29 07:33:32 +00001798 for oka in ksu_params["oka"]:
1799 if "sw_catalog_path" not in oka:
1800 oka_id = oka["_id"]
1801 db_oka = self.db.get_one("okas", {"_id": oka_id})
yshah2f39b8a2024-12-19 11:06:24 +00001802 oka_type = MAP_PROFILE[
1803 db_oka.get("profile_type", "infra_controller_profiles")
1804 ]
garciadeblas8c9c5442025-01-17 01:06:05 +01001805 oka[
1806 "sw_catalog_path"
garciadeblas1ad4e882025-01-24 14:24:41 +01001807 ] = f"{oka_type}/{db_oka['git_name'].lower()}/templates"
yshah564ec9c2024-11-29 07:33:32 +00001808 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00001809
garciadeblasbc96f382025-01-22 16:02:18 +01001810 # A single workflow is launched for all KSUs
garciadeblasadb81e82024-11-08 01:11:46 +01001811 _, workflow_name = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001812 "create_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02001813 )
garciadeblasbc96f382025-01-22 16:02:18 +01001814 # Update workflow status in all KSUs
1815 wf_status_list = []
yshah564ec9c2024-11-29 07:33:32 +00001816 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas713e1962025-01-17 12:49:19 +01001817 workflow_status = await self.check_workflow_and_update_db(
1818 op_id, workflow_name, db_ksu
1819 )
garciadeblasbc96f382025-01-22 16:02:18 +01001820 wf_status_list.append(workflow_status)
1821 # Update resource status in all KSUs
1822 # TODO: Is an operation correct if n KSUs are right and 1 is not OK?
1823 res_status_list = []
1824 for db_ksu, ksu_params, wf_status in zip(db_content, op_params, wf_status_list):
1825 if wf_status:
garciadeblas713e1962025-01-17 12:49:19 +01001826 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00001827 "create_ksus", op_id, ksu_params, db_ksu
1828 )
garciadeblasbc96f382025-01-22 16:02:18 +01001829 else:
1830 resource_status = False
1831 res_status_list.append(resource_status)
garciadeblas96b94f52024-07-08 16:18:21 +02001832 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
1833
garciadeblasd8429852024-10-17 15:30:30 +02001834 # Clean items used in the workflow, no matter if the workflow succeeded
1835 clean_status, clean_msg = await self.odu.clean_items_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001836 "create_ksus", op_id, op_params, db_content
garciadeblasd8429852024-10-17 15:30:30 +02001837 )
1838 self.logger.info(
1839 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
1840 )
garciadeblasbc96f382025-01-22 16:02:18 +01001841 self.logger.info(f"KSU Create EXIT with Resource Status {res_status_list}")
yshah771dea82024-07-05 15:11:49 +00001842 return
1843
yshah564ec9c2024-11-29 07:33:32 +00001844 async def edit(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001845 self.logger.info("ksu edit Enter")
yshah564ec9c2024-11-29 07:33:32 +00001846 db_content = []
1847 op_params = []
1848 op_id = params["operation_id"]
1849 for ksu_id in params["ksus_list"]:
1850 self.initialize_operation(ksu_id, op_id)
1851 db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
1852 db_content.append(db_ksu)
1853 ksu_params = {}
1854 ksu_params = self.get_operation_params(db_ksu, op_id)
1855 # Update ksu_params["profile"] with profile name and age-pubkey
1856 profile_type = ksu_params["profile"]["profile_type"]
1857 profile_id = ksu_params["profile"]["_id"]
1858 profile_collection = self.profile_collection_mapping[profile_type]
1859 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
1860 ksu_params["profile"]["name"] = db_profile["name"]
1861 ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
1862 # Update ksu_params["oka"] with sw_catalog_path (when missing)
garciadeblas8c9c5442025-01-17 01:06:05 +01001863 # TODO: remove this in favor of doing it in ODU workflow
yshah564ec9c2024-11-29 07:33:32 +00001864 for oka in ksu_params["oka"]:
1865 if "sw_catalog_path" not in oka:
1866 oka_id = oka["_id"]
1867 db_oka = self.db.get_one("okas", {"_id": oka_id})
yshah2f39b8a2024-12-19 11:06:24 +00001868 oka_type = MAP_PROFILE[
1869 db_oka.get("profile_type", "infra_controller_profiles")
1870 ]
garciadeblas8c9c5442025-01-17 01:06:05 +01001871 oka[
1872 "sw_catalog_path"
1873 ] = f"{oka_type}/{db_oka['git_name']}/templates"
yshah564ec9c2024-11-29 07:33:32 +00001874 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00001875
garciadeblasadb81e82024-11-08 01:11:46 +01001876 _, workflow_name = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001877 "update_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02001878 )
yshah771dea82024-07-05 15:11:49 +00001879
yshah564ec9c2024-11-29 07:33:32 +00001880 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas713e1962025-01-17 12:49:19 +01001881 workflow_status = await self.check_workflow_and_update_db(
1882 op_id, workflow_name, db_ksu
1883 )
yshah564ec9c2024-11-29 07:33:32 +00001884
garciadeblas96b94f52024-07-08 16:18:21 +02001885 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001886 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00001887 "update_ksus", op_id, ksu_params, db_ksu
garciadeblas96b94f52024-07-08 16:18:21 +02001888 )
garciadeblas96b94f52024-07-08 16:18:21 +02001889 db_ksu["name"] = ksu_params["name"]
1890 db_ksu["description"] = ksu_params["description"]
1891 db_ksu["profile"]["profile_type"] = ksu_params["profile"][
1892 "profile_type"
1893 ]
1894 db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
1895 db_ksu["oka"] = ksu_params["oka"]
1896 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
1897
yshah564ec9c2024-11-29 07:33:32 +00001898 # Clean items used in the workflow, no matter if the workflow succeeded
1899 clean_status, clean_msg = await self.odu.clean_items_workflow(
1900 "create_ksus", op_id, op_params, db_content
garciadeblas96b94f52024-07-08 16:18:21 +02001901 )
1902 self.logger.info(
yshah564ec9c2024-11-29 07:33:32 +00001903 f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
garciadeblas96b94f52024-07-08 16:18:21 +02001904 )
yshah564ec9c2024-11-29 07:33:32 +00001905 self.logger.info(f"KSU Update EXIT with Resource Status {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001906 return
1907
yshah564ec9c2024-11-29 07:33:32 +00001908 async def delete(self, params, order_id):
1909 self.logger.info("ksu delete Enter")
1910 db_content = []
1911 op_params = []
1912 op_id = params["operation_id"]
1913 for ksu_id in params["ksus_list"]:
1914 self.initialize_operation(ksu_id, op_id)
1915 db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
1916 db_content.append(db_ksu)
1917 ksu_params = {}
1918 ksu_params["profile"] = {}
1919 ksu_params["profile"]["profile_type"] = db_ksu["profile"]["profile_type"]
1920 ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
1921 # Update ksu_params["profile"] with profile name and age-pubkey
1922 profile_type = ksu_params["profile"]["profile_type"]
1923 profile_id = ksu_params["profile"]["_id"]
1924 profile_collection = self.profile_collection_mapping[profile_type]
1925 db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
1926 ksu_params["profile"]["name"] = db_profile["name"]
1927 ksu_params["profile"]["age_pubkey"] = db_profile.get("age_pubkey", "")
1928 op_params.append(ksu_params)
yshah771dea82024-07-05 15:11:49 +00001929
garciadeblasadb81e82024-11-08 01:11:46 +01001930 _, workflow_name = await self.odu.launch_workflow(
yshah564ec9c2024-11-29 07:33:32 +00001931 "delete_ksus", op_id, op_params, db_content
1932 )
1933
1934 for db_ksu, ksu_params in zip(db_content, op_params):
garciadeblas713e1962025-01-17 12:49:19 +01001935 workflow_status = await self.check_workflow_and_update_db(
1936 op_id, workflow_name, db_ksu
1937 )
yshah564ec9c2024-11-29 07:33:32 +00001938
1939 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001940 resource_status, db_ksu = await self.check_resource_and_update_db(
yshah564ec9c2024-11-29 07:33:32 +00001941 "delete_ksus", op_id, ksu_params, db_ksu
1942 )
1943
1944 if resource_status:
1945 db_ksu["state"] == "DELETED"
1946 self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
1947 self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
1948
1949 self.logger.info(f"KSU Delete Exit with resource status: {resource_status}")
1950 return
1951
1952 async def clone(self, params, order_id):
1953 self.logger.info("ksu clone Enter")
1954 op_id = params["operation_id"]
1955 ksus_id = params["ksus_list"][0]
1956 self.initialize_operation(ksus_id, op_id)
1957 db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
1958 op_params = self.get_operation_params(db_content, op_id)
1959 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001960 "clone_ksus", op_id, op_params, db_content
1961 )
yshah564ec9c2024-11-29 07:33:32 +00001962
garciadeblas713e1962025-01-17 12:49:19 +01001963 workflow_status = await self.check_workflow_and_update_db(
1964 op_id, workflow_name, db_content
1965 )
yshah771dea82024-07-05 15:11:49 +00001966
1967 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001968 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001969 "clone_ksus", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001970 )
garciadeblas96b94f52024-07-08 16:18:21 +02001971 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00001972
1973 self.logger.info(f"KSU Clone Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001974 return
1975
yshah564ec9c2024-11-29 07:33:32 +00001976 async def move(self, params, order_id):
garciadeblas96b94f52024-07-08 16:18:21 +02001977 self.logger.info("ksu move Enter")
yshah564ec9c2024-11-29 07:33:32 +00001978 op_id = params["operation_id"]
1979 ksus_id = params["ksus_list"][0]
1980 self.initialize_operation(ksus_id, op_id)
1981 db_content = self.db.get_one(self.db_collection, {"_id": ksus_id})
1982 op_params = self.get_operation_params(db_content, op_id)
garciadeblasadb81e82024-11-08 01:11:46 +01001983 _, workflow_name = await self.odu.launch_workflow(
garciadeblas96b94f52024-07-08 16:18:21 +02001984 "move_ksus", op_id, op_params, db_content
1985 )
yshah564ec9c2024-11-29 07:33:32 +00001986
garciadeblas713e1962025-01-17 12:49:19 +01001987 workflow_status = await self.check_workflow_and_update_db(
1988 op_id, workflow_name, db_content
1989 )
yshah771dea82024-07-05 15:11:49 +00001990
1991 if workflow_status:
garciadeblas713e1962025-01-17 12:49:19 +01001992 resource_status, db_content = await self.check_resource_and_update_db(
garciadeblas96b94f52024-07-08 16:18:21 +02001993 "move_ksus", op_id, op_params, db_content
yshah771dea82024-07-05 15:11:49 +00001994 )
garciadeblas96b94f52024-07-08 16:18:21 +02001995 self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
yshah564ec9c2024-11-29 07:33:32 +00001996
1997 self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
yshah771dea82024-07-05 15:11:49 +00001998 return
garciadeblasbc96f382025-01-22 16:02:18 +01001999
2000 async def check_create_ksus(self, op_id, op_params, content):
2001 self.logger.info(f"check_create_ksus Operation {op_id}. Params: {op_params}.")
2002 self.logger.debug(f"Content: {content}")
2003 db_ksu = content
2004 kustomization_name = db_ksu["git_name"].lower()
2005 oka_list = op_params["oka"]
2006 oka_item = oka_list[0]
2007 oka_params = oka_item.get("transformation", {})
2008 target_ns = oka_params.get("namespace", "default")
2009 profile_id = op_params.get("profile", {}).get("_id")
2010 profile_type = op_params.get("profile", {}).get("profile_type")
2011 self.logger.info(
2012 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2013 )
2014 dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
2015 if not dbcluster_list:
2016 self.logger.info(f"No clusters found for profile {profile_id}.")
2017 for db_cluster in dbcluster_list:
2018 try:
2019 self.logger.info(
2020 f"Checking status of KSU in cluster {db_cluster['name']}."
2021 )
2022 cluster_kubectl = self.cluster_kubectl(db_cluster)
2023 checkings_list = [
2024 {
2025 "item": "kustomization",
2026 "name": kustomization_name,
2027 "namespace": target_ns,
garciadeblas6c82c352025-01-27 16:53:45 +01002028 "condition": {
2029 "jsonpath_filter": "status.conditions[?(@.type=='Ready')].status",
2030 "value": "True",
2031 },
garciadeblasbc96f382025-01-22 16:02:18 +01002032 "timeout": self._checkloop_kustomization_timeout,
2033 "enable": True,
2034 "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
2035 },
2036 ]
2037 self.logger.info(
2038 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2039 )
2040 result, message = await self.common_check_list(
2041 op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
2042 )
2043 if not result:
2044 return False, message
2045 except Exception as e:
2046 self.logger.error(
2047 f"Error checking KSU in cluster {db_cluster['name']}."
2048 )
2049 self.logger.error(e)
2050 return False, f"Error checking KSU in cluster {db_cluster['name']}."
2051 return True, "OK"
2052
2053 async def check_delete_ksus(self, op_id, op_params, content):
2054 self.logger.info(f"check_delete_ksus Operation {op_id}. Params: {op_params}.")
2055 self.logger.debug(f"Content: {content}")
2056 db_ksu = content
2057 kustomization_name = db_ksu["git_name"].lower()
2058 oka_list = db_ksu["oka"]
2059 oka_item = oka_list[0]
2060 oka_params = oka_item.get("transformation", {})
2061 target_ns = oka_params.get("namespace", "default")
2062 profile_id = op_params.get("profile", {}).get("_id")
2063 profile_type = op_params.get("profile", {}).get("profile_type")
2064 self.logger.info(
2065 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2066 )
2067 dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
2068 if not dbcluster_list:
2069 self.logger.info(f"No clusters found for profile {profile_id}.")
2070 for db_cluster in dbcluster_list:
2071 try:
2072 self.logger.info(
2073 f"Checking status of KSU in cluster {db_cluster['name']}."
2074 )
2075 cluster_kubectl = self.cluster_kubectl(db_cluster)
2076 checkings_list = [
2077 {
2078 "item": "kustomization",
2079 "name": kustomization_name,
2080 "namespace": target_ns,
2081 "deleted": True,
2082 "timeout": self._checkloop_kustomization_timeout,
2083 "enable": True,
2084 "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
2085 },
2086 ]
2087 self.logger.info(
2088 f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
2089 )
2090 result, message = await self.common_check_list(
2091 op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
2092 )
2093 if not result:
2094 return False, message
2095 except Exception as e:
2096 self.logger.error(
2097 f"Error checking KSU in cluster {db_cluster['name']}."
2098 )
2099 self.logger.error(e)
2100 return False, f"Error checking KSU in cluster {db_cluster['name']}."
2101 return True, "OK"