Minor fix: call yaml.load with an explicit Loader
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33
34
35 class K8sHelmConnector(K8sConnector):
36
37 """
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
41 """
42
43 def __init__(
44 self,
45 fs: object,
46 db: object,
47 kubectl_command: str = '/usr/bin/kubectl',
48 helm_command: str = '/usr/bin/helm',
49 log: object = None,
50 on_update_db=None
51 ):
52 """
53
54 :param fs: file system for kubernetes and helm configuration
55 :param db: database object to write current operation status
56 :param kubectl_command: path to kubectl executable
57 :param helm_command: path to helm executable
58 :param log: logger
59 :param on_update_db: callback called when k8s connector updates database
60 """
61
62 # parent class
63 K8sConnector.__init__(
64 self,
65 db=db,
66 log=log,
67 on_update_db=on_update_db
68 )
69
70 self.info('Initializing K8S Helm connector')
71
72 # random numbers for release name generation
73 random.seed(time.time())
74
75 # the file system
76 self.fs = fs
77
78 # exception if kubectl is not installed
79 self.kubectl_command = kubectl_command
80 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
81
82 # exception if helm is not installed
83 self._helm_command = helm_command
84 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
85
86 self.info('K8S Helm connector initialized')
87
88 async def init_env(
89 self,
90 k8s_creds: str,
91 namespace: str = 'kube-system',
92 reuse_cluster_uuid=None
93 ) -> (str, bool):
94
95 cluster_uuid = reuse_cluster_uuid
96 if not cluster_uuid:
97 cluster_uuid = str(uuid4())
98
99 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
100
101 # create config filename
102 kube_dir, helm_dir, config_filename, cluster_dir = \
103 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
104 f = open(config_filename, "w")
105 f.write(k8s_creds)
106 f.close()
107
108 # check if tiller pod is up in cluster
109 command = '{} --kubeconfig={} --namespace={} get deployments'\
110 .format(self.kubectl_command, config_filename, namespace)
111 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
112
113 output_table = K8sHelmConnector._output_to_table(output=output)
114
115 # find 'tiller' pod in all pods
116 already_initialized = False
117 try:
118 for row in output_table:
119 if row[0].startswith('tiller-deploy'):
120 already_initialized = True
121 break
122 except Exception as e:
123 pass
124
125 # helm init
126 n2vc_installed_sw = False
127 if not already_initialized:
128 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
129 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
130 .format(self._helm_command, config_filename, namespace, helm_dir)
131 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
132 n2vc_installed_sw = True
133 else:
134 # check client helm installation
135 check_file = helm_dir + '/repository/repositories.yaml'
136 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
137 self.info('Initializing helm in client: {}'.format(cluster_uuid))
138 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
139 .format(self._helm_command, config_filename, namespace, helm_dir)
140 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
141 else:
142 self.info('Helm client already initialized')
143
144 self.info('Cluster initialized {}'.format(cluster_uuid))
145
146 return cluster_uuid, n2vc_installed_sw
147
148 async def repo_add(
149 self,
150 cluster_uuid: str,
151 name: str,
152 url: str,
153 repo_type: str = 'chart'
154 ):
155
156 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
157
158 # config filename
159 kube_dir, helm_dir, config_filename, cluster_dir = \
160 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
161
162 # helm repo update
163 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
164 self.debug('updating repo: {}'.format(command))
165 await self._local_async_exec(command=command, raise_exception_on_error=False)
166
167 # helm repo add name url
168 command = '{} --kubeconfig={} --home={} repo add {} {}'\
169 .format(self._helm_command, config_filename, helm_dir, name, url)
170 self.debug('adding repo: {}'.format(command))
171 await self._local_async_exec(command=command, raise_exception_on_error=True)
172
173 async def repo_list(
174 self,
175 cluster_uuid: str
176 ) -> list:
177 """
178 Get the list of registered repositories
179
180 :return: list of registered repositories: [ (name, url) .... ]
181 """
182
183 self.debug('list repositories for cluster {}'.format(cluster_uuid))
184
185 # config filename
186 kube_dir, helm_dir, config_filename, cluster_dir = \
187 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
188
189 command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
190
191 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
192 if output and len(output) > 0:
193 return yaml.load(output, Loader=yaml.SafeLoader)
194 else:
195 return []
196
197 async def repo_remove(
198 self,
199 cluster_uuid: str,
200 name: str
201 ):
202 """
203 Remove a repository from OSM
204
205 :param cluster_uuid: the cluster
206 :param name: repo name in OSM
207 :return: True if successful
208 """
209
210 self.debug('list repositories for cluster {}'.format(cluster_uuid))
211
212 # config filename
213 kube_dir, helm_dir, config_filename, cluster_dir = \
214 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
215
216 command = '{} --kubeconfig={} --home={} repo remove {}'\
217 .format(self._helm_command, config_filename, helm_dir, name)
218
219 await self._local_async_exec(command=command, raise_exception_on_error=True)
220
221 async def reset(
222 self,
223 cluster_uuid: str,
224 force: bool = False,
225 uninstall_sw: bool = False
226 ) -> bool:
227
228 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
229
230 # get kube and helm directories
231 kube_dir, helm_dir, config_filename, cluster_dir = \
232 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
233
234 # uninstall releases if needed
235 releases = await self.instances_list(cluster_uuid=cluster_uuid)
236 if len(releases) > 0:
237 if force:
238 for r in releases:
239 try:
240 kdu_instance = r.get('Name')
241 chart = r.get('Chart')
242 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
243 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
244 except Exception as e:
245 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
246 else:
247 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
248 .format(cluster_uuid)
249 self.error(msg)
250 raise Exception(msg)
251
252 if uninstall_sw:
253
254 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
255
256 # find namespace for tiller pod
257 command = '{} --kubeconfig={} get deployments --all-namespaces'\
258 .format(self.kubectl_command, config_filename)
259 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
260 output_table = K8sHelmConnector._output_to_table(output=output)
261 namespace = None
262 for r in output_table:
263 try:
264 if 'tiller-deploy' in r[1]:
265 namespace = r[0]
266 break
267 except Exception as e:
268 pass
269 else:
270 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
271 self.error(msg)
272 # raise Exception(msg)
273
274 self.debug('namespace for tiller: {}'.format(namespace))
275
276 force_str = '--force'
277
278 if namespace:
279 # delete tiller deployment
280 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
281 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
282 .format(self.kubectl_command, namespace, config_filename, force_str)
283 await self._local_async_exec(command=command, raise_exception_on_error=False)
284
285 # uninstall tiller from cluster
286 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
287 command = '{} --kubeconfig={} --home={} reset'\
288 .format(self._helm_command, config_filename, helm_dir)
289 self.debug('resetting: {}'.format(command))
290 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
291 else:
292 self.debug('namespace not found')
293
294 # delete cluster directory
295 dir = self.fs.path + '/' + cluster_uuid
296 self.debug('Removing directory {}'.format(dir))
297 shutil.rmtree(dir, ignore_errors=True)
298
299 return True
300
301 async def install(
302 self,
303 cluster_uuid: str,
304 kdu_model: str,
305 atomic: bool = True,
306 timeout: float = 300,
307 params: dict = None,
308 db_dict: dict = None
309 ):
310
311 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
312
313 start = time.time()
314 end = start + timeout
315
316 # config filename
317 kube_dir, helm_dir, config_filename, cluster_dir = \
318 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
319
320 # params to str
321 # params_str = K8sHelmConnector._params_to_set_option(params)
322 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
323
324 timeout_str = ''
325 if timeout:
326 timeout_str = '--timeout {}'.format(timeout)
327
328 # atomic
329 atomic_str = ''
330 if atomic:
331 atomic_str = '--atomic'
332
333 # version
334 version_str = ''
335 if ':' in kdu_model:
336 parts = kdu_model.split(sep=':')
337 if len(parts) == 2:
338 version_str = '--version {}'.format(parts[1])
339 kdu_model = parts[0]
340
341 # generate a name for the releas. Then, check if already exists
342 kdu_instance = None
343 while kdu_instance is None:
344 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
345 try:
346 result = await self._status_kdu(
347 cluster_uuid=cluster_uuid,
348 kdu_instance=kdu_instance,
349 show_error_log=False
350 )
351 if result is not None:
352 # instance already exists: generate a new one
353 kdu_instance = None
354 except:
355 kdu_instance = None
356
357 # helm repo install
358 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
359 .format(self._helm_command, atomic_str, config_filename, helm_dir,
360 params_str, timeout_str, kdu_instance, kdu_model, version_str)
361 self.debug('installing: {}'.format(command))
362
363 if atomic:
364 # exec helm in a task
365 exec_task = asyncio.ensure_future(
366 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
367 )
368 # write status in another task
369 status_task = asyncio.ensure_future(
370 coro_or_future=self._store_status(
371 cluster_uuid=cluster_uuid,
372 kdu_instance=kdu_instance,
373 db_dict=db_dict,
374 operation='install',
375 run_once=False
376 )
377 )
378
379 # wait for execution task
380 await asyncio.wait([exec_task])
381
382 # cancel status task
383 status_task.cancel()
384
385 output, rc = exec_task.result()
386
387 else:
388
389 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
390
391 # remove temporal values yaml file
392 if file_to_delete:
393 os.remove(file_to_delete)
394
395 # write final status
396 await self._store_status(
397 cluster_uuid=cluster_uuid,
398 kdu_instance=kdu_instance,
399 db_dict=db_dict,
400 operation='install',
401 run_once=True,
402 check_every=0
403 )
404
405 if rc != 0:
406 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
407 self.error(msg)
408 raise Exception(msg)
409
410 self.debug('Returning kdu_instance {}'.format(kdu_instance))
411 return kdu_instance
412
413 async def instances_list(
414 self,
415 cluster_uuid: str
416 ) -> list:
417 """
418 returns a list of deployed releases in a cluster
419
420 :param cluster_uuid: the cluster
421 :return:
422 """
423
424 self.debug('list releases for cluster {}'.format(cluster_uuid))
425
426 # config filename
427 kube_dir, helm_dir, config_filename, cluster_dir = \
428 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
429
430 command = '{} --kubeconfig={} --home={} list --output yaml'\
431 .format(self._helm_command, config_filename, helm_dir)
432
433 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
434
435 if output and len(output) > 0:
436 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
437 else:
438 return []
439
440 async def upgrade(
441 self,
442 cluster_uuid: str,
443 kdu_instance: str,
444 kdu_model: str = None,
445 atomic: bool = True,
446 timeout: float = 300,
447 params: dict = None,
448 db_dict: dict = None
449 ):
450
451 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
452
453 start = time.time()
454 end = start + timeout
455
456 # config filename
457 kube_dir, helm_dir, config_filename, cluster_dir = \
458 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
459
460 # params to str
461 # params_str = K8sHelmConnector._params_to_set_option(params)
462 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
463
464 timeout_str = ''
465 if timeout:
466 timeout_str = '--timeout {}'.format(timeout)
467
468 # atomic
469 atomic_str = ''
470 if atomic:
471 atomic_str = '--atomic'
472
473 # version
474 version_str = ''
475 if kdu_model and ':' in kdu_model:
476 parts = kdu_model.split(sep=':')
477 if len(parts) == 2:
478 version_str = '--version {}'.format(parts[1])
479 kdu_model = parts[0]
480
481 # helm repo upgrade
482 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
483 .format(self._helm_command, atomic_str, config_filename, helm_dir,
484 params_str, timeout_str, kdu_instance, kdu_model, version_str)
485 self.debug('upgrading: {}'.format(command))
486
487 if atomic:
488
489 # exec helm in a task
490 exec_task = asyncio.ensure_future(
491 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
492 )
493 # write status in another task
494 status_task = asyncio.ensure_future(
495 coro_or_future=self._store_status(
496 cluster_uuid=cluster_uuid,
497 kdu_instance=kdu_instance,
498 db_dict=db_dict,
499 operation='upgrade',
500 run_once=False
501 )
502 )
503
504 # wait for execution task
505 await asyncio.wait([ exec_task ])
506
507 # cancel status task
508 status_task.cancel()
509 output, rc = exec_task.result()
510
511 else:
512
513 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
514
515 # remove temporal values yaml file
516 if file_to_delete:
517 os.remove(file_to_delete)
518
519 # write final status
520 await self._store_status(
521 cluster_uuid=cluster_uuid,
522 kdu_instance=kdu_instance,
523 db_dict=db_dict,
524 operation='upgrade',
525 run_once=True,
526 check_every=0
527 )
528
529 if rc != 0:
530 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
531 self.error(msg)
532 raise Exception(msg)
533
534 # return new revision number
535 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
536 if instance:
537 revision = int(instance.get('Revision'))
538 self.debug('New revision: {}'.format(revision))
539 return revision
540 else:
541 return 0
542
543 async def rollback(
544 self,
545 cluster_uuid: str,
546 kdu_instance: str,
547 revision=0,
548 db_dict: dict = None
549 ):
550
551 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
552 .format(kdu_instance, revision, cluster_uuid))
553
554 # config filename
555 kube_dir, helm_dir, config_filename, cluster_dir = \
556 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
557
558 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
559 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
560
561 # exec helm in a task
562 exec_task = asyncio.ensure_future(
563 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
564 )
565 # write status in another task
566 status_task = asyncio.ensure_future(
567 coro_or_future=self._store_status(
568 cluster_uuid=cluster_uuid,
569 kdu_instance=kdu_instance,
570 db_dict=db_dict,
571 operation='rollback',
572 run_once=False
573 )
574 )
575
576 # wait for execution task
577 await asyncio.wait([exec_task])
578
579 # cancel status task
580 status_task.cancel()
581
582 output, rc = exec_task.result()
583
584 # write final status
585 await self._store_status(
586 cluster_uuid=cluster_uuid,
587 kdu_instance=kdu_instance,
588 db_dict=db_dict,
589 operation='rollback',
590 run_once=True,
591 check_every=0
592 )
593
594 if rc != 0:
595 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
596 self.error(msg)
597 raise Exception(msg)
598
599 # return new revision number
600 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
601 if instance:
602 revision = int(instance.get('Revision'))
603 self.debug('New revision: {}'.format(revision))
604 return revision
605 else:
606 return 0
607
608 async def uninstall(
609 self,
610 cluster_uuid: str,
611 kdu_instance: str
612 ):
613 """
614 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
615 after all _terminate-config-primitive_ of the VNF are invoked).
616
617 :param cluster_uuid: UUID of a K8s cluster known by OSM
618 :param kdu_instance: unique name for the KDU instance to be deleted
619 :return: True if successful
620 """
621
622 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
623
624 # config filename
625 kube_dir, helm_dir, config_filename, cluster_dir = \
626 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
627
628 command = '{} --kubeconfig={} --home={} delete --purge {}'\
629 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
630
631 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
632
633 return self._output_to_table(output)
634
635 async def inspect_kdu(
636 self,
637 kdu_model: str
638 ) -> str:
639
640 self.debug('inspect kdu_model {}'.format(kdu_model))
641
642 command = '{} inspect values {}'\
643 .format(self._helm_command, kdu_model)
644
645 output, rc = await self._local_async_exec(command=command)
646
647 return output
648
649 async def help_kdu(
650 self,
651 kdu_model: str
652 ):
653
654 self.debug('help kdu_model {}'.format(kdu_model))
655
656 command = '{} inspect readme {}'\
657 .format(self._helm_command, kdu_model)
658
659 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
660
661 return output
662
663 async def status_kdu(
664 self,
665 cluster_uuid: str,
666 kdu_instance: str
667 ):
668
669 return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
670
671
672 """
673 ##################################################################################################
674 ########################################## P R I V A T E #########################################
675 ##################################################################################################
676 """
677
678 async def _status_kdu(
679 self,
680 cluster_uuid: str,
681 kdu_instance: str,
682 show_error_log: bool = False
683 ):
684
685 self.debug('status of kdu_instance {}'.format(kdu_instance))
686
687 # config filename
688 kube_dir, helm_dir, config_filename, cluster_dir = \
689 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
690
691 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
692 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
693
694 output, rc = await self._local_async_exec(
695 command=command,
696 raise_exception_on_error=True,
697 show_error_log=show_error_log
698 )
699
700 if rc != 0:
701 return None
702
703 data = yaml.load(output, Loader=yaml.SafeLoader)
704
705 # remove field 'notes'
706 try:
707 del data.get('info').get('status')['notes']
708 except KeyError:
709 pass
710
711 # parse field 'resources'
712 try:
713 resources = str(data.get('info').get('status').get('resources'))
714 resource_table = self._output_to_table(resources)
715 data.get('info').get('status')['resources'] = resource_table
716 except Exception as e:
717 pass
718
719 return data
720
721 async def get_instance_info(
722 self,
723 cluster_uuid: str,
724 kdu_instance: str
725 ):
726 instances = await self.instances_list(cluster_uuid=cluster_uuid)
727 for instance in instances:
728 if instance.get('Name') == kdu_instance:
729 return instance
730 self.debug('Instance {} not found'.format(kdu_instance))
731 return None
732
733 @staticmethod
734 def _generate_release_name(
735 chart_name: str
736 ):
737 name = ''
738 for c in chart_name:
739 if c.isalpha() or c.isnumeric():
740 name += c
741 else:
742 name += '-'
743 if len(name) > 35:
744 name = name[0:35]
745
746 # if does not start with alpha character, prefix 'a'
747 if not name[0].isalpha():
748 name = 'a' + name
749
750 name += '-'
751
752 def get_random_number():
753 r = random.randrange(start=1, stop=99999999)
754 s = str(r)
755 s = s.rjust(width=10, fillchar=' ')
756 return s
757
758 name = name + get_random_number()
759 return name.lower()
760
    async def _store_status(
            self,
            cluster_uuid: str,
            operation: str,
            kdu_instance: str,
            check_every: float = 10,
            db_dict: dict = None,
            run_once: bool = False
    ):
        """
        Periodically write the status of a release to the database.

        Loops forever (sleeping `check_every` seconds per iteration) until it
        is cancelled, until a database write fails, or — when `run_once` is
        True — after exactly one iteration.

        :param cluster_uuid: the cluster
        :param operation: operation name stored with the status ('install', ...)
        :param kdu_instance: release whose status is collected
        :param check_every: seconds to sleep before each check
        :param db_dict: database info passed to write_app_status_to_db
        :param run_once: perform exactly one iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                status = detailed_status.get('info').get('Description')
                print('=' * 60)
                self.debug('STATUS:\n{}'.format(status))
                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                print('=' * 60)
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation)
                if not result:
                    self.info('Error writing in database. Task exiting...')
                    return
            except asyncio.CancelledError:
                # normal termination path: the caller cancels this task once
                # the helm command has finished
                self.debug('Task cancelled')
                return
            except Exception as e:
                # deliberate best effort: a transient status/db failure must
                # not stop the periodic loop.
                # NOTE(review): consider logging 'e' here for diagnosability
                pass
            finally:
                # 'finally' also runs on the swallowed-exception path, so
                # run_once guarantees exactly one iteration
                if run_once:
                    return
796
797 async def _is_install_completed(
798 self,
799 cluster_uuid: str,
800 kdu_instance: str
801 ) -> bool:
802
803 status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
804
805 # extract info.status.resources-> str
806 # format:
807 # ==> v1/Deployment
808 # NAME READY UP-TO-DATE AVAILABLE AGE
809 # halting-horse-mongodb 0/1 1 0 0s
810 # halting-petit-mongodb 1/1 1 0 0s
811 # blank line
812 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
813
814 # convert to table
815 resources = K8sHelmConnector._output_to_table(resources)
816
817 num_lines = len(resources)
818 index = 0
819 while index < num_lines:
820 try:
821 line1 = resources[index]
822 index += 1
823 # find '==>' in column 0
824 if line1[0] == '==>':
825 line2 = resources[index]
826 index += 1
827 # find READY in column 1
828 if line2[1] == 'READY':
829 # read next lines
830 line3 = resources[index]
831 index += 1
832 while len(line3) > 1 and index < num_lines:
833 ready_value = line3[1]
834 parts = ready_value.split(sep='/')
835 current = int(parts[0])
836 total = int(parts[1])
837 if current < total:
838 self.debug('NOT READY:\n {}'.format(line3))
839 ready = False
840 line3 = resources[index]
841 index += 1
842
843 except Exception as e:
844 pass
845
846 return ready
847
848 @staticmethod
849 def _get_deep(dictionary: dict, members: tuple):
850 target = dictionary
851 value = None
852 try:
853 for m in members:
854 value = target.get(m)
855 if not value:
856 return None
857 else:
858 target = value
859 except Exception as e:
860 pass
861 return value
862
863 # find key:value in several lines
864 @staticmethod
865 def _find_in_lines(p_lines: list, p_key: str) -> str:
866 for line in p_lines:
867 try:
868 if line.startswith(p_key + ':'):
869 parts = line.split(':')
870 the_value = parts[1].strip()
871 return the_value
872 except Exception as e:
873 # ignore it
874 pass
875 return None
876
877 # params for use in -f file
878 # returns values file option and filename (in order to delete it at the end)
879 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
880 params_str = ''
881
882 if params and len(params) > 0:
883 kube_dir, helm_dir, config_filename, cluster_dir = \
884 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
885
886 def get_random_number():
887 r = random.randrange(start=1, stop=99999999)
888 s = str(r)
889 while len(s) < 10:
890 s = '0' + s
891 return s
892
893 params2 = dict()
894 for key in params:
895 value = params.get(key)
896 if '!!yaml' in str(value):
897 value = yaml.load(value[7:], Loader=yaml.SafeLoader)
898 params2[key] = value
899
900 values_file = get_random_number() + '.yaml'
901 with open(values_file, 'w') as stream:
902 yaml.dump(params2, stream, indent=4, default_flow_style=False)
903
904 return '-f {}'.format(values_file), values_file
905
906 return '', None
907
908 # params for use in --set option
909 @staticmethod
910 def _params_to_set_option(params: dict) -> str:
911 params_str = ''
912 if params and len(params) > 0:
913 start = True
914 for key in params:
915 value = params.get(key, None)
916 if value is not None:
917 if start:
918 params_str += '--set '
919 start = False
920 else:
921 params_str += ','
922 params_str += '{}={}'.format(key, value)
923 return params_str
924
925 @staticmethod
926 def _output_to_lines(output: str) -> list:
927 output_lines = list()
928 lines = output.splitlines(keepends=False)
929 for line in lines:
930 line = line.strip()
931 if len(line) > 0:
932 output_lines.append(line)
933 return output_lines
934
935 @staticmethod
936 def _output_to_table(output: str) -> list:
937 output_table = list()
938 lines = output.splitlines(keepends=False)
939 for line in lines:
940 line = line.replace('\t', ' ')
941 line_list = list()
942 output_table.append(line_list)
943 cells = line.split(sep=' ')
944 for cell in cells:
945 cell = cell.strip()
946 if len(cell) > 0:
947 line_list.append(cell)
948 return output_table
949
950 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
951 """
952 Returns kube and helm directories
953
954 :param cluster_name:
955 :param create_if_not_exist:
956 :return: kube, helm directories, config filename and cluster dir.
957 Raises exception if not exist and cannot create
958 """
959
960 base = self.fs.path
961 if base.endswith("/") or base.endswith("\\"):
962 base = base[:-1]
963
964 # base dir for cluster
965 cluster_dir = base + '/' + cluster_name
966 if create_if_not_exist and not os.path.exists(cluster_dir):
967 self.debug('Creating dir {}'.format(cluster_dir))
968 os.makedirs(cluster_dir)
969 if not os.path.exists(cluster_dir):
970 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
971 self.error(msg)
972 raise Exception(msg)
973
974 # kube dir
975 kube_dir = cluster_dir + '/' + '.kube'
976 if create_if_not_exist and not os.path.exists(kube_dir):
977 self.debug('Creating dir {}'.format(kube_dir))
978 os.makedirs(kube_dir)
979 if not os.path.exists(kube_dir):
980 msg = 'Kube config dir {} does not exist'.format(kube_dir)
981 self.error(msg)
982 raise Exception(msg)
983
984 # helm home dir
985 helm_dir = cluster_dir + '/' + '.helm'
986 if create_if_not_exist and not os.path.exists(helm_dir):
987 self.debug('Creating dir {}'.format(helm_dir))
988 os.makedirs(helm_dir)
989 if not os.path.exists(helm_dir):
990 msg = 'Helm config dir {} does not exist'.format(helm_dir)
991 self.error(msg)
992 raise Exception(msg)
993
994 config_filename = kube_dir + '/config'
995 return kube_dir, helm_dir, config_filename, cluster_dir
996
997 @staticmethod
998 def _remove_multiple_spaces(str):
999 str = str.strip()
1000 while ' ' in str:
1001 str = str.replace(' ', ' ')
1002 return str
1003
1004 def _local_exec(
1005 self,
1006 command: str
1007 ) -> (str, int):
1008 command = K8sHelmConnector._remove_multiple_spaces(command)
1009 self.debug('Executing sync local command: {}'.format(command))
1010 # raise exception if fails
1011 output = ''
1012 try:
1013 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1014 return_code = 0
1015 self.debug(output)
1016 except Exception as e:
1017 return_code = 1
1018
1019 return output, return_code
1020
    async def _local_async_exec(
            self,
            command: str,
            raise_exception_on_error: bool = False,
            show_error_log: bool = True
    ) -> (str, int):
        """
        Run a local command asynchronously (no shell) and capture its output.

        :param command: command line; split on single spaces after normalization
        :param raise_exception_on_error: see NOTE below — the raise is
               intercepted and callers actually receive ('', -1)
        :param show_error_log: log failures as errors
        :return: (output, return code); ('', -1) when the command could not be
                 executed or the error path was taken
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.debug('Executing async local command: {}'.format(command))

        # split command
        command = command.split(sep=' ')

        try:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ''
            if stdout:
                output = stdout.decode('utf-8').strip()
            # NOTE(review): when both streams have content, stderr REPLACES
            # stdout here instead of being appended — confirm this is intended
            if stderr:
                output = stderr.decode('utf-8').strip()

            if return_code != 0 and show_error_log:
                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
            else:
                self.debug('Return code: {}'.format(return_code))

            # NOTE(review): this raise is caught by the 'except Exception'
            # below, so raise_exception_on_error never propagates an exception
            # to the caller — it yields ('', -1) instead. Several callers
            # (e.g. install()'s release-name retry loop) appear to rely on
            # that behavior; fixing it here would change them too.
            if raise_exception_on_error and return_code != 0:
                raise Exception(output)

            return output, return_code

        except Exception as e:
            msg = 'Exception executing command: {} -> {}'.format(command, e)
            if show_error_log:
                self.error(msg)
            return '', -1
1067
1068 def _remote_exec(
1069 self,
1070 hostname: str,
1071 username: str,
1072 password: str,
1073 command: str,
1074 timeout: int = 10
1075 ) -> (str, int):
1076
1077 command = K8sHelmConnector._remove_multiple_spaces(command)
1078 self.debug('Executing sync remote ssh command: {}'.format(command))
1079
1080 ssh = paramiko.SSHClient()
1081 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1082 ssh.connect(hostname=hostname, username=username, password=password)
1083 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1084 output = ssh_stdout.read().decode('utf-8')
1085 error = ssh_stderr.read().decode('utf-8')
1086 if error:
1087 self.error('ERROR: {}'.format(error))
1088 return_code = 1
1089 else:
1090 return_code = 0
1091 output = output.replace('\\n', '\n')
1092 self.debug('OUTPUT: {}'.format(output))
1093
1094 return output, return_code
1095
1096 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1097 self.debug('Checking if file {} exists...'.format(filename))
1098 if os.path.exists(filename):
1099 return True
1100 else:
1101 msg = 'File {} does not exist'.format(filename)
1102 if exception_if_not_exists:
1103 self.error(msg)
1104 raise Exception(msg)
1105