blob: 29e43490f92f6c3c7c0954aa104940ef8ae8f880 [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22from typing import Union
23from shlex import quote
24import os
25import yaml
26
27from osm_lcm.n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
28from osm_lcm.n2vc.exceptions import K8sException
29
30
31class K8sHelm3Connector(K8sHelmBaseConnector):
32
33 """
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
37 """
38
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Build the connector used to drive helm v3.

    Every argument is handed over unchanged to the K8sHelmBaseConnector
    constructor; this class adds no state of its own here.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # Delegate the whole set-up to the parent class
    parent_kwargs = {
        "db": db,
        "log": log,
        "fs": fs,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
    }
    K8sHelmBaseConnector.__init__(self, **parent_kwargs)

    # self.log is expected to be available once the parent constructor ran
    self.log.info("K8S Helm3 connector initialized")
71
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """Install a helm chart

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: chart/reference (string), which can be either
        of these options:
        - a name of chart available via the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
    :param kdu_instance: Kdu instance name
    :param atomic bool: If set, waits until the model is active and resets
        the cluster on failure.
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
    :param db_dict dict: database location of the KDU record; when given it is
        also used to compute the labels passed to the install
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance

    :param kwargs: Additional parameters (None yet)

    :return: True if successful
    :raises K8sException: if the namespace is missing and cannot be created
    """

    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    labels_dict = None
    if db_dict:
        labels_dict = await self._labels_dict(db_dict, kdu_instance)

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_uuid, namespace):
            try:
                # TODO: refactor to use kubernetes API client
                await self._create_namespace(cluster_uuid, namespace)
            except Exception as e:
                # A failed creation is tolerated as long as the namespace is
                # there afterwards; only a still-missing namespace is fatal.
                if not await self._namespace_exists(cluster_uuid, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_uuid, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_uuid,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        labels=labels_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
157
158 async def migrate(self, nsr_id, target):
159 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
160
161 # check if it has k8s deployed kdus
162 if len(db_nsr["_admin"]["deployed"]["K8s"]) < 1:
163 err_msg = "INFO: No deployed KDUs"
164 self.log.error(err_msg)
165 raise K8sException(err_msg)
166
167 kdu_id = target["vdu"]["vduId"]
168 for index, kdu in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
169 if kdu["kdu-instance"] == kdu_id:
170 namespace = kdu["namespace"]
171 cluster_uuid = kdu["k8scluster-uuid"]
172 kdu_model = kdu["kdu-model"]
173 db_dict = {
174 "collection": "nsrs",
175 "filter": {"_id": nsr_id},
176 "path": "_admin.deployed.K8s.{}".format(index),
177 }
178
179 await self.upgrade(
180 cluster_uuid,
181 kdu_instance=kdu_id,
182 kdu_model=kdu_model,
183 namespace=namespace,
184 targetHostK8sLabels=target["targetHostK8sLabels"],
185 atomic=True,
186 db_dict=db_dict,
187 force=True,
188 )
189
190 return True
191
192 self.log.debug("ERROR: Unable to retrieve kdu from the database")
193
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Run the "all" inspection on a chart

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url where the chart is looked up
    :return: whatever ``_exec_inspect_command`` returns for the "all" command
    """
    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspection = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
202
203 """
204 ####################################################################################
205 ################################### P R I V A T E ##################################
206 ####################################################################################
207 """
208
209 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
210 """
211 Creates and returns base cluster and kube dirs and returns them.
212 Also created helm3 dirs according to new directory specification, paths are
213 returned and also environment variables that must be provided to execute commands
214
215 Helm 3 directory specification uses XDG categories for variable support:
216 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
217 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
218 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
219
220 The variables assigned for this paths are:
221 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
222 $HELM_PATH_DATA but looking and helm env the variable names are different)
223 - Cache: $HELM_CACHE_HOME
224 - Config: $HELM_CONFIG_HOME
225 - Data: $HELM_DATA_HOME
226 - helm kubeconfig: $KUBECONFIG
227
228 :param cluster_name: cluster_name
229 :return: Dictionary with config_paths and dictionary with helm environment variables
230 """
231
232 base = self.fs.path
233 if base.endswith("/") or base.endswith("\\"):
234 base = base[:-1]
235
236 # base dir for cluster
237 cluster_dir = base + "/" + cluster_name
238
239 # kube dir
240 kube_dir = cluster_dir + "/" + ".kube"
241 if create_if_not_exist and not os.path.exists(kube_dir):
242 self.log.debug("Creating dir {}".format(kube_dir))
243 os.makedirs(kube_dir)
244
245 helm_path_cache = cluster_dir + "/.cache/helm"
246 if create_if_not_exist and not os.path.exists(helm_path_cache):
247 self.log.debug("Creating dir {}".format(helm_path_cache))
248 os.makedirs(helm_path_cache)
249
250 helm_path_config = cluster_dir + "/.config/helm"
251 if create_if_not_exist and not os.path.exists(helm_path_config):
252 self.log.debug("Creating dir {}".format(helm_path_config))
253 os.makedirs(helm_path_config)
254
255 helm_path_data = cluster_dir + "/.local/share/helm"
256 if create_if_not_exist and not os.path.exists(helm_path_data):
257 self.log.debug("Creating dir {}".format(helm_path_data))
258 os.makedirs(helm_path_data)
259
260 config_filename = kube_dir + "/config"
261
262 # 2 - Prepare dictionary with paths
263 paths = {
264 "kube_dir": kube_dir,
265 "kube_config": config_filename,
266 "cluster_dir": cluster_dir,
267 }
268
269 # 3 - Prepare environment variables
270 env = {
271 "HELM_CACHE_HOME": helm_path_cache,
272 "HELM_CONFIG_HOME": helm_path_config,
273 "HELM_DATA_HOME": helm_path_data,
274 "KUBECONFIG": config_filename,
275 }
276
277 for file_name, file in paths.items():
278 if "dir" in file_name and not os.path.exists(file):
279 err_msg = "{} dir does not exist".format(file)
280 self.log.error(err_msg)
281 raise K8sException(err_msg)
282
283 return paths, env
284
285 async def _namespace_exists(self, cluster_id, namespace) -> bool:
286 self.log.debug(
287 "checking if namespace {} exists cluster_id {}".format(
288 namespace, cluster_id
289 )
290 )
291 namespaces = await self._get_namespaces(cluster_id)
292 return namespace in namespaces if namespaces else False
293
async def _get_namespaces(self, cluster_id: str):
    """List the names of the namespaces of a cluster

    Runs ``kubectl get namespaces -o=yaml`` against the kubeconfig of the
    cluster and collects ``metadata.name`` of every returned item.

    :param cluster_id: cluster whose namespaces are listed
    :return: list of namespace names; empty if the command produced no items
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, quote(paths["kube_config"])
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # An empty output is parsed as None: treat it (and a missing or null
    # "items" key) as "no namespaces" instead of failing on the subscript
    data = yaml.load(output, Loader=yaml.SafeLoader) or {}
    namespaces = [item["metadata"]["name"] for item in data.get("items") or []]
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
314
315 async def _create_namespace(self, cluster_id: str, namespace: str):
316 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
317
318 # init config, env
319 paths, env = self._init_paths_env(
320 cluster_name=cluster_id, create_if_not_exist=True
321 )
322
323 command = "{} --kubeconfig={} create namespace {}".format(
324 self.kubectl_command, quote(paths["kube_config"]), quote(namespace)
325 )
326 _, _rc = await self._local_async_exec(
327 command=command, raise_exception_on_error=True, env=env
328 )
329 self.log.debug(f"namespace {namespace} created")
330
331 return _rc
332
333 async def _get_services(
334 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
335 ):
336 # init config, env
337 paths, env = self._init_paths_env(
338 cluster_name=cluster_id, create_if_not_exist=True
339 )
340
garciadeblasd3672322025-07-17 17:00:12 +0200341 command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
almagiacdd20ae2024-12-13 09:45:45 +0100342 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
343 )
garciadeblasd3672322025-07-17 17:00:12 +0200344 output, _rc = await self._local_async_exec(
345 command, env=env, raise_exception_on_error=True
almagiacdd20ae2024-12-13 09:45:45 +0100346 )
347 services = self._parse_services(output)
348
349 return services
350
351 async def _cluster_init(self, cluster_id, namespace, paths, env):
352 """
353 Implements the helm version dependent cluster initialization:
354 For helm3 it creates the namespace if it is not created
355 """
356 if namespace != "kube-system":
357 namespaces = await self._get_namespaces(cluster_id)
358 if namespace not in namespaces:
359 # TODO: refactor to use kubernetes API client
360 await self._create_namespace(cluster_id, namespace)
361
362 repo_list = await self.repo_list(cluster_id)
363 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
364 if not stable_repo and self._stable_repo_url:
365 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
366
367 # Returns False as no software needs to be uninstalled
368 return False
369
370 async def _uninstall_sw(self, cluster_id: str, namespace: str):
371 # nothing to do to uninstall sw
372 pass
373
374 async def _instances_list(self, cluster_id: str):
375 # init paths, env
376 paths, env = self._init_paths_env(
377 cluster_name=cluster_id, create_if_not_exist=True
378 )
379
380 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
381 output, _rc = await self._local_async_exec(
382 command=command, raise_exception_on_error=True, env=env
383 )
384
385 if output and len(output) > 0:
386 self.log.debug("instances list output: {}".format(output))
387 return yaml.load(output, Loader=yaml.SafeLoader)
388 else:
389 return []
390
391 def _get_inspect_command(
392 self, show_command: str, kdu_model: str, repo_str: str, version: str
393 ):
394 """Generates the command to obtain the information about an Helm Chart package
395 (´helm show ...´ command)
396
397 Args:
398 show_command: the second part of the command (`helm show <show_command>`)
399 kdu_model: The name or path of a Helm Chart
400 repo_str: Helm Chart repository url
401 version: constraint with specific version of the Chart to use
402
403 Returns:
404 str: the generated Helm Chart command
405 """
406
407 inspect_command = "{} show {} {}{} {}".format(
408 self._helm_command, show_command, quote(kdu_model), repo_str, version
409 )
410 return inspect_command
411
412 def _get_get_command(
413 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
414 ):
415 get_command = (
416 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
417 kubeconfig,
418 self._helm_command,
419 get_command,
420 quote(kdu_instance),
421 quote(namespace),
422 )
423 )
424 return get_command
425
426 async def _status_kdu(
427 self,
428 cluster_id: str,
429 kdu_instance: str,
430 namespace: str = None,
431 yaml_format: bool = False,
432 show_error_log: bool = False,
433 ) -> Union[str, dict]:
434 self.log.debug(
435 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
436 )
437
438 if not namespace:
439 namespace = "kube-system"
440
441 # init config, env
442 paths, env = self._init_paths_env(
443 cluster_name=cluster_id, create_if_not_exist=True
444 )
445 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
446 paths["kube_config"],
447 self._helm_command,
448 quote(kdu_instance),
449 quote(namespace),
450 )
451
452 output, rc = await self._local_async_exec(
453 command=command,
454 raise_exception_on_error=True,
455 show_error_log=show_error_log,
456 env=env,
457 )
458
459 if yaml_format:
460 return str(output)
461
462 if rc != 0:
463 return None
464
465 data = yaml.load(output, Loader=yaml.SafeLoader)
466
467 # remove field 'notes' and manifest
468 try:
469 del data.get("info")["notes"]
470 except KeyError:
471 pass
472
473 # parse the manifest to a list of dictionaries
474 if "manifest" in data:
475 manifest_str = data.get("manifest")
476 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
477
478 data["manifest"] = []
479 for doc in manifest_docs:
480 data["manifest"].append(doc)
481
482 return data
483
484 def _get_install_command(
485 self,
486 kdu_model: str,
487 kdu_instance: str,
488 namespace: str,
489 labels: dict,
490 params_str: str,
491 version: str,
492 atomic: bool,
493 timeout: float,
494 kubeconfig: str,
495 ) -> str:
496 timeout_str = ""
497 if timeout:
498 timeout_str = "--timeout {}s".format(timeout)
499
500 # atomic
501 atomic_str = ""
502 if atomic:
503 atomic_str = "--atomic"
504 # namespace
505 namespace_str = ""
506 if namespace:
507 namespace_str = "--namespace {}".format(quote(namespace))
508
509 # version
510 version_str = ""
511 if version:
512 version_str = "--version {}".format(version)
513
514 # labels
515 post_renderer_args = []
516 post_renderer_str = post_renderer_args_str = ""
517 if labels and self.podLabels_post_renderer_path:
518 post_renderer_args.append(
519 "{}={}".format(
520 self.podLabels_post_renderer_path,
521 " ".join(
522 ["{}:{}".format(key, value) for key, value in labels.items()]
523 ),
524 )
525 )
526
527 if len(post_renderer_args) > 0 and self.main_post_renderer_path:
528 post_renderer_str = "--post-renderer {}".format(
529 self.main_post_renderer_path,
530 )
531 post_renderer_args_str += (
532 "--post-renderer-args '" + ",".join(post_renderer_args) + "'"
533 )
534
535 command = (
536 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
537 "{params} {timeout} {ns} {post_renderer} {post_renderer_args} {model} {ver}".format(
538 kubeconfig=kubeconfig,
539 helm=self._helm_command,
540 name=quote(kdu_instance),
541 atomic=atomic_str,
542 params=params_str,
543 timeout=timeout_str,
544 ns=namespace_str,
545 post_renderer=post_renderer_str,
546 post_renderer_args=post_renderer_args_str,
547 model=quote(kdu_model),
548 ver=version_str,
549 )
550 )
551 return command
552
553 def _get_upgrade_scale_command(
554 self,
555 kdu_model: str,
556 kdu_instance: str,
557 namespace: str,
558 scale: int,
559 labels: dict,
560 version: str,
561 atomic: bool,
562 replica_str: str,
563 timeout: float,
564 resource_name: str,
565 kubeconfig: str,
566 ) -> str:
567 """Generates the command to scale a Helm Chart release
568
569 Args:
570 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
571 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
572 namespace (str): Namespace where this KDU instance is deployed
573 scale (int): Scale count
574 version (str): Constraint with specific version of the Chart to use
575 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
576 The --wait flag will be set automatically if --atomic is used
577 replica_str (str): The key under resource_name key where the scale count is stored
578 timeout (float): The time, in seconds, to wait
579 resource_name (str): The KDU's resource to scale
580 kubeconfig (str): Kubeconfig file path
581
582 Returns:
583 str: command to scale a Helm Chart release
584 """
585
586 # scale
587 if resource_name:
588 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
589 else:
590 scale_dict = {replica_str: scale}
591
592 scale_str = self._params_to_set_option(scale_dict)
593
594 return self._get_upgrade_command(
595 kdu_model=kdu_model,
596 kdu_instance=kdu_instance,
597 namespace=namespace,
598 params_str=scale_str,
599 labels=labels,
600 version=version,
601 atomic=atomic,
602 timeout=timeout,
603 kubeconfig=kubeconfig,
604 )
605
606 def _get_upgrade_command(
607 self,
608 kdu_model: str,
609 kdu_instance: str,
610 namespace: str,
611 params_str: str,
612 labels: dict,
613 version: str,
614 atomic: bool,
615 timeout: float,
616 kubeconfig: str,
617 targetHostK8sLabels: dict = None,
618 reset_values: bool = False,
619 reuse_values: bool = True,
620 reset_then_reuse_values: bool = False,
621 force: bool = False,
622 ) -> str:
623 """Generates the command to upgrade a Helm Chart release
624
625 Args:
626 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
627 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
628 namespace (str): Namespace where this KDU instance is deployed
629 params_str (str): Params used to upgrade the Helm Chart release
630 version (str): Constraint with specific version of the Chart to use
631 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
632 The --wait flag will be set automatically if --atomic is used
633 timeout (float): The time, in seconds, to wait
634 kubeconfig (str): Kubeconfig file path
635 reset_values(bool): If set, helm resets values instead of reusing previous values.
636 reuse_values(bool): If set, helm reuses previous values.
637 reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
638 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
639 Returns:
640 str: command to upgrade a Helm Chart release
641 """
642
643 timeout_str = ""
644 if timeout:
645 timeout_str = "--timeout {}s".format(timeout)
646
647 # atomic
648 atomic_str = ""
649 if atomic:
650 atomic_str = "--atomic"
651
652 # force
653 force_str = ""
654 if force:
655 force_str = "--force "
656
657 # version
658 version_str = ""
659 if version:
660 version_str = "--version {}".format(quote(version))
661
662 # namespace
663 namespace_str = ""
664 if namespace:
665 namespace_str = "--namespace {}".format(quote(namespace))
666
667 # reset, reuse or reset_then_reuse values
668 on_values_str = "--reuse-values"
669 if reset_values:
670 on_values_str = "--reset-values"
671 elif reuse_values:
672 on_values_str = "--reuse-values"
673 elif reset_then_reuse_values:
674 on_values_str = "--reset-then-reuse-values"
675
676 # labels
677 post_renderer_args = []
678 post_renderer_str = post_renderer_args_str = ""
679 if labels and self.podLabels_post_renderer_path:
680 post_renderer_args.append(
681 "{}={}".format(
682 self.podLabels_post_renderer_path,
683 " ".join(
684 ["{}:{}".format(key, value) for key, value in labels.items()]
685 ),
686 )
687 )
688
689 # migration
690 if targetHostK8sLabels and self.nodeSelector_post_renderer_path:
691 post_renderer_args.append(
692 "{}={}".format(
693 self.nodeSelector_post_renderer_path,
694 " ".join(
695 [
696 "{}:{}".format(key, value)
697 for key, value in targetHostK8sLabels.items()
698 ]
699 ),
700 )
701 )
702
703 if len(post_renderer_args) > 0 and self.main_post_renderer_path:
704 post_renderer_str = "--post-renderer {}".format(
705 self.main_post_renderer_path,
706 )
707 post_renderer_args_str += (
708 "--post-renderer-args '" + ",".join(post_renderer_args) + "'"
709 )
710
711 command = (
712 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
713 "--output yaml {params} {timeout} {post_renderer} {post_renderer_args} {on_values} {ver}"
714 ).format(
715 kubeconfig=kubeconfig,
716 helm=self._helm_command,
717 name=quote(kdu_instance),
718 namespace=namespace_str,
719 atomic=atomic_str,
720 force=force_str,
721 params=params_str,
722 timeout=timeout_str,
723 post_renderer=post_renderer_str,
724 post_renderer_args=post_renderer_args_str,
725 model=quote(kdu_model),
726 on_values=on_values_str,
727 ver=version_str,
728 )
729 return command
730
731 def _get_rollback_command(
732 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
733 ) -> str:
734 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
735 kubeconfig,
736 self._helm_command,
737 quote(kdu_instance),
738 revision,
739 quote(namespace),
740 )
741
742 def _get_uninstall_command(
743 self, kdu_instance: str, namespace: str, kubeconfig: str
744 ) -> str:
745 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
746 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
747 )
748
749 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
750 repo_ids = []
751 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
752 cluster = self.db.get_one("k8sclusters", cluster_filter)
753 if cluster:
754 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
755 return repo_ids
756 else:
757 raise K8sException(
758 "k8cluster with helm-id : {} not found".format(cluster_uuid)
759 )