88c94c5c49e535ef58fbf0b3d987a373d86fb181
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import uuid
29 import time
30 import yaml
31 from uuid import uuid4
32 import random
33 from n2vc.k8s_conn import K8sConnector
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = '/usr/bin/kubectl',
49 helm_command: str = '/usr/bin/helm',
50 log: object = None,
51 on_update_db=None
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(
65 self,
66 db=db,
67 log=log,
68 on_update_db=on_update_db
69 )
70
71 self.info('Initializing K8S Helm connector')
72
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 self.info('K8S Helm connector initialized')
88
89 async def init_env(
90 self,
91 k8s_creds: str,
92 namespace: str = 'kube-system',
93 reuse_cluster_uuid=None
94 ) -> (str, bool):
95
96 cluster_uuid = reuse_cluster_uuid
97 if not cluster_uuid:
98 cluster_uuid = str(uuid4())
99
100 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
101
102 # create config filename
103 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
104 f = open(config_filename, "w")
105 f.write(k8s_creds)
106 f.close()
107
108 # check if tiller pod is up in cluster
109 command = '{} --kubeconfig={} --namespace={} get deployments'\
110 .format(self.kubectl_command, config_filename, namespace)
111 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
112
113 output_table = K8sHelmConnector._output_to_table(output=output)
114
115 # find 'tiller' pod in all pods
116 already_initialized = False
117 try:
118 for row in output_table:
119 if row[0].startswith('tiller-deploy'):
120 already_initialized = True
121 break
122 except Exception as e:
123 pass
124
125 # helm init
126 n2vc_installed_sw = False
127 if not already_initialized:
128 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
129 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
130 .format(self._helm_command, config_filename, namespace, helm_dir)
131 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
132 n2vc_installed_sw = True
133 else:
134 # check client helm installation
135 check_file = helm_dir + '/repository/repositories.yaml'
136 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
137 self.info('Initializing helm in client: {}'.format(cluster_uuid))
138 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
139 .format(self._helm_command, config_filename, namespace, helm_dir)
140 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
141 else:
142 self.info('Helm client already initialized')
143
144 self.info('Cluster initialized {}'.format(cluster_uuid))
145
146 return cluster_uuid, n2vc_installed_sw
147
148 async def repo_add(
149 self,
150 cluster_uuid: str,
151 name: str,
152 url: str,
153 repo_type: str = 'chart'
154 ):
155
156 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
157
158 # config filename
159 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
160
161 # helm repo update
162 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
163 self.debug('updating repo: {}'.format(command))
164 await self._local_async_exec(command=command, raise_exception_on_error=False)
165
166 # helm repo add name url
167 command = '{} --kubeconfig={} --home={} repo add {} {}'\
168 .format(self._helm_command, config_filename, helm_dir, name, url)
169 self.debug('adding repo: {}'.format(command))
170 await self._local_async_exec(command=command, raise_exception_on_error=True)
171
172 async def repo_list(
173 self,
174 cluster_uuid: str
175 ) -> list:
176 """
177 Get the list of registered repositories
178
179 :return: list of registered repositories: [ (name, url) .... ]
180 """
181
182 self.debug('list repositories for cluster {}'.format(cluster_uuid))
183
184 # config filename
185 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
186
187 command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
188
189 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
190 if output and len(output) > 0:
191 return yaml.load(output, Loader=yaml.SafeLoader)
192 else:
193 return []
194
195 async def repo_remove(
196 self,
197 cluster_uuid: str,
198 name: str
199 ):
200 """
201 Remove a repository from OSM
202
203 :param cluster_uuid: the cluster
204 :param name: repo name in OSM
205 :return: True if successful
206 """
207
208 self.debug('list repositories for cluster {}'.format(cluster_uuid))
209
210 # config filename
211 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
212
213 command = '{} --kubeconfig={} --home={} repo remove {}'\
214 .format(self._helm_command, config_filename, helm_dir, name)
215
216 await self._local_async_exec(command=command, raise_exception_on_error=True)
217
    async def reset(
            self,
            cluster_uuid: str,
            force: bool = False,
            uninstall_sw: bool = False
    ) -> bool:
        """
        Reset the K8s environment of a cluster: optionally uninstall its releases
        and tiller, then remove the local cluster directory.

        :param cluster_uuid: cluster to reset
        :param force: uninstall existing releases instead of refusing to reset
        :param uninstall_sw: also remove tiller from the cluster
        :return: True on success
        :raises Exception: when releases exist and force is False
        """

        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

        # get kube and helm directories
        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get('Name')
                        chart = r.get('Chart')
                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                        await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                    except Exception as e:
                        # best effort: log and keep uninstalling the remaining releases
                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
            else:
                msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                    .format(cluster_uuid)
                self.error(msg)
                raise Exception(msg)

        if uninstall_sw:

            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

            # find namespace for tiller pod
            command = '{} --kubeconfig={} get deployments --all-namespaces'\
                .format(self.kubectl_command, config_filename)
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
            output_table = K8sHelmConnector._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    # with --all-namespaces, column 0 is the namespace, column 1 the name
                    if 'tiller-deploy' in r[1]:
                        namespace = r[0]
                        break
                except Exception as e:
                    # short/empty rows are skipped
                    pass
            else:
                # for/else: runs only when the loop ended without 'break'
                msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
                self.error(msg)
                # raise Exception(msg)

            self.debug('namespace for tiller: {}'.format(namespace))

            force_str = '--force'

            if namespace:
                # delete tiller deployment
                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
                command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                    .format(self.kubectl_command, namespace, config_filename, force_str)
                await self._local_async_exec(command=command, raise_exception_on_error=False)

                # uninstall tiller from cluster
                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
                command = '{} --kubeconfig={} --home={} reset'\
                    .format(self._helm_command, config_filename, helm_dir)
                self.debug('resetting: {}'.format(command))
                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            else:
                self.debug('namespace not found')

        # delete cluster directory
        dir = self.fs.path + '/' + cluster_uuid
        self.debug('Removing directory {}'.format(dir))
        shutil.rmtree(dir, ignore_errors=True)

        return True
296
    async def install(
            self,
            cluster_uuid: str,
            kdu_model: str,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None
    ):
        """
        Deploy a chart in a cluster, generating a unique release name for it.

        :param cluster_uuid: cluster where to install
        :param kdu_model: chart reference, optionally suffixed ':version'
        :param atomic: if True, pass --atomic to helm and track status while installing
        :param timeout: value passed to helm --timeout
        :param params: chart parameters, rendered into a --set option
        :param db_dict: database location where install status is written
        :return: the generated kdu_instance (release name)
        :raises Exception: when the helm command returns a non-zero code
        """

        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

        # NOTE(review): start/end are computed but never used afterwards
        start = time.time()
        end = start + timeout

        # config filename
        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        params_str = K8sHelmConnector._params_to_set_option(params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'

        # version
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # generate a name for the releas. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                # a non-None result means the name is already taken.
                # NOTE(review): for a free name this relies on _status_kdu
                # returning None (it does, because _local_async_exec swallows
                # its own raised exception and reports rc=-1) — confirm before
                # changing the error handling of _local_async_exec
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except:
                kdu_instance = None

        # helm repo install
        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='install',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise Exception(msg)

        self.debug('Returning kdu_instance {}'.format(kdu_instance))
        return kdu_instance
402
403 async def instances_list(
404 self,
405 cluster_uuid: str
406 ) -> list:
407 """
408 returns a list of deployed releases in a cluster
409
410 :param cluster_uuid: the cluster
411 :return:
412 """
413
414 self.debug('list releases for cluster {}'.format(cluster_uuid))
415
416 # config filename
417 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
418
419 command = '{} --kubeconfig={} --home={} list --output yaml'\
420 .format(self._helm_command, config_filename, helm_dir)
421
422 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
423
424 if output and len(output) > 0:
425 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
426 else:
427 return []
428
    async def upgrade(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            kdu_model: str = None,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None
    ):
        """
        Upgrade a deployed release to a (possibly new) chart/version.

        :param cluster_uuid: cluster of the release
        :param kdu_instance: release to upgrade
        :param kdu_model: chart reference, optionally suffixed ':version'.
               NOTE(review): despite the None default, a value is required
               (':' in kdu_model would raise TypeError on None) — confirm callers
        :param atomic: if True, pass --atomic to helm and track status meanwhile
        :param timeout: value passed to helm --timeout
        :param params: chart parameters, rendered into a --set option
        :param db_dict: database location where upgrade status is written
        :return: new revision number (int), 0 if the instance cannot be found
        :raises Exception: when the helm command returns a non-zero code
        """

        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

        # NOTE(review): start/end are computed but never used afterwards
        start = time.time()
        end = start + timeout

        # config filename
        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        params_str = K8sHelmConnector._params_to_set_option(params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'

        # version
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('upgrading: {}'.format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([ exec_task ])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='upgrade',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise Exception(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
525
    async def rollback(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            revision=0,
            db_dict: dict = None
    ):
        """
        Roll back a release to a given revision, waiting for completion.

        :param cluster_uuid: cluster of the release
        :param kdu_instance: release to roll back
        :param revision: target revision (helm semantics: 0 means the previous one)
        :param db_dict: database location where rollback status is written
        :return: new revision number (int), 0 if the instance cannot be found
        :raises Exception: when the helm command returns a non-zero code
        """

        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
                   .format(kdu_instance, revision, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation='rollback',
                run_once=False
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise Exception(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
589
590 async def uninstall(
591 self,
592 cluster_uuid: str,
593 kdu_instance: str
594 ):
595 """
596 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
597 after all _terminate-config-primitive_ of the VNF are invoked).
598
599 :param cluster_uuid: UUID of a K8s cluster known by OSM
600 :param kdu_instance: unique name for the KDU instance to be deleted
601 :return: True if successful
602 """
603
604 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
605
606 # config filename
607 kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
608
609 command = '{} --kubeconfig={} --home={} delete --purge {}'\
610 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
611
612 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
613
614 return self._output_to_table(output)
615
616 async def inspect_kdu(
617 self,
618 kdu_model: str
619 ) -> str:
620
621 self.debug('inspect kdu_model {}'.format(kdu_model))
622
623 command = '{} inspect values {}'\
624 .format(self._helm_command, kdu_model)
625
626 output, rc = await self._local_async_exec(command=command)
627
628 return output
629
630 async def help_kdu(
631 self,
632 kdu_model: str
633 ):
634
635 self.debug('help kdu_model {}'.format(kdu_model))
636
637 command = '{} inspect readme {}'\
638 .format(self._helm_command, kdu_model)
639
640 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
641
642 return output
643
644 async def status_kdu(
645 self,
646 cluster_uuid: str,
647 kdu_instance: str
648 ):
649
650 return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
651
652
653 """
654 ##################################################################################################
655 ########################################## P R I V A T E #########################################
656 ##################################################################################################
657 """
658
    async def _status_kdu(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            show_error_log: bool = False
    ):
        """
        Run 'helm status' for a release and return its parsed yaml output.

        :param cluster_uuid: cluster where the release lives
        :param kdu_instance: release name
        :param show_error_log: log the error output when the command fails
        :return: parsed status dict ('notes' removed, 'resources' converted to a
                 table), or None when the command fails
        """

        self.debug('status of kdu_instance {}'.format(kdu_instance))

        # config filename
        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} --kubeconfig={} --home={} status {} --output yaml'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance)

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log
        )

        # NOTE(review): _local_async_exec swallows its own raised exception and
        # returns rc=-1 on failure, so this branch (not an exception) is what
        # actually happens when the release does not exist
        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get('info').get('status')['notes']
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get('info').get('status').get('resources'))
            resource_table = self._output_to_table(resources)
            data.get('info').get('status')['resources'] = resource_table
        except Exception as e:
            # missing/odd 'resources' content is left untouched
            pass

        return data
700
701
702 async def get_instance_info(
703 self,
704 cluster_uuid: str,
705 kdu_instance: str
706 ):
707 instances = await self.instances_list(cluster_uuid=cluster_uuid)
708 for instance in instances:
709 if instance.get('Name') == kdu_instance:
710 return instance
711 self.debug('Instance {} not found'.format(kdu_instance))
712 return None
713
714 @staticmethod
715 def _generate_release_name(
716 chart_name: str
717 ):
718 name = ''
719 for c in chart_name:
720 if c.isalpha() or c.isnumeric():
721 name += c
722 else:
723 name += '-'
724 if len(name) > 35:
725 name = name[0:35]
726
727 # if does not start with alpha character, prefix 'a'
728 if not name[0].isalpha():
729 name = 'a' + name
730
731 name += '-'
732
733 def get_random_number():
734 r = random.randrange(start=1, stop=99999999)
735 s = str(r)
736 while len(s) < 10:
737 s = '0' + s
738 return s
739
740 name = name + get_random_number()
741 return name.lower()
742
    async def _store_status(
            self,
            cluster_uuid: str,
            operation: str,
            kdu_instance: str,
            check_every: float = 10,
            db_dict: dict = None,
            run_once: bool = False
    ):
        """
        Periodically write the status of a release to the database.

        Loops forever (sleeping 'check_every' seconds before each query) until
        cancelled or the database write fails; with run_once it performs a
        single iteration (the 'finally' clause returns after the first pass).

        :param cluster_uuid: cluster of the release
        :param operation: operation name stored with the status (install/upgrade/...)
        :param kdu_instance: release whose status is tracked
        :param check_every: seconds to sleep before each status query
        :param db_dict: database location where the status is written
        :param run_once: perform a single check-and-write, then return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                status = detailed_status.get('info').get('Description')
                print('=' * 60)
                self.debug('STATUS:\n{}'.format(status))
                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                print('=' * 60)
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation)
                if not result:
                    self.info('Error writing in database. Task exiting...')
                    return
            except asyncio.CancelledError:
                self.debug('Task cancelled')
                return
            except Exception as e:
                # NOTE(review): status/parse errors are silently ignored and the
                # loop keeps polling — confirm this best-effort behavior is intended
                pass
            finally:
                if run_once:
                    return
778
779 async def _is_install_completed(
780 self,
781 cluster_uuid: str,
782 kdu_instance: str
783 ) -> bool:
784
785 status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
786
787 # extract info.status.resources-> str
788 # format:
789 # ==> v1/Deployment
790 # NAME READY UP-TO-DATE AVAILABLE AGE
791 # halting-horse-mongodb 0/1 1 0 0s
792 # halting-petit-mongodb 1/1 1 0 0s
793 # blank line
794 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
795
796 # convert to table
797 resources = K8sHelmConnector._output_to_table(resources)
798
799 num_lines = len(resources)
800 index = 0
801 while index < num_lines:
802 try:
803 line1 = resources[index]
804 index += 1
805 # find '==>' in column 0
806 if line1[0] == '==>':
807 line2 = resources[index]
808 index += 1
809 # find READY in column 1
810 if line2[1] == 'READY':
811 # read next lines
812 line3 = resources[index]
813 index += 1
814 while len(line3) > 1 and index < num_lines:
815 ready_value = line3[1]
816 parts = ready_value.split(sep='/')
817 current = int(parts[0])
818 total = int(parts[1])
819 if current < total:
820 self.debug('NOT READY:\n {}'.format(line3))
821 ready = False
822 line3 = resources[index]
823 index += 1
824
825 except Exception as e:
826 pass
827
828 return ready
829
830 @staticmethod
831 def _get_deep(dictionary: dict, members: tuple):
832 target = dictionary
833 value = None
834 try:
835 for m in members:
836 value = target.get(m)
837 if not value:
838 return None
839 else:
840 target = value
841 except Exception as e:
842 pass
843 return value
844
845 # find key:value in several lines
846 @staticmethod
847 def _find_in_lines(p_lines: list, p_key: str) -> str:
848 for line in p_lines:
849 try:
850 if line.startswith(p_key + ':'):
851 parts = line.split(':')
852 the_value = parts[1].strip()
853 return the_value
854 except Exception as e:
855 # ignore it
856 pass
857 return None
858
859 # params for use in --set option
860 @staticmethod
861 def _params_to_set_option(params: dict) -> str:
862 params_str = ''
863 if params and len(params) > 0:
864 start = True
865 for key in params:
866 value = params.get(key, None)
867 if value is not None:
868 if start:
869 params_str += '--set '
870 start = False
871 else:
872 params_str += ','
873 params_str += '{}={}'.format(key, value)
874 return params_str
875
876 @staticmethod
877 def _output_to_lines(output: str) -> list:
878 output_lines = list()
879 lines = output.splitlines(keepends=False)
880 for line in lines:
881 line = line.strip()
882 if len(line) > 0:
883 output_lines.append(line)
884 return output_lines
885
886 @staticmethod
887 def _output_to_table(output: str) -> list:
888 output_table = list()
889 lines = output.splitlines(keepends=False)
890 for line in lines:
891 line = line.replace('\t', ' ')
892 line_list = list()
893 output_table.append(line_list)
894 cells = line.split(sep=' ')
895 for cell in cells:
896 cell = cell.strip()
897 if len(cell) > 0:
898 line_list.append(cell)
899 return output_table
900
901 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str):
902 """
903 Returns kube and helm directories
904
905 :param cluster_name:
906 :param create_if_not_exist:
907 :return: kube, helm directories and config filename. Raises exception if not exist and cannot create
908 """
909
910 base = self.fs.path
911 if base.endswith("/") or base.endswith("\\"):
912 base = base[:-1]
913
914 # base dir for cluster
915 cluster_dir = base + '/' + cluster_name
916 if create_if_not_exist and not os.path.exists(cluster_dir):
917 self.debug('Creating dir {}'.format(cluster_dir))
918 os.makedirs(cluster_dir)
919 if not os.path.exists(cluster_dir):
920 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
921 self.error(msg)
922 raise Exception(msg)
923
924 # kube dir
925 kube_dir = cluster_dir + '/' + '.kube'
926 if create_if_not_exist and not os.path.exists(kube_dir):
927 self.debug('Creating dir {}'.format(kube_dir))
928 os.makedirs(kube_dir)
929 if not os.path.exists(kube_dir):
930 msg = 'Kube config dir {} does not exist'.format(kube_dir)
931 self.error(msg)
932 raise Exception(msg)
933
934 # helm home dir
935 helm_dir = cluster_dir + '/' + '.helm'
936 if create_if_not_exist and not os.path.exists(helm_dir):
937 self.debug('Creating dir {}'.format(helm_dir))
938 os.makedirs(helm_dir)
939 if not os.path.exists(helm_dir):
940 msg = 'Helm config dir {} does not exist'.format(helm_dir)
941 self.error(msg)
942 raise Exception(msg)
943
944 config_filename = kube_dir + '/config'
945 return kube_dir, helm_dir, config_filename
946
947 @staticmethod
948 def _remove_multiple_spaces(str):
949 str = str.strip()
950 while ' ' in str:
951 str = str.replace(' ', ' ')
952 return str
953
    def _local_exec(
            self,
            command: str
    ) -> (str, int):
        """
        Run a command synchronously in the local machine through the shell.

        :param command: command line to execute (multiple spaces are collapsed)
        :return: tuple (stdout output, return code); return code is 1 on any failure
        """
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.debug('Executing sync local command: {}'.format(command))
        # raise exception if fails
        output = ''
        try:
            # NOTE(review): shell=True runs the line through the shell; commands
            # are built internally, but confirm none can carry untrusted input
            output = subprocess.check_output(command, shell=True, universal_newlines=True)
            return_code = 0
            self.debug(output)
        except Exception as e:
            # NOTE(review): any failure (non-zero exit, missing binary, ...) is
            # collapsed into rc=1 with whatever partial output was captured
            return_code = 1

        return output, return_code
970
    async def _local_async_exec(
            self,
            command: str,
            raise_exception_on_error: bool = False,
            show_error_log: bool = False
    ) -> (str, int):
        """
        Run a command asynchronously in the local machine (no shell involved).

        :param command: command line; spaces are collapsed and the line is split
               on single spaces to build the argument vector
        :param raise_exception_on_error: raise on non-zero return code (see NOTE
               below: the exception is currently swallowed by this same method)
        :param show_error_log: log an error message when the command fails
        :return: tuple (output, return code); ('', -1) when an exception occurred
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.debug('Executing async local command: {}'.format(command))

        # split command
        command = command.split(sep=' ')

        try:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ''
            if stdout:
                output = stdout.decode('utf-8').strip()
            # NOTE(review): when both streams carry data, stderr REPLACES the
            # stdout text instead of being appended — confirm this is intended
            if stderr:
                output = stderr.decode('utf-8').strip()

            if return_code != 0 and show_error_log:
                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
            else:
                self.debug('Return code: {}'.format(return_code))

            if raise_exception_on_error and return_code != 0:
                # NOTE(review): this raise happens inside the enclosing 'try',
                # so it is caught by the 'except' below and the caller receives
                # ('', -1) instead of an exception. Callers such as the
                # release-name loop in install() depend on NOT receiving an
                # exception here, so fixing this needs a coordinated change
                raise Exception(output)

            return output, return_code

        except Exception as e:
            msg = 'Exception executing command: {} -> {}'.format(command, e)
            if show_error_log:
                self.error(msg)
            return '', -1
1017
1018 def _remote_exec(
1019 self,
1020 hostname: str,
1021 username: str,
1022 password: str,
1023 command: str,
1024 timeout: int = 10
1025 ) -> (str, int):
1026
1027 command = K8sHelmConnector._remove_multiple_spaces(command)
1028 self.debug('Executing sync remote ssh command: {}'.format(command))
1029
1030 ssh = paramiko.SSHClient()
1031 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1032 ssh.connect(hostname=hostname, username=username, password=password)
1033 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1034 output = ssh_stdout.read().decode('utf-8')
1035 error = ssh_stderr.read().decode('utf-8')
1036 if error:
1037 self.error('ERROR: {}'.format(error))
1038 return_code = 1
1039 else:
1040 return_code = 0
1041 output = output.replace('\\n', '\n')
1042 self.debug('OUTPUT: {}'.format(output))
1043
1044 return output, return_code
1045
1046 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1047 self.debug('Checking if file {} exists...'.format(filename))
1048 if os.path.exists(filename):
1049 return True
1050 else:
1051 msg = 'File {} does not exist'.format(filename)
1052 if exception_if_not_exists:
1053 self.error(msg)
1054 raise Exception(msg)
1055