Revert "Feature 11002: Deprecate helmv2"
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import asyncio
23 from typing import Union
24 from shlex import quote
25 import os
26 import yaml
27
28 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
29 from n2vc.exceptions import K8sException
30
31
32 class K8sHelmConnector(K8sHelmBaseConnector):
33
34 """
35 ####################################################################################
36 ################################### P U B L I C ####################################
37 ####################################################################################
38 """
39
40 def __init__(
41 self,
42 fs: object,
43 db: object,
44 kubectl_command: str = "/usr/bin/kubectl",
45 helm_command: str = "/usr/bin/helm",
46 log: object = None,
47 on_update_db=None,
48 ):
49 """
50 Initializes helm connector for helm v2
51
52 :param fs: file system for kubernetes and helm configuration
53 :param db: database object to write current operation status
54 :param kubectl_command: path to kubectl executable
55 :param helm_command: path to helm executable
56 :param log: logger
57 :param on_update_db: callback called when k8s connector updates database
58 """
59
60 # parent class
61 K8sHelmBaseConnector.__init__(
62 self,
63 db=db,
64 log=log,
65 fs=fs,
66 kubectl_command=kubectl_command,
67 helm_command=helm_command,
68 on_update_db=on_update_db,
69 )
70
71 self.log.info("Initializing K8S Helm2 connector")
72
73 # initialize helm client-only
74 self.log.debug("Initializing helm client-only...")
75 command = "{} init --client-only {} ".format(
76 self._helm_command,
77 "--stable-repo-url {}".format(quote(self._stable_repo_url))
78 if self._stable_repo_url
79 else "--skip-repos",
80 )
81 try:
82 asyncio.create_task(
83 self._local_async_exec(command=command, raise_exception_on_error=False)
84 )
85 except Exception as e:
86 self.warning(
87 msg="helm init failed (it was already initialized): {}".format(e)
88 )
89
90 self.log.info("K8S Helm2 connector initialized")
91
92 async def install(
93 self,
94 cluster_uuid: str,
95 kdu_model: str,
96 kdu_instance: str,
97 atomic: bool = True,
98 timeout: float = 300,
99 params: dict = None,
100 db_dict: dict = None,
101 kdu_name: str = None,
102 namespace: str = None,
103 **kwargs,
104 ):
105 """
106 Deploys of a new KDU instance. It would implicitly rely on the `install` call
107 to deploy the Chart/Bundle properly parametrized (in practice, this call would
108 happen before any _initial-config-primitive_of the VNF is called).
109
110 :param cluster_uuid: UUID of a K8s cluster known by OSM
111 :param kdu_model: chart/reference (string), which can be either
112 of these options:
113 - a name of chart available via the repos known by OSM
114 (e.g. stable/openldap, stable/openldap:1.2.4)
115 - a path to a packaged chart (e.g. mychart.tgz)
116 - a path to an unpacked chart directory or a URL (e.g. mychart)
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
126 path: <str>},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
133 """
134 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
135
136 # sync local dir
137 self.fs.sync(from_path=cluster_uuid)
138
139 # init env, paths
140 paths, env = self._init_paths_env(
141 cluster_name=cluster_uuid, create_if_not_exist=True
142 )
143
144 await self._install_impl(
145 cluster_uuid,
146 kdu_model,
147 paths,
148 env,
149 kdu_instance,
150 atomic=atomic,
151 timeout=timeout,
152 params=params,
153 db_dict=db_dict,
154 kdu_name=kdu_name,
155 namespace=namespace,
156 )
157
158 # sync fs
159 self.fs.reverse_sync(from_path=cluster_uuid)
160
161 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
162 return True
163
164 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
165 self.log.debug(
166 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
167 )
168
169 return await self._exec_inspect_command(
170 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
171 )
172
173 """
174 ####################################################################################
175 ################################### P R I V A T E ##################################
176 ####################################################################################
177 """
178
179 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
180 """
181 Creates and returns base cluster and kube dirs and returns them.
182 Also created helm3 dirs according to new directory specification, paths are
183 returned and also environment variables that must be provided to execute commands
184
185 Helm 2 directory specification uses helm_home dir:
186
187 The variables assigned for this paths are:
188 - Helm hone: $HELM_HOME
189 - helm kubeconfig: $KUBECONFIG
190
191 :param cluster_name: cluster_name
192 :return: Dictionary with config_paths and dictionary with helm environment variables
193 """
194 base = self.fs.path
195 if base.endswith("/") or base.endswith("\\"):
196 base = base[:-1]
197
198 # base dir for cluster
199 cluster_dir = base + "/" + cluster_name
200
201 # kube dir
202 kube_dir = cluster_dir + "/" + ".kube"
203 if create_if_not_exist and not os.path.exists(kube_dir):
204 self.log.debug("Creating dir {}".format(kube_dir))
205 os.makedirs(kube_dir)
206
207 # helm home dir
208 helm_dir = cluster_dir + "/" + ".helm"
209 if create_if_not_exist and not os.path.exists(helm_dir):
210 self.log.debug("Creating dir {}".format(helm_dir))
211 os.makedirs(helm_dir)
212
213 config_filename = kube_dir + "/config"
214
215 # 2 - Prepare dictionary with paths
216 paths = {
217 "kube_dir": kube_dir,
218 "kube_config": config_filename,
219 "cluster_dir": cluster_dir,
220 "helm_dir": helm_dir,
221 }
222
223 for file_name, file in paths.items():
224 if "dir" in file_name and not os.path.exists(file):
225 err_msg = "{} dir does not exist".format(file)
226 self.log.error(err_msg)
227 raise K8sException(err_msg)
228
229 # 3 - Prepare environment variables
230 env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
231
232 return paths, env
233
234 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
235 # init config, env
236 paths, env = self._init_paths_env(
237 cluster_name=cluster_id, create_if_not_exist=True
238 )
239
240 command1 = "env KUBECONFIG={} {} get manifest {} ".format(
241 kubeconfig, self._helm_command, quote(kdu_instance)
242 )
243 command2 = "{} get --namespace={} -f -".format(
244 self.kubectl_command, quote(namespace)
245 )
246 output, _rc = await self._local_async_exec_pipe(
247 command1, command2, env=env, raise_exception_on_error=True
248 )
249 services = self._parse_services(output)
250
251 return services
252
253 async def _cluster_init(
254 self, cluster_id: str, namespace: str, paths: dict, env: dict
255 ):
256 """
257 Implements the helm version dependent cluster initialization:
258 For helm2 it initialized tiller environment if needed
259 """
260
261 # check if tiller pod is up in cluster
262 command = "{} --kubeconfig={} --namespace={} get deployments".format(
263 self.kubectl_command, paths["kube_config"], quote(namespace)
264 )
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=True, env=env
267 )
268
269 output_table = self._output_to_table(output=output)
270
271 # find 'tiller' pod in all pods
272 already_initialized = False
273 try:
274 for row in output_table:
275 if row[0].startswith("tiller-deploy"):
276 already_initialized = True
277 break
278 except Exception:
279 pass
280
281 # helm init
282 n2vc_installed_sw = False
283 if not already_initialized:
284 self.log.info(
285 "Initializing helm in client and server: {}".format(cluster_id)
286 )
287 command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288 self.kubectl_command, paths["kube_config"], quote(self.service_account)
289 )
290 _, _rc = await self._local_async_exec(
291 command=command, raise_exception_on_error=False, env=env
292 )
293
294 command = (
295 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
297 ).format(
298 self.kubectl_command, paths["kube_config"], quote(self.service_account)
299 )
300 _, _rc = await self._local_async_exec(
301 command=command, raise_exception_on_error=False, env=env
302 )
303
304 command = (
305 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
306 " {} init"
307 ).format(
308 self._helm_command,
309 paths["kube_config"],
310 quote(namespace),
311 quote(paths["helm_dir"]),
312 quote(self.service_account),
313 "--stable-repo-url {}".format(quote(self._stable_repo_url))
314 if self._stable_repo_url
315 else "--skip-repos",
316 )
317 _, _rc = await self._local_async_exec(
318 command=command, raise_exception_on_error=True, env=env
319 )
320 n2vc_installed_sw = True
321 else:
322 # check client helm installation
323 check_file = paths["helm_dir"] + "/repository/repositories.yaml"
324 if not self._check_file_exists(
325 filename=check_file, exception_if_not_exists=False
326 ):
327 self.log.info("Initializing helm in client: {}".format(cluster_id))
328 command = (
329 "{} --kubeconfig={} --tiller-namespace={} "
330 "--home={} init --client-only {} "
331 ).format(
332 self._helm_command,
333 paths["kube_config"],
334 quote(namespace),
335 quote(paths["helm_dir"]),
336 "--stable-repo-url {}".format(quote(self._stable_repo_url))
337 if self._stable_repo_url
338 else "--skip-repos",
339 )
340 output, _rc = await self._local_async_exec(
341 command=command, raise_exception_on_error=True, env=env
342 )
343 else:
344 self.log.info("Helm client already initialized")
345
346 repo_list = await self.repo_list(cluster_id)
347 for repo in repo_list:
348 if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
349 self.log.debug("Add new stable repo url: {}")
350 await self.repo_remove(cluster_id, "stable")
351 if self._stable_repo_url:
352 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
353 break
354
355 return n2vc_installed_sw
356
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Remove tiller (and its RBAC objects) from the cluster, if present.

        :param cluster_id: cluster identifier
        :param namespace: namespace where tiller is deployed; when falsy, it is
            discovered by scanning deployments in all namespaces
        """
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, quote(paths["kube_config"])
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    # rows too short to index are skipped
                    pass
            else:
                # for/else: runs only when the loop did not break,
                # i.e. no tiller-deploy deployment was found
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command,
                quote(paths["kube_config"]),
                quote(paths["helm_dir"]),
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, quote(paths["kube_config"]))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    quote(paths["kube_config"]),
                    quote(namespace),
                    quote(self.service_account),
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")
425
426 async def _instances_list(self, cluster_id):
427 # init paths, env
428 paths, env = self._init_paths_env(
429 cluster_name=cluster_id, create_if_not_exist=True
430 )
431
432 command = "{} list --output yaml".format(self._helm_command)
433
434 output, _rc = await self._local_async_exec(
435 command=command, raise_exception_on_error=True, env=env
436 )
437
438 if output and len(output) > 0:
439 # parse yaml and update keys to lower case to unify with helm3
440 instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
441 new_instances = []
442 for instance in instances:
443 new_instance = dict((k.lower(), v) for k, v in instance.items())
444 new_instances.append(new_instance)
445 return new_instances
446 else:
447 return []
448
449 def _get_inspect_command(
450 self, show_command: str, kdu_model: str, repo_str: str, version: str
451 ):
452 inspect_command = "{} inspect {} {}{} {}".format(
453 self._helm_command, show_command, quote(kdu_model), repo_str, version
454 )
455 return inspect_command
456
457 def _get_get_command(
458 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
459 ):
460 get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
461 kubeconfig, self._helm_command, get_command, quote(kdu_instance)
462 )
463 return get_command
464
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Obtain the status of a helm release.

        :param cluster_id: cluster identifier
        :param kdu_instance: helm release name
        :param namespace: unused by helm v2 status; kept for interface parity
        :param yaml_format: when True, return the raw command output as a string
        :param show_error_log: whether the underlying exec logs errors
        :return: raw string when yaml_format is True; otherwise a normalized
            dict (or None when the command returned a non-zero rc)
        """
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, quote(kdu_instance)
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        # NOTE(review): raw output is returned before the rc check below
        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes' (free-text chart notes are not useful here)
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources' into a table; best effort, leave untouched on error
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data
529
530 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
531 repo_ids = []
532 cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
533 cluster = self.db.get_one("k8sclusters", cluster_filter)
534 if cluster:
535 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
536 return repo_ids
537 else:
538 raise K8sException(
539 "k8cluster with helm-id : {} not found".format(cluster_uuid)
540 )
541
    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        """
        Check whether the deployed resources of a release report ready.

        Parses the textual 'resources' section of the helm v2 status output;
        returns False when any parsed READY column shows current < total.

        :param cluster_id: cluster identifier
        :param kdu_instance: helm release name
        :return: True when every parsed resource is ready
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        # ==> v1/Deployment
        # NAME READY UP-TO-DATE AVAILABLE AGE
        # halting-horse-mongodb 0/1 1 0 0s
        # halting-petit-mongodb 1/1 1 0 0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        # walk the table: a "==>" row starts a section; when its header row has
        # READY in column 1, scan the data rows and compare current/total counts
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                # malformed/short rows simply end the scan of this section
                pass

        return ready
597
598 def _get_install_command(
599 self,
600 kdu_model,
601 kdu_instance,
602 namespace,
603 params_str,
604 version,
605 atomic,
606 timeout,
607 kubeconfig,
608 ) -> str:
609 timeout_str = ""
610 if timeout:
611 timeout_str = "--timeout {}".format(timeout)
612
613 # atomic
614 atomic_str = ""
615 if atomic:
616 atomic_str = "--atomic"
617 # namespace
618 namespace_str = ""
619 if namespace:
620 namespace_str = "--namespace {}".format(quote(namespace))
621
622 # version
623 version_str = ""
624 if version:
625 version_str = "--version {}".format(version)
626
627 command = (
628 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
629 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
630 kubeconfig=kubeconfig,
631 helm=self._helm_command,
632 atomic=atomic_str,
633 params=params_str,
634 timeout=timeout_str,
635 name=quote(kdu_instance),
636 ns=namespace_str,
637 model=quote(kdu_model),
638 ver=version_str,
639 )
640 )
641 return command
642
643 def _get_upgrade_scale_command(
644 self,
645 kdu_model: str,
646 kdu_instance: str,
647 namespace: str,
648 scale: int,
649 version: str,
650 atomic: bool,
651 replica_str: str,
652 timeout: float,
653 resource_name: str,
654 kubeconfig: str,
655 ) -> str:
656 """Generates the command to scale a Helm Chart release
657
658 Args:
659 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
660 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
661 namespace (str): Namespace where this KDU instance is deployed
662 scale (int): Scale count
663 version (str): Constraint with specific version of the Chart to use
664 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
665 The --wait flag will be set automatically if --atomic is used
666 replica_str (str): The key under resource_name key where the scale count is stored
667 timeout (float): The time, in seconds, to wait
668 resource_name (str): The KDU's resource to scale
669 kubeconfig (str): Kubeconfig file path
670
671 Returns:
672 str: command to scale a Helm Chart release
673 """
674
675 # scale
676 if resource_name:
677 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
678 else:
679 scale_dict = {replica_str: scale}
680
681 scale_str = self._params_to_set_option(scale_dict)
682
683 return self._get_upgrade_command(
684 kdu_model=kdu_model,
685 kdu_instance=kdu_instance,
686 namespace=namespace,
687 params_str=scale_str,
688 version=version,
689 atomic=atomic,
690 timeout=timeout,
691 kubeconfig=kubeconfig,
692 )
693
694 def _get_upgrade_command(
695 self,
696 kdu_model,
697 kdu_instance,
698 namespace,
699 params_str,
700 version,
701 atomic,
702 timeout,
703 kubeconfig,
704 force: bool = False,
705 ) -> str:
706 """Generates the command to upgrade a Helm Chart release
707
708 Args:
709 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
710 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
711 namespace (str): Namespace where this KDU instance is deployed
712 params_str (str): Params used to upgrade the Helm Chart release
713 version (str): Constraint with specific version of the Chart to use
714 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
715 The --wait flag will be set automatically if --atomic is used
716 timeout (float): The time, in seconds, to wait
717 kubeconfig (str): Kubeconfig file path
718 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
719 Returns:
720 str: command to upgrade a Helm Chart release
721 """
722
723 timeout_str = ""
724 if timeout:
725 timeout_str = "--timeout {}".format(timeout)
726
727 # atomic
728 atomic_str = ""
729 if atomic:
730 atomic_str = "--atomic"
731
732 # force
733 force_str = ""
734 if force:
735 force_str = "--force "
736
737 # version
738 version_str = ""
739 if version:
740 version_str = "--version {}".format(quote(version))
741
742 # namespace
743 namespace_str = ""
744 if namespace:
745 namespace_str = "--namespace {}".format(quote(namespace))
746
747 command = (
748 "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
749 "--reuse-values {name} {model} {ver}"
750 ).format(
751 kubeconfig=kubeconfig,
752 helm=self._helm_command,
753 namespace=namespace_str,
754 atomic=atomic_str,
755 force=force_str,
756 params=params_str,
757 timeout=timeout_str,
758 name=quote(kdu_instance),
759 model=quote(kdu_model),
760 ver=version_str,
761 )
762 return command
763
764 def _get_rollback_command(
765 self, kdu_instance, namespace, revision, kubeconfig
766 ) -> str:
767 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
768 kubeconfig, self._helm_command, quote(kdu_instance), revision
769 )
770
771 def _get_uninstall_command(
772 self, kdu_instance: str, namespace: str, kubeconfig: str
773 ) -> str:
774 return "env KUBECONFIG={} {} delete --purge {}".format(
775 kubeconfig, self._helm_command, quote(kdu_instance)
776 )