Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm_conn.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
k8s_helm_conn.py
100%
1/1
51%
130/253
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
k8s_helm_conn.py
51%
130/253
N/A

Source

n2vc/k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import asyncio
23 1 from typing import Union
24 1 import os
25 1 import yaml
26
27 1 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
28 1 from n2vc.exceptions import K8sException
29
30
31 1 class K8sHelmConnector(K8sHelmBaseConnector):
32
33     """
34     ####################################################################################
35     ################################### P U B L I C ####################################
36     ####################################################################################
37     """
38
39 1     def __init__(
40         self,
41         fs: object,
42         db: object,
43         kubectl_command: str = "/usr/bin/kubectl",
44         helm_command: str = "/usr/bin/helm",
45         log: object = None,
46         on_update_db=None,
47     ):
48         """
49         Initializes helm connector for helm v2
50
51         :param fs: file system for kubernetes and helm configuration
52         :param db: database object to write current operation status
53         :param kubectl_command: path to kubectl executable
54         :param helm_command: path to helm executable
55         :param log: logger
56         :param on_update_db: callback called when k8s connector updates database
57         """
58
59         # parent class
60 1         K8sHelmBaseConnector.__init__(
61             self,
62             db=db,
63             log=log,
64             fs=fs,
65             kubectl_command=kubectl_command,
66             helm_command=helm_command,
67             on_update_db=on_update_db,
68         )
69
70 1         self.log.info("Initializing K8S Helm2 connector")
71
72         # initialize helm client-only
73 1         self.log.debug("Initializing helm client-only...")
74 1         command = "{} init --client-only {} ".format(
75             self._helm_command,
76             "--stable-repo-url {}".format(self._stable_repo_url)
77             if self._stable_repo_url
78             else "--skip-repos",
79         )
80 1         try:
81 1             asyncio.ensure_future(
82                 self._local_async_exec(command=command, raise_exception_on_error=False)
83             )
84             # loop = asyncio.get_event_loop()
85             # loop.run_until_complete(self._local_async_exec(command=command,
86             # raise_exception_on_error=False))
87 0         except Exception as e:
88 0             self.warning(
89                 msg="helm init failed (it was already initialized): {}".format(e)
90             )
91
92 1         self.log.info("K8S Helm2 connector initialized")
93
94 1     async def install(
95         self,
96         cluster_uuid: str,
97         kdu_model: str,
98         kdu_instance: str,
99         atomic: bool = True,
100         timeout: float = 300,
101         params: dict = None,
102         db_dict: dict = None,
103         kdu_name: str = None,
104         namespace: str = None,
105         **kwargs,
106     ):
107         """
108         Deploys of a new KDU instance. It would implicitly rely on the `install` call
109         to deploy the Chart/Bundle properly parametrized (in practice, this call would
110         happen before any _initial-config-primitive_of the VNF is called).
111
112         :param cluster_uuid: UUID of a K8s cluster known by OSM
113         :param kdu_model: chart/reference (string), which can be either
114             of these options:
115             - a name of chart available via the repos known by OSM
116               (e.g. stable/openldap, stable/openldap:1.2.4)
117             - a path to a packaged chart (e.g. mychart.tgz)
118             - a path to an unpacked chart directory or a URL (e.g. mychart)
119         :param kdu_instance: Kdu instance name
120         :param atomic: If set, installation process purges chart/bundle on fail, also
121             will wait until all the K8s objects are active
122         :param timeout: Time in seconds to wait for the install of the chart/bundle
123             (defaults to Helm default timeout: 300s)
124         :param params: dictionary of key-value pairs for instantiation parameters
125             (overriding default values)
126         :param dict db_dict: where to write into database when the status changes.
127                         It contains a dict with {collection: <str>, filter: {},
128                         path: <str>},
129                             e.g. {collection: "nsrs", filter:
130                             {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
131         :param kdu_name: Name of the KDU instance to be installed
132         :param namespace: K8s namespace to use for the KDU instance
133         :param kwargs: Additional parameters (None yet)
134         :return: True if successful
135         """
136 1         self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
137
138         # sync local dir
139 1         self.fs.sync(from_path=cluster_uuid)
140
141         # init env, paths
142 1         paths, env = self._init_paths_env(
143             cluster_name=cluster_uuid, create_if_not_exist=True
144         )
145
146 1         await self._install_impl(
147             cluster_uuid,
148             kdu_model,
149             paths,
150             env,
151             kdu_instance,
152             atomic=atomic,
153             timeout=timeout,
154             params=params,
155             db_dict=db_dict,
156             kdu_name=kdu_name,
157             namespace=namespace,
158         )
159
160         # sync fs
161 1         self.fs.reverse_sync(from_path=cluster_uuid)
162
163 1         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
164 1         return True
165
166 1     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
167 1         self.log.debug(
168             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
169         )
170
171 1         return await self._exec_inspect_command(
172             inspect_command="", kdu_model=kdu_model, repo_url=repo_url
173         )
174
175     """
176     ####################################################################################
177     ################################### P R I V A T E ##################################
178     ####################################################################################
179     """
180
181 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
182         """
183         Creates and returns base cluster and kube dirs and returns them.
184         Also created helm3 dirs according to new directory specification, paths are
185         returned and also environment variables that must be provided to execute commands
186
187         Helm 2 directory specification uses helm_home dir:
188
189         The variables assigned for this paths are:
190         - Helm hone: $HELM_HOME
191         - helm kubeconfig: $KUBECONFIG
192
193         :param cluster_name:  cluster_name
194         :return: Dictionary with config_paths and dictionary with helm environment variables
195         """
196 1         base = self.fs.path
197 1         if base.endswith("/") or base.endswith("\\"):
198 1             base = base[:-1]
199
200         # base dir for cluster
201 1         cluster_dir = base + "/" + cluster_name
202
203         # kube dir
204 1         kube_dir = cluster_dir + "/" + ".kube"
205 1         if create_if_not_exist and not os.path.exists(kube_dir):
206 1             self.log.debug("Creating dir {}".format(kube_dir))
207 1             os.makedirs(kube_dir)
208
209         # helm home dir
210 1         helm_dir = cluster_dir + "/" + ".helm"
211 1         if create_if_not_exist and not os.path.exists(helm_dir):
212 1             self.log.debug("Creating dir {}".format(helm_dir))
213 1             os.makedirs(helm_dir)
214
215 1         config_filename = kube_dir + "/config"
216
217         # 2 - Prepare dictionary with paths
218 1         paths = {
219             "kube_dir": kube_dir,
220             "kube_config": config_filename,
221             "cluster_dir": cluster_dir,
222             "helm_dir": helm_dir,
223         }
224
225 1         for file_name, file in paths.items():
226 1             if "dir" in file_name and not os.path.exists(file):
227 0                 err_msg = "{} dir does not exist".format(file)
228 0                 self.log.error(err_msg)
229 0                 raise K8sException(err_msg)
230
231         # 3 - Prepare environment variables
232 1         env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
233
234 1         return paths, env
235
236 1     async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
237         # init config, env
238 1         paths, env = self._init_paths_env(
239             cluster_name=cluster_id, create_if_not_exist=True
240         )
241
242 1         command1 = "env KUBECONFIG={} {} get manifest {} ".format(
243             kubeconfig, self._helm_command, kdu_instance
244         )
245 1         command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
246 1         output, _rc = await self._local_async_exec_pipe(
247             command1, command2, env=env, raise_exception_on_error=True
248         )
249 1         services = self._parse_services(output)
250
251 1         return services
252
253 1     async def _cluster_init(
254         self, cluster_id: str, namespace: str, paths: dict, env: dict
255     ):
256         """
257         Implements the helm version dependent cluster initialization:
258         For helm2 it initialized tiller environment if needed
259         """
260
261         # check if tiller pod is up in cluster
262 0         command = "{} --kubeconfig={} --namespace={} get deployments".format(
263             self.kubectl_command, paths["kube_config"], namespace
264         )
265 0         output, _rc = await self._local_async_exec(
266             command=command, raise_exception_on_error=True, env=env
267         )
268
269 0         output_table = self._output_to_table(output=output)
270
271         # find 'tiller' pod in all pods
272 0         already_initialized = False
273 0         try:
274 0             for row in output_table:
275 0                 if row[0].startswith("tiller-deploy"):
276 0                     already_initialized = True
277 0                     break
278 0         except Exception:
279 0             pass
280
281         # helm init
282 0         n2vc_installed_sw = False
283 0         if not already_initialized:
284 0             self.log.info(
285                 "Initializing helm in client and server: {}".format(cluster_id)
286             )
287 0             command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288                 self.kubectl_command, paths["kube_config"], self.service_account
289             )
290 0             _, _rc = await self._local_async_exec(
291                 command=command, raise_exception_on_error=False, env=env
292             )
293
294 0             command = (
295                 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296                 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
297             ).format(self.kubectl_command, paths["kube_config"], self.service_account)
298 0             _, _rc = await self._local_async_exec(
299                 command=command, raise_exception_on_error=False, env=env
300             )
301
302 0             command = (
303                 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
304                 " {} init"
305             ).format(
306                 self._helm_command,
307                 paths["kube_config"],
308                 namespace,
309                 paths["helm_dir"],
310                 self.service_account,
311                 "--stable-repo-url {}".format(self._stable_repo_url)
312                 if self._stable_repo_url
313                 else "--skip-repos",
314             )
315 0             _, _rc = await self._local_async_exec(
316                 command=command, raise_exception_on_error=True, env=env
317             )
318 0             n2vc_installed_sw = True
319         else:
320             # check client helm installation
321 0             check_file = paths["helm_dir"] + "/repository/repositories.yaml"
322 0             if not self._check_file_exists(
323                 filename=check_file, exception_if_not_exists=False
324             ):
325 0                 self.log.info("Initializing helm in client: {}".format(cluster_id))
326 0                 command = (
327                     "{} --kubeconfig={} --tiller-namespace={} "
328                     "--home={} init --client-only {} "
329                 ).format(
330                     self._helm_command,
331                     paths["kube_config"],
332                     namespace,
333                     paths["helm_dir"],
334                     "--stable-repo-url {}".format(self._stable_repo_url)
335                     if self._stable_repo_url
336                     else "--skip-repos",
337                 )
338 0                 output, _rc = await self._local_async_exec(
339                     command=command, raise_exception_on_error=True, env=env
340                 )
341             else:
342 0                 self.log.info("Helm client already initialized")
343
344 0         repo_list = await self.repo_list(cluster_id)
345 0         for repo in repo_list:
346 0             if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
347 0                 self.log.debug("Add new stable repo url: {}")
348 0                 await self.repo_remove(cluster_id, "stable")
349 0                 if self._stable_repo_url:
350 0                     await self.repo_add(cluster_id, "stable", self._stable_repo_url)
351 0                 break
352
353 0         return n2vc_installed_sw
354
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """Uninstall tiller and its RBAC objects from the cluster if present.

        :param cluster_id: cluster identifier, used to resolve local paths
        :param namespace: tiller namespace; when falsy it is auto-discovered
            by scanning deployments across all namespaces
        """
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    # column 1 holds the deployment name; rows with fewer
                    # columns raise and are skipped by the except below
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                # for/else: reached only when the loop finished without break,
                # i.e. no tiller deployment was found
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")
421
422 1     async def _instances_list(self, cluster_id):
423         # init paths, env
424 1         paths, env = self._init_paths_env(
425             cluster_name=cluster_id, create_if_not_exist=True
426         )
427
428 1         command = "{} list --output yaml".format(self._helm_command)
429
430 1         output, _rc = await self._local_async_exec(
431             command=command, raise_exception_on_error=True, env=env
432         )
433
434 1         if output and len(output) > 0:
435             # parse yaml and update keys to lower case to unify with helm3
436 0             instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
437 0             new_instances = []
438 0             for instance in instances:
439 0                 new_instance = dict((k.lower(), v) for k, v in instance.items())
440 0                 new_instances.append(new_instance)
441 0             return new_instances
442         else:
443 1             return []
444
445 1     def _get_inspect_command(
446         self, show_command: str, kdu_model: str, repo_str: str, version: str
447     ):
448 1         inspect_command = "{} inspect {} {}{} {}".format(
449             self._helm_command, show_command, kdu_model, repo_str, version
450         )
451 1         return inspect_command
452
453 1     def _get_get_command(
454         self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
455     ):
456 1         get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
457             kubeconfig, self._helm_command, get_command, kdu_instance
458         )
459 1         return get_command
460
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """Return the status of a helm release.

        :param cluster_id: cluster identifier, used to resolve local paths
        :param namespace: only used for logging (helm2 status ignores it)
        :param yaml_format: if True return the raw helm output as a string,
            otherwise return a parsed and normalized dict
        :param show_error_log: whether execution errors are logged as errors
        :return: raw yaml string or normalized status dict
        """
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        # NOTE(review): with raise_exception_on_error=True a non-zero rc
        # presumably raises inside _local_async_exec, which would make this
        # branch unreachable — confirm against the base connector
        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources' (best effort: any failure leaves the raw value)
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data
525
526 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
527 0         repo_ids = []
528 0         cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
529 0         cluster = self.db.get_one("k8sclusters", cluster_filter)
530 0         if cluster:
531 0             repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
532 0             return repo_ids
533         else:
534 0             raise K8sException(
535                 "k8cluster with helm-id : {} not found".format(cluster_uuid)
536             )
537
    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        """Check whether all resources of a release report complete READY counts.

        Walks the textual 'resources' section of the helm status output and
        returns False if any data row shows current < total in its READY
        column; parsing failures are swallowed, so unparsable output yields
        True.

        :param cluster_id: cluster identifier, used to resolve local paths
        :param kdu_instance: helm release name
        :return: True when every parsed READY value is complete
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0 (section header, e.g. "==> v1/Deployment")
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1 (the section's table header row)
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            # READY column is formatted "current/total"
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                # malformed/short rows end the current section scan; index has
                # already advanced, so the outer loop always makes progress
                pass

        return ready
593
594 1     def _get_install_command(
595         self,
596         kdu_model,
597         kdu_instance,
598         namespace,
599         params_str,
600         version,
601         atomic,
602         timeout,
603         kubeconfig,
604     ) -> str:
605 1         timeout_str = ""
606 1         if timeout:
607 1             timeout_str = "--timeout {}".format(timeout)
608
609         # atomic
610 1         atomic_str = ""
611 1         if atomic:
612 1             atomic_str = "--atomic"
613         # namespace
614 1         namespace_str = ""
615 1         if namespace:
616 1             namespace_str = "--namespace {}".format(namespace)
617
618         # version
619 1         version_str = ""
620 1         if version:
621 1             version_str = "--version {}".format(version)
622
623 1         command = (
624             "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml  "
625             "{params} {timeout} --name={name} {ns} {model} {ver}".format(
626                 kubeconfig=kubeconfig,
627                 helm=self._helm_command,
628                 atomic=atomic_str,
629                 params=params_str,
630                 timeout=timeout_str,
631                 name=kdu_instance,
632                 ns=namespace_str,
633                 model=kdu_model,
634                 ver=version_str,
635             )
636         )
637 1         return command
638
639 1     def _get_upgrade_scale_command(
640         self,
641         kdu_model: str,
642         kdu_instance: str,
643         namespace: str,
644         scale: int,
645         version: str,
646         atomic: bool,
647         replica_str: str,
648         timeout: float,
649         resource_name: str,
650         kubeconfig: str,
651     ) -> str:
652         """Generates the command to scale a Helm Chart release
653
654         Args:
655             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
656             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
657             namespace (str): Namespace where this KDU instance is deployed
658             scale (int): Scale count
659             version (str): Constraint with specific version of the Chart to use
660             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
661                 The --wait flag will be set automatically if --atomic is used
662             replica_str (str): The key under resource_name key where the scale count is stored
663             timeout (float): The time, in seconds, to wait
664             resource_name (str): The KDU's resource to scale
665             kubeconfig (str): Kubeconfig file path
666
667         Returns:
668             str: command to scale a Helm Chart release
669         """
670
671         # scale
672 1         if resource_name:
673 1             scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
674         else:
675 1             scale_dict = {replica_str: scale}
676
677 1         scale_str = self._params_to_set_option(scale_dict)
678
679 1         return self._get_upgrade_command(
680             kdu_model=kdu_model,
681             kdu_instance=kdu_instance,
682             namespace=namespace,
683             params_str=scale_str,
684             version=version,
685             atomic=atomic,
686             timeout=timeout,
687             kubeconfig=kubeconfig,
688         )
689
690 1     def _get_upgrade_command(
691         self,
692         kdu_model,
693         kdu_instance,
694         namespace,
695         params_str,
696         version,
697         atomic,
698         timeout,
699         kubeconfig,
700         force: bool = False,
701     ) -> str:
702         """Generates the command to upgrade a Helm Chart release
703
704         Args:
705             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
706             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
707             namespace (str): Namespace where this KDU instance is deployed
708             params_str (str): Params used to upgrade the Helm Chart release
709             version (str): Constraint with specific version of the Chart to use
710             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
711                 The --wait flag will be set automatically if --atomic is used
712             timeout (float): The time, in seconds, to wait
713             kubeconfig (str): Kubeconfig file path
714             force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
715         Returns:
716             str: command to upgrade a Helm Chart release
717         """
718
719 1         timeout_str = ""
720 1         if timeout:
721 1             timeout_str = "--timeout {}".format(timeout)
722
723         # atomic
724 1         atomic_str = ""
725 1         if atomic:
726 1             atomic_str = "--atomic"
727
728         # force
729 1         force_str = ""
730 1         if force:
731 1             force_str = "--force "
732
733         # version
734 1         version_str = ""
735 1         if version:
736 1             version_str = "--version {}".format(version)
737
738         # namespace
739 1         namespace_str = ""
740 1         if namespace:
741 1             namespace_str = "--namespace {}".format(namespace)
742
743 1         command = (
744             "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
745             "--reuse-values {name} {model} {ver}"
746         ).format(
747             kubeconfig=kubeconfig,
748             helm=self._helm_command,
749             namespace=namespace_str,
750             atomic=atomic_str,
751             force=force_str,
752             params=params_str,
753             timeout=timeout_str,
754             name=kdu_instance,
755             model=kdu_model,
756             ver=version_str,
757         )
758 1         return command
759
760 1     def _get_rollback_command(
761         self, kdu_instance, namespace, revision, kubeconfig
762     ) -> str:
763 1         return "env KUBECONFIG={} {} rollback {} {} --wait".format(
764             kubeconfig, self._helm_command, kdu_instance, revision
765         )
766
767 1     def _get_uninstall_command(
768         self, kdu_instance: str, namespace: str, kubeconfig: str
769     ) -> str:
770 1         return "env KUBECONFIG={} {} delete --purge  {}".format(
771             kubeconfig, self._helm_command, kdu_instance
772         )