Sync with k8s api
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33
34
35 class K8sHelmConnector(K8sConnector):
36
37 """
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
41 """
42
43 def __init__(
44 self,
45 fs: object,
46 db: object,
47 kubectl_command: str = '/usr/bin/kubectl',
48 helm_command: str = '/usr/bin/helm',
49 log: object = None,
50 on_update_db=None
51 ):
52 """
53
54 :param fs: file system for kubernetes and helm configuration
55 :param db: database object to write current operation status
56 :param kubectl_command: path to kubectl executable
57 :param helm_command: path to helm executable
58 :param log: logger
59 :param on_update_db: callback called when k8s connector updates database
60 """
61
62 # parent class
63 K8sConnector.__init__(
64 self,
65 db=db,
66 log=log,
67 on_update_db=on_update_db
68 )
69
70 self.info('Initializing K8S Helm connector')
71
72 # random numbers for release name generation
73 random.seed(time.time())
74
75 # the file system
76 self.fs = fs
77
78 # exception if kubectl is not installed
79 self.kubectl_command = kubectl_command
80 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
81
82 # exception if helm is not installed
83 self._helm_command = helm_command
84 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
85
86 # initialize helm client-only
87 self.debug('Initializing helm client-only...')
88 command = '{} init --client-only'.format(self._helm_command)
89 try:
90 asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
91 # loop = asyncio.get_event_loop()
92 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
93 except Exception as e:
94 self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
95
96 self.info('K8S Helm connector initialized')
97
98 async def init_env(
99 self,
100 k8s_creds: str,
101 namespace: str = 'kube-system',
102 reuse_cluster_uuid=None
103 ) -> (str, bool):
104
105 cluster_uuid = reuse_cluster_uuid
106 if not cluster_uuid:
107 cluster_uuid = str(uuid4())
108
109 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
110
111 # create config filename
112 kube_dir, helm_dir, config_filename, cluster_dir = \
113 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
114 f = open(config_filename, "w")
115 f.write(k8s_creds)
116 f.close()
117
118 # check if tiller pod is up in cluster
119 command = '{} --kubeconfig={} --namespace={} get deployments'\
120 .format(self.kubectl_command, config_filename, namespace)
121 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
122
123 output_table = K8sHelmConnector._output_to_table(output=output)
124
125 # find 'tiller' pod in all pods
126 already_initialized = False
127 try:
128 for row in output_table:
129 if row[0].startswith('tiller-deploy'):
130 already_initialized = True
131 break
132 except Exception as e:
133 pass
134
135 # helm init
136 n2vc_installed_sw = False
137 if not already_initialized:
138 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
139 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
140 .format(self._helm_command, config_filename, namespace, helm_dir)
141 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
142 n2vc_installed_sw = True
143 else:
144 # check client helm installation
145 check_file = helm_dir + '/repository/repositories.yaml'
146 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
147 self.info('Initializing helm in client: {}'.format(cluster_uuid))
148 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
149 .format(self._helm_command, config_filename, namespace, helm_dir)
150 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
151 else:
152 self.info('Helm client already initialized')
153
154 self.info('Cluster initialized {}'.format(cluster_uuid))
155
156 return cluster_uuid, n2vc_installed_sw
157
158 async def repo_add(
159 self,
160 cluster_uuid: str,
161 name: str,
162 url: str,
163 repo_type: str = 'chart'
164 ):
165
166 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
167
168 # config filename
169 kube_dir, helm_dir, config_filename, cluster_dir = \
170 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
171
172 # helm repo update
173 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
174 self.debug('updating repo: {}'.format(command))
175 await self._local_async_exec(command=command, raise_exception_on_error=False)
176
177 # helm repo add name url
178 command = '{} --kubeconfig={} --home={} repo add {} {}'\
179 .format(self._helm_command, config_filename, helm_dir, name, url)
180 self.debug('adding repo: {}'.format(command))
181 await self._local_async_exec(command=command, raise_exception_on_error=True)
182
183 async def repo_list(
184 self,
185 cluster_uuid: str
186 ) -> list:
187 """
188 Get the list of registered repositories
189
190 :return: list of registered repositories: [ (name, url) .... ]
191 """
192
193 self.debug('list repositories for cluster {}'.format(cluster_uuid))
194
195 # config filename
196 kube_dir, helm_dir, config_filename, cluster_dir = \
197 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
198
199 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
200 .format(self._helm_command, config_filename, helm_dir)
201
202 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
203 if output and len(output) > 0:
204 return yaml.load(output, Loader=yaml.SafeLoader)
205 else:
206 return []
207
208 async def repo_remove(
209 self,
210 cluster_uuid: str,
211 name: str
212 ):
213 """
214 Remove a repository from OSM
215
216 :param cluster_uuid: the cluster
217 :param name: repo name in OSM
218 :return: True if successful
219 """
220
221 self.debug('list repositories for cluster {}'.format(cluster_uuid))
222
223 # config filename
224 kube_dir, helm_dir, config_filename, cluster_dir = \
225 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
226
227 command = '{} --kubeconfig={} --home={} repo remove {}'\
228 .format(self._helm_command, config_filename, helm_dir, name)
229
230 await self._local_async_exec(command=command, raise_exception_on_error=True)
231
232 async def reset(
233 self,
234 cluster_uuid: str,
235 force: bool = False,
236 uninstall_sw: bool = False
237 ) -> bool:
238
239 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
240
241 # get kube and helm directories
242 kube_dir, helm_dir, config_filename, cluster_dir = \
243 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
244
245 # uninstall releases if needed
246 releases = await self.instances_list(cluster_uuid=cluster_uuid)
247 if len(releases) > 0:
248 if force:
249 for r in releases:
250 try:
251 kdu_instance = r.get('Name')
252 chart = r.get('Chart')
253 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
254 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
255 except Exception as e:
256 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
257 else:
258 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
259 .format(cluster_uuid)
260 self.error(msg)
261 raise Exception(msg)
262
263 if uninstall_sw:
264
265 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
266
267 # find namespace for tiller pod
268 command = '{} --kubeconfig={} get deployments --all-namespaces'\
269 .format(self.kubectl_command, config_filename)
270 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
271 output_table = K8sHelmConnector._output_to_table(output=output)
272 namespace = None
273 for r in output_table:
274 try:
275 if 'tiller-deploy' in r[1]:
276 namespace = r[0]
277 break
278 except Exception as e:
279 pass
280 else:
281 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
282 self.error(msg)
283 # raise Exception(msg)
284
285 self.debug('namespace for tiller: {}'.format(namespace))
286
287 force_str = '--force'
288
289 if namespace:
290 # delete tiller deployment
291 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
292 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
293 .format(self.kubectl_command, namespace, config_filename, force_str)
294 await self._local_async_exec(command=command, raise_exception_on_error=False)
295
296 # uninstall tiller from cluster
297 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
298 command = '{} --kubeconfig={} --home={} reset'\
299 .format(self._helm_command, config_filename, helm_dir)
300 self.debug('resetting: {}'.format(command))
301 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
302 else:
303 self.debug('namespace not found')
304
305 # delete cluster directory
306 dir = self.fs.path + '/' + cluster_uuid
307 self.debug('Removing directory {}'.format(dir))
308 shutil.rmtree(dir, ignore_errors=True)
309
310 return True
311
312 async def install(
313 self,
314 cluster_uuid: str,
315 kdu_model: str,
316 atomic: bool = True,
317 timeout: float = 300,
318 params: dict = None,
319 db_dict: dict = None
320 ):
321
322 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
323
324 # config filename
325 kube_dir, helm_dir, config_filename, cluster_dir = \
326 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
327
328 # params to str
329 # params_str = K8sHelmConnector._params_to_set_option(params)
330 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
331
332 timeout_str = ''
333 if timeout:
334 timeout_str = '--timeout {}'.format(timeout)
335
336 # atomic
337 atomic_str = ''
338 if atomic:
339 atomic_str = '--atomic'
340
341 # version
342 version_str = ''
343 if ':' in kdu_model:
344 parts = kdu_model.split(sep=':')
345 if len(parts) == 2:
346 version_str = '--version {}'.format(parts[1])
347 kdu_model = parts[0]
348
349 # generate a name for the releas. Then, check if already exists
350 kdu_instance = None
351 while kdu_instance is None:
352 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
353 try:
354 result = await self._status_kdu(
355 cluster_uuid=cluster_uuid,
356 kdu_instance=kdu_instance,
357 show_error_log=False
358 )
359 if result is not None:
360 # instance already exists: generate a new one
361 kdu_instance = None
362 except Exception as e:
363 kdu_instance = None
364
365 # helm repo install
366 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
367 .format(self._helm_command, atomic_str, config_filename, helm_dir,
368 params_str, timeout_str, kdu_instance, kdu_model, version_str)
369 self.debug('installing: {}'.format(command))
370
371 if atomic:
372 # exec helm in a task
373 exec_task = asyncio.ensure_future(
374 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
375 )
376 # write status in another task
377 status_task = asyncio.ensure_future(
378 coro_or_future=self._store_status(
379 cluster_uuid=cluster_uuid,
380 kdu_instance=kdu_instance,
381 db_dict=db_dict,
382 operation='install',
383 run_once=False
384 )
385 )
386
387 # wait for execution task
388 await asyncio.wait([exec_task])
389
390 # cancel status task
391 status_task.cancel()
392
393 output, rc = exec_task.result()
394
395 else:
396
397 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
398
399 # remove temporal values yaml file
400 if file_to_delete:
401 os.remove(file_to_delete)
402
403 # write final status
404 await self._store_status(
405 cluster_uuid=cluster_uuid,
406 kdu_instance=kdu_instance,
407 db_dict=db_dict,
408 operation='install',
409 run_once=True,
410 check_every=0
411 )
412
413 if rc != 0:
414 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
415 self.error(msg)
416 raise Exception(msg)
417
418 self.debug('Returning kdu_instance {}'.format(kdu_instance))
419 return kdu_instance
420
421 async def instances_list(
422 self,
423 cluster_uuid: str
424 ) -> list:
425 """
426 returns a list of deployed releases in a cluster
427
428 :param cluster_uuid: the cluster
429 :return:
430 """
431
432 self.debug('list releases for cluster {}'.format(cluster_uuid))
433
434 # config filename
435 kube_dir, helm_dir, config_filename, cluster_dir = \
436 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
437
438 command = '{} --kubeconfig={} --home={} list --output yaml'\
439 .format(self._helm_command, config_filename, helm_dir)
440
441 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
442
443 if output and len(output) > 0:
444 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
445 else:
446 return []
447
448 async def upgrade(
449 self,
450 cluster_uuid: str,
451 kdu_instance: str,
452 kdu_model: str = None,
453 atomic: bool = True,
454 timeout: float = 300,
455 params: dict = None,
456 db_dict: dict = None
457 ):
458
459 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
460
461 # config filename
462 kube_dir, helm_dir, config_filename, cluster_dir = \
463 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
464
465 # params to str
466 # params_str = K8sHelmConnector._params_to_set_option(params)
467 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
468
469 timeout_str = ''
470 if timeout:
471 timeout_str = '--timeout {}'.format(timeout)
472
473 # atomic
474 atomic_str = ''
475 if atomic:
476 atomic_str = '--atomic'
477
478 # version
479 version_str = ''
480 if kdu_model and ':' in kdu_model:
481 parts = kdu_model.split(sep=':')
482 if len(parts) == 2:
483 version_str = '--version {}'.format(parts[1])
484 kdu_model = parts[0]
485
486 # helm repo upgrade
487 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
488 .format(self._helm_command, atomic_str, config_filename, helm_dir,
489 params_str, timeout_str, kdu_instance, kdu_model, version_str)
490 self.debug('upgrading: {}'.format(command))
491
492 if atomic:
493
494 # exec helm in a task
495 exec_task = asyncio.ensure_future(
496 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
497 )
498 # write status in another task
499 status_task = asyncio.ensure_future(
500 coro_or_future=self._store_status(
501 cluster_uuid=cluster_uuid,
502 kdu_instance=kdu_instance,
503 db_dict=db_dict,
504 operation='upgrade',
505 run_once=False
506 )
507 )
508
509 # wait for execution task
510 await asyncio.wait([exec_task])
511
512 # cancel status task
513 status_task.cancel()
514 output, rc = exec_task.result()
515
516 else:
517
518 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
519
520 # remove temporal values yaml file
521 if file_to_delete:
522 os.remove(file_to_delete)
523
524 # write final status
525 await self._store_status(
526 cluster_uuid=cluster_uuid,
527 kdu_instance=kdu_instance,
528 db_dict=db_dict,
529 operation='upgrade',
530 run_once=True,
531 check_every=0
532 )
533
534 if rc != 0:
535 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
536 self.error(msg)
537 raise Exception(msg)
538
539 # return new revision number
540 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
541 if instance:
542 revision = int(instance.get('Revision'))
543 self.debug('New revision: {}'.format(revision))
544 return revision
545 else:
546 return 0
547
548 async def rollback(
549 self,
550 cluster_uuid: str,
551 kdu_instance: str,
552 revision=0,
553 db_dict: dict = None
554 ):
555
556 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
557 .format(kdu_instance, revision, cluster_uuid))
558
559 # config filename
560 kube_dir, helm_dir, config_filename, cluster_dir = \
561 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
562
563 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
564 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
565
566 # exec helm in a task
567 exec_task = asyncio.ensure_future(
568 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
569 )
570 # write status in another task
571 status_task = asyncio.ensure_future(
572 coro_or_future=self._store_status(
573 cluster_uuid=cluster_uuid,
574 kdu_instance=kdu_instance,
575 db_dict=db_dict,
576 operation='rollback',
577 run_once=False
578 )
579 )
580
581 # wait for execution task
582 await asyncio.wait([exec_task])
583
584 # cancel status task
585 status_task.cancel()
586
587 output, rc = exec_task.result()
588
589 # write final status
590 await self._store_status(
591 cluster_uuid=cluster_uuid,
592 kdu_instance=kdu_instance,
593 db_dict=db_dict,
594 operation='rollback',
595 run_once=True,
596 check_every=0
597 )
598
599 if rc != 0:
600 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
601 self.error(msg)
602 raise Exception(msg)
603
604 # return new revision number
605 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
606 if instance:
607 revision = int(instance.get('Revision'))
608 self.debug('New revision: {}'.format(revision))
609 return revision
610 else:
611 return 0
612
613 async def uninstall(
614 self,
615 cluster_uuid: str,
616 kdu_instance: str
617 ):
618 """
619 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
620 after all _terminate-config-primitive_ of the VNF are invoked).
621
622 :param cluster_uuid: UUID of a K8s cluster known by OSM
623 :param kdu_instance: unique name for the KDU instance to be deleted
624 :return: True if successful
625 """
626
627 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
628
629 # config filename
630 kube_dir, helm_dir, config_filename, cluster_dir = \
631 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
632
633 command = '{} --kubeconfig={} --home={} delete --purge {}'\
634 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
635
636 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
637
638 return self._output_to_table(output)
639
640 async def inspect_kdu(
641 self,
642 kdu_model: str,
643 repo_url: str = None
644 ) -> str:
645
646 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
647
648 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
649
650 async def values_kdu(
651 self,
652 kdu_model: str,
653 repo_url: str = None
654 ) -> str:
655
656 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
657
658 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
659
660 async def help_kdu(
661 self,
662 kdu_model: str,
663 repo_url: str = None
664 ) -> str:
665
666 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
667
668 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
669
670 async def status_kdu(
671 self,
672 cluster_uuid: str,
673 kdu_instance: str
674 ) -> str:
675
676 # call internal function
677 return await self._status_kdu(
678 cluster_uuid=cluster_uuid,
679 kdu_instance=kdu_instance,
680 show_error_log=True,
681 return_text=True
682 )
683
684 """
685 ##################################################################################################
686 ########################################## P R I V A T E #########################################
687 ##################################################################################################
688 """
689
690 async def _exec_inspect_comand(
691 self,
692 inspect_command: str,
693 kdu_model: str,
694 repo_url: str = None
695 ):
696
697 repo_str = ''
698 if repo_url:
699 repo_str = ' --repo {}'.format(repo_url)
700 idx = kdu_model.find('/')
701 if idx >= 0:
702 idx += 1
703 kdu_model = kdu_model[idx:]
704
705 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
706 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
707
708 return output
709
710 async def _status_kdu(
711 self,
712 cluster_uuid: str,
713 kdu_instance: str,
714 show_error_log: bool = False,
715 return_text: bool = False
716 ):
717
718 self.debug('status of kdu_instance {}'.format(kdu_instance))
719
720 # config filename
721 kube_dir, helm_dir, config_filename, cluster_dir = \
722 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
723
724 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
725 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
726
727 output, rc = await self._local_async_exec(
728 command=command,
729 raise_exception_on_error=True,
730 show_error_log=show_error_log
731 )
732
733 if return_text:
734 return str(output)
735
736 if rc != 0:
737 return None
738
739 data = yaml.load(output, Loader=yaml.SafeLoader)
740
741 # remove field 'notes'
742 try:
743 del data.get('info').get('status')['notes']
744 except KeyError:
745 pass
746
747 # parse field 'resources'
748 try:
749 resources = str(data.get('info').get('status').get('resources'))
750 resource_table = self._output_to_table(resources)
751 data.get('info').get('status')['resources'] = resource_table
752 except Exception as e:
753 pass
754
755 return data
756
757 async def get_instance_info(
758 self,
759 cluster_uuid: str,
760 kdu_instance: str
761 ):
762 instances = await self.instances_list(cluster_uuid=cluster_uuid)
763 for instance in instances:
764 if instance.get('Name') == kdu_instance:
765 return instance
766 self.debug('Instance {} not found'.format(kdu_instance))
767 return None
768
769 @staticmethod
770 def _generate_release_name(
771 chart_name: str
772 ):
773 name = ''
774 for c in chart_name:
775 if c.isalpha() or c.isnumeric():
776 name += c
777 else:
778 name += '-'
779 if len(name) > 35:
780 name = name[0:35]
781
782 # if does not start with alpha character, prefix 'a'
783 if not name[0].isalpha():
784 name = 'a' + name
785
786 name += '-'
787
788 def get_random_number():
789 r = random.randrange(start=1, stop=99999999)
790 s = str(r)
791 s = s.rjust(width=10, fillchar=' ')
792 return s
793
794 name = name + get_random_number()
795 return name.lower()
796
797 async def _store_status(
798 self,
799 cluster_uuid: str,
800 operation: str,
801 kdu_instance: str,
802 check_every: float = 10,
803 db_dict: dict = None,
804 run_once: bool = False
805 ):
806 while True:
807 try:
808 await asyncio.sleep(check_every)
809 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
810 status = detailed_status.get('info').get('Description')
811 print('=' * 60)
812 self.debug('STATUS:\n{}'.format(status))
813 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
814 print('=' * 60)
815 # write status to db
816 result = await self.write_app_status_to_db(
817 db_dict=db_dict,
818 status=str(status),
819 detailed_status=str(detailed_status),
820 operation=operation)
821 if not result:
822 self.info('Error writing in database. Task exiting...')
823 return
824 except asyncio.CancelledError:
825 self.debug('Task cancelled')
826 return
827 except Exception as e:
828 pass
829 finally:
830 if run_once:
831 return
832
833 async def _is_install_completed(
834 self,
835 cluster_uuid: str,
836 kdu_instance: str
837 ) -> bool:
838
839 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
840
841 # extract info.status.resources-> str
842 # format:
843 # ==> v1/Deployment
844 # NAME READY UP-TO-DATE AVAILABLE AGE
845 # halting-horse-mongodb 0/1 1 0 0s
846 # halting-petit-mongodb 1/1 1 0 0s
847 # blank line
848 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
849
850 # convert to table
851 resources = K8sHelmConnector._output_to_table(resources)
852
853 num_lines = len(resources)
854 index = 0
855 while index < num_lines:
856 try:
857 line1 = resources[index]
858 index += 1
859 # find '==>' in column 0
860 if line1[0] == '==>':
861 line2 = resources[index]
862 index += 1
863 # find READY in column 1
864 if line2[1] == 'READY':
865 # read next lines
866 line3 = resources[index]
867 index += 1
868 while len(line3) > 1 and index < num_lines:
869 ready_value = line3[1]
870 parts = ready_value.split(sep='/')
871 current = int(parts[0])
872 total = int(parts[1])
873 if current < total:
874 self.debug('NOT READY:\n {}'.format(line3))
875 ready = False
876 line3 = resources[index]
877 index += 1
878
879 except Exception as e:
880 pass
881
882 return ready
883
884 @staticmethod
885 def _get_deep(dictionary: dict, members: tuple):
886 target = dictionary
887 value = None
888 try:
889 for m in members:
890 value = target.get(m)
891 if not value:
892 return None
893 else:
894 target = value
895 except Exception as e:
896 pass
897 return value
898
899 # find key:value in several lines
900 @staticmethod
901 def _find_in_lines(p_lines: list, p_key: str) -> str:
902 for line in p_lines:
903 try:
904 if line.startswith(p_key + ':'):
905 parts = line.split(':')
906 the_value = parts[1].strip()
907 return the_value
908 except Exception as e:
909 # ignore it
910 pass
911 return None
912
913 # params for use in -f file
914 # returns values file option and filename (in order to delete it at the end)
915 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
916
917 if params and len(params) > 0:
918 kube_dir, helm_dir, config_filename, cluster_dir = \
919 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
920
921 def get_random_number():
922 r = random.randrange(start=1, stop=99999999)
923 s = str(r)
924 while len(s) < 10:
925 s = '0' + s
926 return s
927
928 params2 = dict()
929 for key in params:
930 value = params.get(key)
931 if '!!yaml' in str(value):
932 value = yaml.load(value[7:])
933 params2[key] = value
934
935 values_file = get_random_number() + '.yaml'
936 with open(values_file, 'w') as stream:
937 yaml.dump(params2, stream, indent=4, default_flow_style=False)
938
939 return '-f {}'.format(values_file), values_file
940
941 return '', None
942
943 # params for use in --set option
944 @staticmethod
945 def _params_to_set_option(params: dict) -> str:
946 params_str = ''
947 if params and len(params) > 0:
948 start = True
949 for key in params:
950 value = params.get(key, None)
951 if value is not None:
952 if start:
953 params_str += '--set '
954 start = False
955 else:
956 params_str += ','
957 params_str += '{}={}'.format(key, value)
958 return params_str
959
960 @staticmethod
961 def _output_to_lines(output: str) -> list:
962 output_lines = list()
963 lines = output.splitlines(keepends=False)
964 for line in lines:
965 line = line.strip()
966 if len(line) > 0:
967 output_lines.append(line)
968 return output_lines
969
970 @staticmethod
971 def _output_to_table(output: str) -> list:
972 output_table = list()
973 lines = output.splitlines(keepends=False)
974 for line in lines:
975 line = line.replace('\t', ' ')
976 line_list = list()
977 output_table.append(line_list)
978 cells = line.split(sep=' ')
979 for cell in cells:
980 cell = cell.strip()
981 if len(cell) > 0:
982 line_list.append(cell)
983 return output_table
984
985 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
986 """
987 Returns kube and helm directories
988
989 :param cluster_name:
990 :param create_if_not_exist:
991 :return: kube, helm directories, config filename and cluster dir.
992 Raises exception if not exist and cannot create
993 """
994
995 base = self.fs.path
996 if base.endswith("/") or base.endswith("\\"):
997 base = base[:-1]
998
999 # base dir for cluster
1000 cluster_dir = base + '/' + cluster_name
1001 if create_if_not_exist and not os.path.exists(cluster_dir):
1002 self.debug('Creating dir {}'.format(cluster_dir))
1003 os.makedirs(cluster_dir)
1004 if not os.path.exists(cluster_dir):
1005 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1006 self.error(msg)
1007 raise Exception(msg)
1008
1009 # kube dir
1010 kube_dir = cluster_dir + '/' + '.kube'
1011 if create_if_not_exist and not os.path.exists(kube_dir):
1012 self.debug('Creating dir {}'.format(kube_dir))
1013 os.makedirs(kube_dir)
1014 if not os.path.exists(kube_dir):
1015 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1016 self.error(msg)
1017 raise Exception(msg)
1018
1019 # helm home dir
1020 helm_dir = cluster_dir + '/' + '.helm'
1021 if create_if_not_exist and not os.path.exists(helm_dir):
1022 self.debug('Creating dir {}'.format(helm_dir))
1023 os.makedirs(helm_dir)
1024 if not os.path.exists(helm_dir):
1025 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1026 self.error(msg)
1027 raise Exception(msg)
1028
1029 config_filename = kube_dir + '/config'
1030 return kube_dir, helm_dir, config_filename, cluster_dir
1031
1032 @staticmethod
1033 def _remove_multiple_spaces(str):
1034 str = str.strip()
1035 while ' ' in str:
1036 str = str.replace(' ', ' ')
1037 return str
1038
1039 def _local_exec(
1040 self,
1041 command: str
1042 ) -> (str, int):
1043 command = K8sHelmConnector._remove_multiple_spaces(command)
1044 self.debug('Executing sync local command: {}'.format(command))
1045 # raise exception if fails
1046 output = ''
1047 try:
1048 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1049 return_code = 0
1050 self.debug(output)
1051 except Exception as e:
1052 return_code = 1
1053
1054 return output, return_code
1055
1056 async def _local_async_exec(
1057 self,
1058 command: str,
1059 raise_exception_on_error: bool = False,
1060 show_error_log: bool = True,
1061 encode_utf8: bool = False
1062 ) -> (str, int):
1063
1064 command = K8sHelmConnector._remove_multiple_spaces(command)
1065 self.debug('Executing async local command: {}'.format(command))
1066
1067 # split command
1068 command = command.split(sep=' ')
1069
1070 try:
1071 process = await asyncio.create_subprocess_exec(
1072 *command,
1073 stdout=asyncio.subprocess.PIPE,
1074 stderr=asyncio.subprocess.PIPE
1075 )
1076
1077 # wait for command terminate
1078 stdout, stderr = await process.communicate()
1079
1080 return_code = process.returncode
1081
1082 output = ''
1083 if stdout:
1084 output = stdout.decode('utf-8').strip()
1085 # output = stdout.decode()
1086 if stderr:
1087 output = stderr.decode('utf-8').strip()
1088 # output = stderr.decode()
1089
1090 if return_code != 0 and show_error_log:
1091 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1092 else:
1093 self.debug('Return code: {}'.format(return_code))
1094
1095 if raise_exception_on_error and return_code != 0:
1096 raise Exception(output)
1097
1098 if encode_utf8:
1099 output = output.encode('utf-8').strip()
1100 output = str(output).replace('\\n', '\n')
1101
1102 return output, return_code
1103
1104 except Exception as e:
1105 msg = 'Exception executing command: {} -> {}'.format(command, e)
1106 if show_error_log:
1107 self.error(msg)
1108 return '', -1
1109
1110 def _remote_exec(
1111 self,
1112 hostname: str,
1113 username: str,
1114 password: str,
1115 command: str,
1116 timeout: int = 10
1117 ) -> (str, int):
1118
1119 command = K8sHelmConnector._remove_multiple_spaces(command)
1120 self.debug('Executing sync remote ssh command: {}'.format(command))
1121
1122 ssh = paramiko.SSHClient()
1123 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1124 ssh.connect(hostname=hostname, username=username, password=password)
1125 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1126 output = ssh_stdout.read().decode('utf-8')
1127 error = ssh_stderr.read().decode('utf-8')
1128 if error:
1129 self.error('ERROR: {}'.format(error))
1130 return_code = 1
1131 else:
1132 return_code = 0
1133 output = output.replace('\\n', '\n')
1134 self.debug('OUTPUT: {}'.format(output))
1135
1136 return output, return_code
1137
1138 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1139 self.debug('Checking if file {} exists...'.format(filename))
1140 if os.path.exists(filename):
1141 return True
1142 else:
1143 msg = 'File {} does not exist'.format(filename)
1144 if exception_if_not_exists:
1145 self.error(msg)
1146 raise Exception(msg)