blob: 88c94c5c49e535ef58fbf0b3d987a373d86fb181 [file] [log] [blame]
quilesj26c78a42019-10-28 18:10:42 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import paramiko
24import subprocess
25import os
26import shutil
27import asyncio
28import uuid
29import time
30import yaml
31from uuid import uuid4
32import random
33from n2vc.k8s_conn import K8sConnector
34
35
36class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = '/usr/bin/kubectl',
49 helm_command: str = '/usr/bin/helm',
50 log: object = None,
51 on_update_db=None
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(
65 self,
66 db=db,
67 log=log,
68 on_update_db=on_update_db
69 )
70
71 self.info('Initializing K8S Helm connector')
72
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 self.info('K8S Helm connector initialized')
88
89 async def init_env(
90 self,
91 k8s_creds: str,
92 namespace: str = 'kube-system',
93 reuse_cluster_uuid=None
94 ) -> (str, bool):
95
96 cluster_uuid = reuse_cluster_uuid
97 if not cluster_uuid:
98 cluster_uuid = str(uuid4())
99
100 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
101
102 # create config filename
103 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
104 f = open(config_filename, "w")
105 f.write(k8s_creds)
106 f.close()
107
108 # check if tiller pod is up in cluster
109 command = '{} --kubeconfig={} --namespace={} get deployments'\
110 .format(self.kubectl_command, config_filename, namespace)
111 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
112
113 output_table = K8sHelmConnector._output_to_table(output=output)
114
115 # find 'tiller' pod in all pods
116 already_initialized = False
117 try:
118 for row in output_table:
119 if row[0].startswith('tiller-deploy'):
120 already_initialized = True
121 break
122 except Exception as e:
123 pass
124
125 # helm init
126 n2vc_installed_sw = False
127 if not already_initialized:
128 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
129 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
130 .format(self._helm_command, config_filename, namespace, helm_dir)
131 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
132 n2vc_installed_sw = True
133 else:
134 # check client helm installation
135 check_file = helm_dir + '/repository/repositories.yaml'
136 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
137 self.info('Initializing helm in client: {}'.format(cluster_uuid))
138 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
139 .format(self._helm_command, config_filename, namespace, helm_dir)
140 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
141 else:
142 self.info('Helm client already initialized')
143
144 self.info('Cluster initialized {}'.format(cluster_uuid))
145
146 return cluster_uuid, n2vc_installed_sw
147
148 async def repo_add(
149 self,
150 cluster_uuid: str,
151 name: str,
152 url: str,
153 repo_type: str = 'chart'
154 ):
155
156 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
157
158 # config filename
159 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
160
161 # helm repo update
162 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
163 self.debug('updating repo: {}'.format(command))
164 await self._local_async_exec(command=command, raise_exception_on_error=False)
165
166 # helm repo add name url
167 command = '{} --kubeconfig={} --home={} repo add {} {}'\
168 .format(self._helm_command, config_filename, helm_dir, name, url)
169 self.debug('adding repo: {}'.format(command))
170 await self._local_async_exec(command=command, raise_exception_on_error=True)
171
172 async def repo_list(
173 self,
174 cluster_uuid: str
175 ) -> list:
176 """
177 Get the list of registered repositories
178
179 :return: list of registered repositories: [ (name, url) .... ]
180 """
181
182 self.debug('list repositories for cluster {}'.format(cluster_uuid))
183
184 # config filename
185 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
186
187 command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
188
189 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
190 if output and len(output) > 0:
191 return yaml.load(output, Loader=yaml.SafeLoader)
192 else:
193 return []
194
195 async def repo_remove(
196 self,
197 cluster_uuid: str,
198 name: str
199 ):
200 """
201 Remove a repository from OSM
202
203 :param cluster_uuid: the cluster
204 :param name: repo name in OSM
205 :return: True if successful
206 """
207
208 self.debug('list repositories for cluster {}'.format(cluster_uuid))
209
210 # config filename
211 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
212
213 command = '{} --kubeconfig={} --home={} repo remove {}'\
214 .format(self._helm_command, config_filename, helm_dir, name)
215
216 await self._local_async_exec(command=command, raise_exception_on_error=True)
217
218 async def reset(
219 self,
220 cluster_uuid: str,
221 force: bool = False,
222 uninstall_sw: bool = False
223 ) -> bool:
224
225 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
226
227 # get kube and helm directories
228 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
229
230 # uninstall releases if needed
231 releases = await self.instances_list(cluster_uuid=cluster_uuid)
232 if len(releases) > 0:
233 if force:
234 for r in releases:
235 try:
236 kdu_instance = r.get('Name')
237 chart = r.get('Chart')
238 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
239 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
240 except Exception as e:
241 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
242 else:
243 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
244 .format(cluster_uuid)
245 self.error(msg)
246 raise Exception(msg)
247
248 if uninstall_sw:
249
250 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
251
252 # find namespace for tiller pod
253 command = '{} --kubeconfig={} get deployments --all-namespaces'\
254 .format(self.kubectl_command, config_filename)
255 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
256 output_table = K8sHelmConnector._output_to_table(output=output)
257 namespace = None
258 for r in output_table:
259 try:
260 if 'tiller-deploy' in r[1]:
261 namespace = r[0]
262 break
263 except Exception as e:
264 pass
265 else:
266 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
267 self.error(msg)
268 # raise Exception(msg)
269
270 self.debug('namespace for tiller: {}'.format(namespace))
271
272 force_str = '--force'
273
274 if namespace:
275 # delete tiller deployment
276 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
277 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
278 .format(self.kubectl_command, namespace, config_filename, force_str)
279 await self._local_async_exec(command=command, raise_exception_on_error=False)
280
281 # uninstall tiller from cluster
282 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
283 command = '{} --kubeconfig={} --home={} reset'\
284 .format(self._helm_command, config_filename, helm_dir)
285 self.debug('resetting: {}'.format(command))
286 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
287 else:
288 self.debug('namespace not found')
289
290 # delete cluster directory
291 dir = self.fs.path + '/' + cluster_uuid
292 self.debug('Removing directory {}'.format(dir))
293 shutil.rmtree(dir, ignore_errors=True)
294
295 return True
296
297 async def install(
298 self,
299 cluster_uuid: str,
300 kdu_model: str,
301 atomic: bool = True,
302 timeout: float = 300,
303 params: dict = None,
304 db_dict: dict = None
305 ):
306
307 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
308
309 start = time.time()
310 end = start + timeout
311
312 # config filename
313 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
314
315 # params to str
316 params_str = K8sHelmConnector._params_to_set_option(params)
317
318 timeout_str = ''
319 if timeout:
320 timeout_str = '--timeout {}'.format(timeout)
321
322 # atomic
323 atomic_str = ''
324 if atomic:
325 atomic_str = '--atomic'
326
327 # version
328 version_str = ''
329 if ':' in kdu_model:
330 parts = kdu_model.split(sep=':')
331 if len(parts) == 2:
332 version_str = '--version {}'.format(parts[1])
333 kdu_model = parts[0]
334
335 # generate a name for the releas. Then, check if already exists
336 kdu_instance = None
337 while kdu_instance is None:
338 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
339 try:
340 result = await self._status_kdu(
341 cluster_uuid=cluster_uuid,
342 kdu_instance=kdu_instance,
343 show_error_log=False
344 )
345 if result is not None:
346 # instance already exists: generate a new one
347 kdu_instance = None
348 except:
349 kdu_instance = None
350
351 # helm repo install
352 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
353 .format(self._helm_command, atomic_str, config_filename, helm_dir,
354 params_str, timeout_str, kdu_instance, kdu_model, version_str)
355 self.debug('installing: {}'.format(command))
356
357 if atomic:
358 # exec helm in a task
359 exec_task = asyncio.ensure_future(
360 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
361 )
362 # write status in another task
363 status_task = asyncio.ensure_future(
364 coro_or_future=self._store_status(
365 cluster_uuid=cluster_uuid,
366 kdu_instance=kdu_instance,
367 db_dict=db_dict,
368 operation='install',
369 run_once=False
370 )
371 )
372
373 # wait for execution task
374 await asyncio.wait([exec_task])
375
376 # cancel status task
377 status_task.cancel()
378
379 output, rc = exec_task.result()
380
381 else:
382
383 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
384
385 # write final status
386 await self._store_status(
387 cluster_uuid=cluster_uuid,
388 kdu_instance=kdu_instance,
389 db_dict=db_dict,
390 operation='install',
391 run_once=True,
392 check_every=0
393 )
394
395 if rc != 0:
396 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
397 self.error(msg)
398 raise Exception(msg)
399
400 self.debug('Returning kdu_instance {}'.format(kdu_instance))
401 return kdu_instance
402
403 async def instances_list(
404 self,
405 cluster_uuid: str
406 ) -> list:
407 """
408 returns a list of deployed releases in a cluster
409
410 :param cluster_uuid: the cluster
411 :return:
412 """
413
414 self.debug('list releases for cluster {}'.format(cluster_uuid))
415
416 # config filename
417 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
418
419 command = '{} --kubeconfig={} --home={} list --output yaml'\
420 .format(self._helm_command, config_filename, helm_dir)
421
422 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
423
424 if output and len(output) > 0:
425 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
426 else:
427 return []
428
429 async def upgrade(
430 self,
431 cluster_uuid: str,
432 kdu_instance: str,
433 kdu_model: str = None,
434 atomic: bool = True,
435 timeout: float = 300,
436 params: dict = None,
437 db_dict: dict = None
438 ):
439
440 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
441
442 start = time.time()
443 end = start + timeout
444
445 # config filename
446 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
447
448 # params to str
449 params_str = K8sHelmConnector._params_to_set_option(params)
450
451 timeout_str = ''
452 if timeout:
453 timeout_str = '--timeout {}'.format(timeout)
454
455 # atomic
456 atomic_str = ''
457 if atomic:
458 atomic_str = '--atomic'
459
460 # version
461 version_str = ''
462 if ':' in kdu_model:
463 parts = kdu_model.split(sep=':')
464 if len(parts) == 2:
465 version_str = '--version {}'.format(parts[1])
466 kdu_model = parts[0]
467
468 # helm repo upgrade
469 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
470 .format(self._helm_command, atomic_str, config_filename, helm_dir,
471 params_str, timeout_str, kdu_instance, kdu_model, version_str)
472 self.debug('upgrading: {}'.format(command))
473
474 if atomic:
475
476 # exec helm in a task
477 exec_task = asyncio.ensure_future(
478 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
479 )
480 # write status in another task
481 status_task = asyncio.ensure_future(
482 coro_or_future=self._store_status(
483 cluster_uuid=cluster_uuid,
484 kdu_instance=kdu_instance,
485 db_dict=db_dict,
486 operation='upgrade',
487 run_once=False
488 )
489 )
490
491 # wait for execution task
492 await asyncio.wait([ exec_task ])
493
494 # cancel status task
495 status_task.cancel()
496 output, rc = exec_task.result()
497
498 else:
499
500 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
501
502 # write final status
503 await self._store_status(
504 cluster_uuid=cluster_uuid,
505 kdu_instance=kdu_instance,
506 db_dict=db_dict,
507 operation='upgrade',
508 run_once=True,
509 check_every=0
510 )
511
512 if rc != 0:
513 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
514 self.error(msg)
515 raise Exception(msg)
516
517 # return new revision number
518 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
519 if instance:
520 revision = int(instance.get('Revision'))
521 self.debug('New revision: {}'.format(revision))
522 return revision
523 else:
524 return 0
525
526 async def rollback(
527 self,
528 cluster_uuid: str,
529 kdu_instance: str,
530 revision=0,
531 db_dict: dict = None
532 ):
533
534 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
535 .format(kdu_instance, revision, cluster_uuid))
536
537 # config filename
538 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
539
540 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
541 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
542
543 # exec helm in a task
544 exec_task = asyncio.ensure_future(
545 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
546 )
547 # write status in another task
548 status_task = asyncio.ensure_future(
549 coro_or_future=self._store_status(
550 cluster_uuid=cluster_uuid,
551 kdu_instance=kdu_instance,
552 db_dict=db_dict,
553 operation='rollback',
554 run_once=False
555 )
556 )
557
558 # wait for execution task
559 await asyncio.wait([exec_task])
560
561 # cancel status task
562 status_task.cancel()
563
564 output, rc = exec_task.result()
565
566 # write final status
567 await self._store_status(
568 cluster_uuid=cluster_uuid,
569 kdu_instance=kdu_instance,
570 db_dict=db_dict,
571 operation='rollback',
572 run_once=True,
573 check_every=0
574 )
575
576 if rc != 0:
577 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
578 self.error(msg)
579 raise Exception(msg)
580
581 # return new revision number
582 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
583 if instance:
584 revision = int(instance.get('Revision'))
585 self.debug('New revision: {}'.format(revision))
586 return revision
587 else:
588 return 0
589
590 async def uninstall(
591 self,
592 cluster_uuid: str,
593 kdu_instance: str
594 ):
595 """
596 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
597 after all _terminate-config-primitive_ of the VNF are invoked).
598
599 :param cluster_uuid: UUID of a K8s cluster known by OSM
600 :param kdu_instance: unique name for the KDU instance to be deleted
601 :return: True if successful
602 """
603
604 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
605
606 # config filename
607 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
608
609 command = '{} --kubeconfig={} --home={} delete --purge {}'\
610 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
611
612 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
613
614 return self._output_to_table(output)
615
616 async def inspect_kdu(
617 self,
618 kdu_model: str
619 ) -> str:
620
621 self.debug('inspect kdu_model {}'.format(kdu_model))
622
623 command = '{} inspect values {}'\
624 .format(self._helm_command, kdu_model)
625
626 output, rc = await self._local_async_exec(command=command)
627
628 return output
629
630 async def help_kdu(
631 self,
632 kdu_model: str
633 ):
634
635 self.debug('help kdu_model {}'.format(kdu_model))
636
637 command = '{} inspect readme {}'\
638 .format(self._helm_command, kdu_model)
639
640 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
641
642 return output
643
644 async def status_kdu(
645 self,
646 cluster_uuid: str,
647 kdu_instance: str
648 ):
649
650 return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
651
652
653 """
654 ##################################################################################################
655 ########################################## P R I V A T E #########################################
656 ##################################################################################################
657 """
658
659 async def _status_kdu(
660 self,
661 cluster_uuid: str,
662 kdu_instance: str,
663 show_error_log: bool = False
664 ):
665
666 self.debug('status of kdu_instance {}'.format(kdu_instance))
667
668 # config filename
669 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
670
671 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
672 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
673
674 output, rc = await self._local_async_exec(
675 command=command,
676 raise_exception_on_error=True,
677 show_error_log=show_error_log
678 )
679
680 if rc != 0:
681 return None
682
683 data = yaml.load(output, Loader=yaml.SafeLoader)
684
685 # remove field 'notes'
686 try:
687 del data.get('info').get('status')['notes']
688 except KeyError:
689 pass
690
691 # parse field 'resources'
692 try:
693 resources = str(data.get('info').get('status').get('resources'))
694 resource_table = self._output_to_table(resources)
695 data.get('info').get('status')['resources'] = resource_table
696 except Exception as e:
697 pass
698
699 return data
700
701
702 async def get_instance_info(
703 self,
704 cluster_uuid: str,
705 kdu_instance: str
706 ):
707 instances = await self.instances_list(cluster_uuid=cluster_uuid)
708 for instance in instances:
709 if instance.get('Name') == kdu_instance:
710 return instance
711 self.debug('Instance {} not found'.format(kdu_instance))
712 return None
713
714 @staticmethod
715 def _generate_release_name(
716 chart_name: str
717 ):
718 name = ''
719 for c in chart_name:
720 if c.isalpha() or c.isnumeric():
721 name += c
722 else:
723 name += '-'
724 if len(name) > 35:
725 name = name[0:35]
726
727 # if does not start with alpha character, prefix 'a'
728 if not name[0].isalpha():
729 name = 'a' + name
730
731 name += '-'
732
733 def get_random_number():
734 r = random.randrange(start=1, stop=99999999)
735 s = str(r)
736 while len(s) < 10:
737 s = '0' + s
738 return s
739
740 name = name + get_random_number()
741 return name.lower()
742
743 async def _store_status(
744 self,
745 cluster_uuid: str,
746 operation: str,
747 kdu_instance: str,
748 check_every: float = 10,
749 db_dict: dict = None,
750 run_once: bool = False
751 ):
752 while True:
753 try:
754 await asyncio.sleep(check_every)
755 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
756 status = detailed_status.get('info').get('Description')
757 print('=' * 60)
758 self.debug('STATUS:\n{}'.format(status))
759 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
760 print('=' * 60)
761 # write status to db
762 result = await self.write_app_status_to_db(
763 db_dict=db_dict,
764 status=str(status),
765 detailed_status=str(detailed_status),
766 operation=operation)
767 if not result:
768 self.info('Error writing in database. Task exiting...')
769 return
770 except asyncio.CancelledError:
771 self.debug('Task cancelled')
772 return
773 except Exception as e:
774 pass
775 finally:
776 if run_once:
777 return
778
779 async def _is_install_completed(
780 self,
781 cluster_uuid: str,
782 kdu_instance: str
783 ) -> bool:
784
785 status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
786
787 # extract info.status.resources-> str
788 # format:
789 # ==> v1/Deployment
790 # NAME READY UP-TO-DATE AVAILABLE AGE
791 # halting-horse-mongodb 0/1 1 0 0s
792 # halting-petit-mongodb 1/1 1 0 0s
793 # blank line
794 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
795
796 # convert to table
797 resources = K8sHelmConnector._output_to_table(resources)
798
799 num_lines = len(resources)
800 index = 0
801 while index < num_lines:
802 try:
803 line1 = resources[index]
804 index += 1
805 # find '==>' in column 0
806 if line1[0] == '==>':
807 line2 = resources[index]
808 index += 1
809 # find READY in column 1
810 if line2[1] == 'READY':
811 # read next lines
812 line3 = resources[index]
813 index += 1
814 while len(line3) > 1 and index < num_lines:
815 ready_value = line3[1]
816 parts = ready_value.split(sep='/')
817 current = int(parts[0])
818 total = int(parts[1])
819 if current < total:
820 self.debug('NOT READY:\n {}'.format(line3))
821 ready = False
822 line3 = resources[index]
823 index += 1
824
825 except Exception as e:
826 pass
827
828 return ready
829
830 @staticmethod
831 def _get_deep(dictionary: dict, members: tuple):
832 target = dictionary
833 value = None
834 try:
835 for m in members:
836 value = target.get(m)
837 if not value:
838 return None
839 else:
840 target = value
841 except Exception as e:
842 pass
843 return value
844
845 # find key:value in several lines
846 @staticmethod
847 def _find_in_lines(p_lines: list, p_key: str) -> str:
848 for line in p_lines:
849 try:
850 if line.startswith(p_key + ':'):
851 parts = line.split(':')
852 the_value = parts[1].strip()
853 return the_value
854 except Exception as e:
855 # ignore it
856 pass
857 return None
858
859 # params for use in --set option
860 @staticmethod
861 def _params_to_set_option(params: dict) -> str:
862 params_str = ''
863 if params and len(params) > 0:
864 start = True
865 for key in params:
866 value = params.get(key, None)
867 if value is not None:
868 if start:
869 params_str += '--set '
870 start = False
871 else:
872 params_str += ','
873 params_str += '{}={}'.format(key, value)
874 return params_str
875
876 @staticmethod
877 def _output_to_lines(output: str) -> list:
878 output_lines = list()
879 lines = output.splitlines(keepends=False)
880 for line in lines:
881 line = line.strip()
882 if len(line) > 0:
883 output_lines.append(line)
884 return output_lines
885
886 @staticmethod
887 def _output_to_table(output: str) -> list:
888 output_table = list()
889 lines = output.splitlines(keepends=False)
890 for line in lines:
891 line = line.replace('\t', ' ')
892 line_list = list()
893 output_table.append(line_list)
894 cells = line.split(sep=' ')
895 for cell in cells:
896 cell = cell.strip()
897 if len(cell) > 0:
898 line_list.append(cell)
899 return output_table
900
901 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str):
902 """
903 Returns kube and helm directories
904
905 :param cluster_name:
906 :param create_if_not_exist:
907 :return: kube, helm directories and config filename. Raises exception if not exist and cannot create
908 """
909
910 base = self.fs.path
911 if base.endswith("/") or base.endswith("\\"):
912 base = base[:-1]
913
914 # base dir for cluster
915 cluster_dir = base + '/' + cluster_name
916 if create_if_not_exist and not os.path.exists(cluster_dir):
917 self.debug('Creating dir {}'.format(cluster_dir))
918 os.makedirs(cluster_dir)
919 if not os.path.exists(cluster_dir):
920 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
921 self.error(msg)
922 raise Exception(msg)
923
924 # kube dir
925 kube_dir = cluster_dir + '/' + '.kube'
926 if create_if_not_exist and not os.path.exists(kube_dir):
927 self.debug('Creating dir {}'.format(kube_dir))
928 os.makedirs(kube_dir)
929 if not os.path.exists(kube_dir):
930 msg = 'Kube config dir {} does not exist'.format(kube_dir)
931 self.error(msg)
932 raise Exception(msg)
933
934 # helm home dir
935 helm_dir = cluster_dir + '/' + '.helm'
936 if create_if_not_exist and not os.path.exists(helm_dir):
937 self.debug('Creating dir {}'.format(helm_dir))
938 os.makedirs(helm_dir)
939 if not os.path.exists(helm_dir):
940 msg = 'Helm config dir {} does not exist'.format(helm_dir)
941 self.error(msg)
942 raise Exception(msg)
943
944 config_filename = kube_dir + '/config'
945 return kube_dir, helm_dir, config_filename
946
947 @staticmethod
948 def _remove_multiple_spaces(str):
949 str = str.strip()
950 while ' ' in str:
951 str = str.replace(' ', ' ')
952 return str
953
954 def _local_exec(
955 self,
956 command: str
957 ) -> (str, int):
958 command = K8sHelmConnector._remove_multiple_spaces(command)
959 self.debug('Executing sync local command: {}'.format(command))
960 # raise exception if fails
961 output = ''
962 try:
963 output = subprocess.check_output(command, shell=True, universal_newlines=True)
964 return_code = 0
965 self.debug(output)
966 except Exception as e:
967 return_code = 1
968
969 return output, return_code
970
971 async def _local_async_exec(
972 self,
973 command: str,
974 raise_exception_on_error: bool = False,
975 show_error_log: bool = False
976 ) -> (str, int):
977
978 command = K8sHelmConnector._remove_multiple_spaces(command)
979 self.debug('Executing async local command: {}'.format(command))
980
981 # split command
982 command = command.split(sep=' ')
983
984 try:
985 process = await asyncio.create_subprocess_exec(
986 *command,
987 stdout=asyncio.subprocess.PIPE,
988 stderr=asyncio.subprocess.PIPE
989 )
990
991 # wait for command terminate
992 stdout, stderr = await process.communicate()
993
994 return_code = process.returncode
995
996 output = ''
997 if stdout:
998 output = stdout.decode('utf-8').strip()
999 if stderr:
1000 output = stderr.decode('utf-8').strip()
1001
1002 if return_code != 0 and show_error_log:
1003 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1004 else:
1005 self.debug('Return code: {}'.format(return_code))
1006
1007 if raise_exception_on_error and return_code != 0:
1008 raise Exception(output)
1009
1010 return output, return_code
1011
1012 except Exception as e:
1013 msg = 'Exception executing command: {} -> {}'.format(command, e)
1014 if show_error_log:
1015 self.error(msg)
1016 return '', -1
1017
1018 def _remote_exec(
1019 self,
1020 hostname: str,
1021 username: str,
1022 password: str,
1023 command: str,
1024 timeout: int = 10
1025 ) -> (str, int):
1026
1027 command = K8sHelmConnector._remove_multiple_spaces(command)
1028 self.debug('Executing sync remote ssh command: {}'.format(command))
1029
1030 ssh = paramiko.SSHClient()
1031 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1032 ssh.connect(hostname=hostname, username=username, password=password)
1033 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1034 output = ssh_stdout.read().decode('utf-8')
1035 error = ssh_stderr.read().decode('utf-8')
1036 if error:
1037 self.error('ERROR: {}'.format(error))
1038 return_code = 1
1039 else:
1040 return_code = 0
1041 output = output.replace('\\n', '\n')
1042 self.debug('OUTPUT: {}'.format(output))
1043
1044 return output, return_code
1045
1046 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1047 self.debug('Checking if file {} exists...'.format(filename))
1048 if os.path.exists(filename):
1049 return True
1050 else:
1051 msg = 'File {} does not exist'.format(filename)
1052 if exception_if_not_exists:
1053 self.error(msg)
1054 raise Exception(msg)
1055