##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses a cluster_uuid as stored in the database, which can be either
        'namespace:cluster_id' or, for backward compatibility, just 'cluster_id'
        """
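        # Illustrative behaviour (follows str.rpartition semantics):
        #   "kube-system:0a1b2c" -> ("kube-system", "0a1b2c")
        #   "0a1b2c"             -> ("", "0a1b2c")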
        namespace, _, cluster_id = cluster_uuid.rpartition(':')
        return namespace, cluster_id

    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
        client (OSM) and server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if the connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug(
            "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
        )

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

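        # If Tiller was not found, bootstrap the server side (summary of the
        # commands executed below): create the 'osm' service account in
        # kube-system, bind it to the cluster-admin clusterrole and run
        # 'helm init' against the target tiller namespace.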
        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system create serviceaccount {}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} "
                "--service-account {} init"
            ).format(
                self._helm_command,
                config_filename,
                namespace,
                helm_dir,
                self.service_account,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        # sync fs with local data
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw

    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
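        """
        Add a helm repository to a cluster (a 'repo update' is run first).

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: name to register the repository with
        :param url: repository URL
        :param repo_type: only 'chart' repositories are expected here (the
            parameter is currently not used by this connector)
        """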
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_id, repo_type, name, url
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: repo name in OSM
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove repository {} from cluster {}".format(name, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_id, uninstall_sw
            )
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("Name")
                            chart = r.get("Chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            self.log.error(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not enabled. "
                        "Leaving the K8s helm environment as it is"
                    ).format(cluster_id)
                    self.log.warning(msg)
                    uninstall_sw = False  # allow removing the k8s cluster without uninstalling Tiller

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore errors, for backward compatibility
                command = (
                    "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                    "io/osm-tiller-cluster-rule"
                ).format(self.kubectl_command, config_filename)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                command = (
                    "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}"
                ).format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_id))
        self.fs.file_delete(cluster_id, ignore_non_exist=True)
        # also remove the local directory if it still exists
        direct = self.fs.path + "/" + cluster_id
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
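        """
        Deploy a chart (KDU) in the cluster and return the generated release name.

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param kdu_model: chart reference, optionally suffixed with ':<version>'
        :param atomic: if True, wait for completion and let helm roll back on
            failure ('--atomic')
        :param timeout: timeout passed to helm ('--timeout')
        :param params: chart values, written to a temporary values file
        :param db_dict: where to write the operation status in the database
        :param kdu_name: descriptive name (currently not used by this connector)
        :param namespace: optional target namespace
        :return: the generated kdu_instance (helm release name)
        """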

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.log.debug("sync cluster_id: {}".format(_cluster_dir))
        self.fs.sync(from_path=cluster_id)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]
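        # e.g. kdu_model "stable/openldap:1.2.3" -> chart "stable/openldap",
        # version_str "--version 1.2.3" (illustrative)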

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        Returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return: list of release descriptors ('Releases' entries of
            'helm list --output yaml')
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
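        """
        Upgrade a deployed release ('helm upgrade') and return the new revision
        number.

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param kdu_instance: release name to upgrade
        :param kdu_model: chart reference, optionally suffixed with ':<version>'
        :param atomic: if True, wait for completion and let helm roll back on failure
        :param timeout: timeout passed to helm
        :param params: chart values, written to a temporary values file
        :param db_dict: where to write the operation status in the database
        :return: the new revision number, or 0 if it cannot be obtained
        """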

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
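        """
        Roll back a release to a given revision ('helm rollback ... --wait').

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param kdu_instance: release name to roll back
        :param revision: target revision (with helm, 0 typically means the
            previous revision)
        :param db_dict: where to write the operation status in the database
        :return: the resulting revision number, or 0 if it cannot be obtained
        """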

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: output of the 'helm delete' command, parsed into a table
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return self._output_to_table(output)

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_comand(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        return await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:
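        """
        Return the list of services deployed by a given release, as parsed from
        'helm status' and 'kubectl get service'.

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param kdu_instance: release name
        :param namespace: namespace where the services live
        :return: list of service descriptors (see get_service)
        """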

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        status = await self._status_kdu(
            cluster_id, kdu_instance, return_text=False
        )

        service_names = self._parse_helm_status_service_info(status)
        service_list = []
        for service in service_names:
            service = await self.get_service(cluster_uuid, service, namespace)
            service_list.append(service)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return service_list

    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str
    ) -> object:
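        """
        Return the description of a single k8s service.

        The returned dictionary looks like this (illustrative):
            {
                "name": "myservice",
                "type": "LoadBalancer",
                "ports": [...],                # as reported in spec.ports
                "cluster_ip": "10.152.183.10",
                "external_ip": ["1.2.3.4"],    # only for LoadBalancer services
            }
        """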

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid
            )
        )

        # get paths
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, config_filename, namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return service

    async def synchronize_repos(self, cluster_uuid: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
        try:
            update_repos_timeout = (
                300  # max timeout to sync a single repo, more than this is too much
            )
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added by cluster but not in nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first then add because there may be
                # different repos with the same name but different id and url
                self.log.debug("repos to delete: {}".format(repos_to_delete))
                for repo_id in repos_to_delete:
                    # try to delete repos
                    try:
                        repo_delete_task = asyncio.ensure_future(
                            self.repo_remove(
                                cluster_uuid=cluster_uuid,
                                name=cluster_repo_dict[repo_id],
                            )
                        )
                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                                repo_id, cluster_repo_dict[repo_id], str(e)
                            )
                        )
                    # consider the repo deleted even on error: if it is not there,
                    # trying to delete it again would raise an error
                    deleted_repo_list.append(repo_id)

                # add repos
                self.log.debug("repos to add: {}".format(repos_to_add))
                for repo_id in repos_to_add:
                    # obtain the repo data from the db
                    # if there is an error getting the repo in the database we will
                    # ignore this repo and continue
                    # because there is a possible race condition where the repo has
                    # been deleted while processing
                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                    self.log.debug(
                        "obtained repo: id, {}, name: {}, url: {}".format(
                            repo_id, db_repo["name"], db_repo["url"]
                        )
                    )
                    try:
                        repo_add_task = asyncio.ensure_future(
                            self.repo_add(
                                cluster_uuid=cluster_uuid,
                                name=db_repo["name"],
                                url=db_repo["url"],
                                repo_type="chart",
                            )
                        )
                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
                        added_repo_dict[repo_id] = db_repo["name"]
                        self.log.debug(
                            "added repo: id, {}, name: {}".format(
                                repo_id, db_repo["name"]
                            )
                        )
                    except Exception as e:
                        # adding a repo that already exists does not raise any error.
                        # Do not re-raise here: a wrong repo added by anyone could
                        # otherwise prevent instantiating any ns
                        self.log.error(
                            "Error adding repo id: {}, err_msg: {} ".format(
                                repo_id, repr(e)
                            )
                        )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8scluster with helm-id: {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    async def _exec_inspect_comand(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
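        # Illustrative result: "stable/mongodb" -> "stable-mongodb-0012345678"
        # (non-alphanumeric characters become '-', the base name is truncated to
        # 35 characters and a zero-padded random numeric suffix is appended)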
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if it does not start with an alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()

    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
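        # Poll the release status every 'check_every' seconds and write it to the
        # database (via write_app_status_to_db) until the task is cancelled, the
        # database write fails, or run_once is set.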
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                self.log.debug(
                    "_store_status exception: {}".format(str(e)), exc_info=True
                )
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY UP-TO-DATE  AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _parse_helm_status_service_info(self, status):

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY UP-TO-DATE  AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        service_list = []
        first_line_skipped = service_found = False
        for line in resources:
            if not service_found:
                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
                    service_found = True
                    continue
            else:
                if len(line) >= 2 and line[0] == "==>":
                    service_found = first_line_skipped = False
                    continue
                if not line:
                    continue
                if not first_line_skipped:
                    first_line_skipped = True
                    continue
                service_list.append(line[0])

        return service_list

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
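        # e.g. _get_deep(status, ("info", "status", "resources")) returns
        # status["info"]["status"]["resources"], or None if any key is missing/empty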
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value

    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
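        # e.g. {"replicas": 2, "service.type": "NodePort"}
        #   -> "--set replicas=2,service.type=NodePort" (illustrative)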
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str

    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
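        # e.g. "NAME  READY\nfoo   1/1" -> [["NAME", "READY"], ["foo", "1/1"]]
        # (tabs are treated as spaces; empty lines produce empty rows)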
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table

    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube, helm directories, config filename and cluster dir.
            Raises an exception if a directory does not exist and cannot be created
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # raise exception if fails
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for the command to terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)