Fix flake8
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33 from n2vc.exceptions import K8sException
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = '/usr/bin/kubectl',
49 helm_command: str = '/usr/bin/helm',
50 log: object = None,
51 on_update_db=None
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(
65 self,
66 db=db,
67 log=log,
68 on_update_db=on_update_db
69 )
70
71 self.info('Initializing K8S Helm connector')
72
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 # initialize helm client-only
88 self.debug('Initializing helm client-only...')
89 command = '{} init --client-only'.format(self._helm_command)
90 try:
91 asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
92 # loop = asyncio.get_event_loop()
93 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
94 except Exception as e:
95 self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
96
97 self.info('K8S Helm connector initialized')
98
99 async def init_env(
100 self,
101 k8s_creds: str,
102 namespace: str = 'kube-system',
103 reuse_cluster_uuid=None
104 ) -> (str, bool):
105
106 cluster_uuid = reuse_cluster_uuid
107 if not cluster_uuid:
108 cluster_uuid = str(uuid4())
109
110 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
111
112 # create config filename
113 kube_dir, helm_dir, config_filename, cluster_dir = \
114 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
115 f = open(config_filename, "w")
116 f.write(k8s_creds)
117 f.close()
118
119 # check if tiller pod is up in cluster
120 command = '{} --kubeconfig={} --namespace={} get deployments'\
121 .format(self.kubectl_command, config_filename, namespace)
122 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
123
124 output_table = K8sHelmConnector._output_to_table(output=output)
125
126 # find 'tiller' pod in all pods
127 already_initialized = False
128 try:
129 for row in output_table:
130 if row[0].startswith('tiller-deploy'):
131 already_initialized = True
132 break
133 except Exception as e:
134 pass
135
136 # helm init
137 n2vc_installed_sw = False
138 if not already_initialized:
139 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
140 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
141 .format(self._helm_command, config_filename, namespace, helm_dir)
142 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
143 n2vc_installed_sw = True
144 else:
145 # check client helm installation
146 check_file = helm_dir + '/repository/repositories.yaml'
147 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
148 self.info('Initializing helm in client: {}'.format(cluster_uuid))
149 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
150 .format(self._helm_command, config_filename, namespace, helm_dir)
151 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
152 else:
153 self.info('Helm client already initialized')
154
155 self.info('Cluster initialized {}'.format(cluster_uuid))
156
157 return cluster_uuid, n2vc_installed_sw
158
159 async def repo_add(
160 self,
161 cluster_uuid: str,
162 name: str,
163 url: str,
164 repo_type: str = 'chart'
165 ):
166
167 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
168
169 # config filename
170 kube_dir, helm_dir, config_filename, cluster_dir = \
171 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
172
173 # helm repo update
174 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
175 self.debug('updating repo: {}'.format(command))
176 await self._local_async_exec(command=command, raise_exception_on_error=False)
177
178 # helm repo add name url
179 command = '{} --kubeconfig={} --home={} repo add {} {}'\
180 .format(self._helm_command, config_filename, helm_dir, name, url)
181 self.debug('adding repo: {}'.format(command))
182 await self._local_async_exec(command=command, raise_exception_on_error=True)
183
184 async def repo_list(
185 self,
186 cluster_uuid: str
187 ) -> list:
188 """
189 Get the list of registered repositories
190
191 :return: list of registered repositories: [ (name, url) .... ]
192 """
193
194 self.debug('list repositories for cluster {}'.format(cluster_uuid))
195
196 # config filename
197 kube_dir, helm_dir, config_filename, cluster_dir = \
198 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
199
200 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
201 .format(self._helm_command, config_filename, helm_dir)
202
203 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
204 if output and len(output) > 0:
205 return yaml.load(output, Loader=yaml.SafeLoader)
206 else:
207 return []
208
209 async def repo_remove(
210 self,
211 cluster_uuid: str,
212 name: str
213 ):
214 """
215 Remove a repository from OSM
216
217 :param cluster_uuid: the cluster
218 :param name: repo name in OSM
219 :return: True if successful
220 """
221
222 self.debug('list repositories for cluster {}'.format(cluster_uuid))
223
224 # config filename
225 kube_dir, helm_dir, config_filename, cluster_dir = \
226 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
227
228 command = '{} --kubeconfig={} --home={} repo remove {}'\
229 .format(self._helm_command, config_filename, helm_dir, name)
230
231 await self._local_async_exec(command=command, raise_exception_on_error=True)
232
233 async def reset(
234 self,
235 cluster_uuid: str,
236 force: bool = False,
237 uninstall_sw: bool = False
238 ) -> bool:
239
240 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
241
242 # get kube and helm directories
243 kube_dir, helm_dir, config_filename, cluster_dir = \
244 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
245
246 # uninstall releases if needed
247 releases = await self.instances_list(cluster_uuid=cluster_uuid)
248 if len(releases) > 0:
249 if force:
250 for r in releases:
251 try:
252 kdu_instance = r.get('Name')
253 chart = r.get('Chart')
254 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
255 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
256 except Exception as e:
257 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
258 else:
259 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
260 .format(cluster_uuid)
261 self.error(msg)
262 raise K8sException(msg)
263
264 if uninstall_sw:
265
266 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
267
268 # find namespace for tiller pod
269 command = '{} --kubeconfig={} get deployments --all-namespaces'\
270 .format(self.kubectl_command, config_filename)
271 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
272 output_table = K8sHelmConnector._output_to_table(output=output)
273 namespace = None
274 for r in output_table:
275 try:
276 if 'tiller-deploy' in r[1]:
277 namespace = r[0]
278 break
279 except Exception as e:
280 pass
281 else:
282 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
283 self.error(msg)
284
285 self.debug('namespace for tiller: {}'.format(namespace))
286
287 force_str = '--force'
288
289 if namespace:
290 # delete tiller deployment
291 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
292 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
293 .format(self.kubectl_command, namespace, config_filename, force_str)
294 await self._local_async_exec(command=command, raise_exception_on_error=False)
295
296 # uninstall tiller from cluster
297 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
298 command = '{} --kubeconfig={} --home={} reset'\
299 .format(self._helm_command, config_filename, helm_dir)
300 self.debug('resetting: {}'.format(command))
301 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
302 else:
303 self.debug('namespace not found')
304
305 # delete cluster directory
306 dir = self.fs.path + '/' + cluster_uuid
307 self.debug('Removing directory {}'.format(dir))
308 shutil.rmtree(dir, ignore_errors=True)
309
310 return True
311
312 async def install(
313 self,
314 cluster_uuid: str,
315 kdu_model: str,
316 atomic: bool = True,
317 timeout: float = 300,
318 params: dict = None,
319 db_dict: dict = None
320 ):
321
322 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
323
324 # config filename
325 kube_dir, helm_dir, config_filename, cluster_dir = \
326 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
327
328 # params to str
329 # params_str = K8sHelmConnector._params_to_set_option(params)
330 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
331
332 timeout_str = ''
333 if timeout:
334 timeout_str = '--timeout {}'.format(timeout)
335
336 # atomic
337 atomic_str = ''
338 if atomic:
339 atomic_str = '--atomic'
340
341 # version
342 version_str = ''
343 if ':' in kdu_model:
344 parts = kdu_model.split(sep=':')
345 if len(parts) == 2:
346 version_str = '--version {}'.format(parts[1])
347 kdu_model = parts[0]
348
349 # generate a name for the release. Then, check if already exists
350 kdu_instance = None
351 while kdu_instance is None:
352 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
353 try:
354 result = await self._status_kdu(
355 cluster_uuid=cluster_uuid,
356 kdu_instance=kdu_instance,
357 show_error_log=False
358 )
359 if result is not None:
360 # instance already exists: generate a new one
361 kdu_instance = None
362 except Exception as e:
363 kdu_instance = None
364
365 # helm repo install
366 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
367 .format(self._helm_command, atomic_str, config_filename, helm_dir,
368 params_str, timeout_str, kdu_instance, kdu_model, version_str)
369 self.debug('installing: {}'.format(command))
370
371 if atomic:
372 # exec helm in a task
373 exec_task = asyncio.ensure_future(
374 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
375 )
376 # write status in another task
377 status_task = asyncio.ensure_future(
378 coro_or_future=self._store_status(
379 cluster_uuid=cluster_uuid,
380 kdu_instance=kdu_instance,
381 db_dict=db_dict,
382 operation='install',
383 run_once=False
384 )
385 )
386
387 # wait for execution task
388 await asyncio.wait([exec_task])
389
390 # cancel status task
391 status_task.cancel()
392
393 output, rc = exec_task.result()
394
395 else:
396
397 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
398
399 # remove temporal values yaml file
400 if file_to_delete:
401 os.remove(file_to_delete)
402
403 # write final status
404 await self._store_status(
405 cluster_uuid=cluster_uuid,
406 kdu_instance=kdu_instance,
407 db_dict=db_dict,
408 operation='install',
409 run_once=True,
410 check_every=0
411 )
412
413 if rc != 0:
414 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
415 self.error(msg)
416 raise K8sException(msg)
417
418 self.debug('Returning kdu_instance {}'.format(kdu_instance))
419 return kdu_instance
420
421 async def instances_list(
422 self,
423 cluster_uuid: str
424 ) -> list:
425 """
426 returns a list of deployed releases in a cluster
427
428 :param cluster_uuid: the cluster
429 :return:
430 """
431
432 self.debug('list releases for cluster {}'.format(cluster_uuid))
433
434 # config filename
435 kube_dir, helm_dir, config_filename, cluster_dir = \
436 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
437
438 command = '{} --kubeconfig={} --home={} list --output yaml'\
439 .format(self._helm_command, config_filename, helm_dir)
440
441 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
442
443 if output and len(output) > 0:
444 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
445 else:
446 return []
447
448 async def upgrade(
449 self,
450 cluster_uuid: str,
451 kdu_instance: str,
452 kdu_model: str = None,
453 atomic: bool = True,
454 timeout: float = 300,
455 params: dict = None,
456 db_dict: dict = None
457 ):
458
459 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
460
461 # config filename
462 kube_dir, helm_dir, config_filename, cluster_dir = \
463 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
464
465 # params to str
466 # params_str = K8sHelmConnector._params_to_set_option(params)
467 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
468
469 timeout_str = ''
470 if timeout:
471 timeout_str = '--timeout {}'.format(timeout)
472
473 # atomic
474 atomic_str = ''
475 if atomic:
476 atomic_str = '--atomic'
477
478 # version
479 version_str = ''
480 if kdu_model and ':' in kdu_model:
481 parts = kdu_model.split(sep=':')
482 if len(parts) == 2:
483 version_str = '--version {}'.format(parts[1])
484 kdu_model = parts[0]
485
486 # helm repo upgrade
487 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
488 .format(self._helm_command, atomic_str, config_filename, helm_dir,
489 params_str, timeout_str, kdu_instance, kdu_model, version_str)
490 self.debug('upgrading: {}'.format(command))
491
492 if atomic:
493
494 # exec helm in a task
495 exec_task = asyncio.ensure_future(
496 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
497 )
498 # write status in another task
499 status_task = asyncio.ensure_future(
500 coro_or_future=self._store_status(
501 cluster_uuid=cluster_uuid,
502 kdu_instance=kdu_instance,
503 db_dict=db_dict,
504 operation='upgrade',
505 run_once=False
506 )
507 )
508
509 # wait for execution task
510 await asyncio.wait([exec_task])
511
512 # cancel status task
513 status_task.cancel()
514 output, rc = exec_task.result()
515
516 else:
517
518 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
519
520 # remove temporal values yaml file
521 if file_to_delete:
522 os.remove(file_to_delete)
523
524 # write final status
525 await self._store_status(
526 cluster_uuid=cluster_uuid,
527 kdu_instance=kdu_instance,
528 db_dict=db_dict,
529 operation='upgrade',
530 run_once=True,
531 check_every=0
532 )
533
534 if rc != 0:
535 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
536 self.error(msg)
537 raise K8sException(msg)
538
539 # return new revision number
540 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
541 if instance:
542 revision = int(instance.get('Revision'))
543 self.debug('New revision: {}'.format(revision))
544 return revision
545 else:
546 return 0
547
548 async def rollback(
549 self,
550 cluster_uuid: str,
551 kdu_instance: str,
552 revision=0,
553 db_dict: dict = None
554 ):
555
556 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
557 .format(kdu_instance, revision, cluster_uuid))
558
559 # config filename
560 kube_dir, helm_dir, config_filename, cluster_dir = \
561 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
562
563 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
564 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
565
566 # exec helm in a task
567 exec_task = asyncio.ensure_future(
568 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
569 )
570 # write status in another task
571 status_task = asyncio.ensure_future(
572 coro_or_future=self._store_status(
573 cluster_uuid=cluster_uuid,
574 kdu_instance=kdu_instance,
575 db_dict=db_dict,
576 operation='rollback',
577 run_once=False
578 )
579 )
580
581 # wait for execution task
582 await asyncio.wait([exec_task])
583
584 # cancel status task
585 status_task.cancel()
586
587 output, rc = exec_task.result()
588
589 # write final status
590 await self._store_status(
591 cluster_uuid=cluster_uuid,
592 kdu_instance=kdu_instance,
593 db_dict=db_dict,
594 operation='rollback',
595 run_once=True,
596 check_every=0
597 )
598
599 if rc != 0:
600 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
601 self.error(msg)
602 raise K8sException(msg)
603
604 # return new revision number
605 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
606 if instance:
607 revision = int(instance.get('Revision'))
608 self.debug('New revision: {}'.format(revision))
609 return revision
610 else:
611 return 0
612
613 async def uninstall(
614 self,
615 cluster_uuid: str,
616 kdu_instance: str
617 ):
618 """
619 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
620 after all _terminate-config-primitive_ of the VNF are invoked).
621
622 :param cluster_uuid: UUID of a K8s cluster known by OSM
623 :param kdu_instance: unique name for the KDU instance to be deleted
624 :return: True if successful
625 """
626
627 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
628
629 # config filename
630 kube_dir, helm_dir, config_filename, cluster_dir = \
631 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
632
633 command = '{} --kubeconfig={} --home={} delete --purge {}'\
634 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
635
636 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
637
638 return self._output_to_table(output)
639
640 async def inspect_kdu(
641 self,
642 kdu_model: str,
643 repo_url: str = None
644 ) -> str:
645
646 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
647
648 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
649
650 async def values_kdu(
651 self,
652 kdu_model: str,
653 repo_url: str = None
654 ) -> str:
655
656 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
657
658 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
659
660 async def help_kdu(
661 self,
662 kdu_model: str,
663 repo_url: str = None
664 ) -> str:
665
666 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
667
668 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
669
670 async def status_kdu(
671 self,
672 cluster_uuid: str,
673 kdu_instance: str
674 ) -> str:
675
676 # call internal function
677 return await self._status_kdu(
678 cluster_uuid=cluster_uuid,
679 kdu_instance=kdu_instance,
680 show_error_log=True,
681 return_text=True
682 )
683
684 """
685 ##################################################################################################
686 ########################################## P R I V A T E #########################################
687 ##################################################################################################
688 """
689
690 async def _exec_inspect_comand(
691 self,
692 inspect_command: str,
693 kdu_model: str,
694 repo_url: str = None
695 ):
696
697 repo_str = ''
698 if repo_url:
699 repo_str = ' --repo {}'.format(repo_url)
700 idx = kdu_model.find('/')
701 if idx >= 0:
702 idx += 1
703 kdu_model = kdu_model[idx:]
704
705 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
706 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
707
708 return output
709
710 async def _status_kdu(
711 self,
712 cluster_uuid: str,
713 kdu_instance: str,
714 show_error_log: bool = False,
715 return_text: bool = False
716 ):
717
718 self.debug('status of kdu_instance {}'.format(kdu_instance))
719
720 # config filename
721 kube_dir, helm_dir, config_filename, cluster_dir = \
722 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
723
724 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
725 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
726
727 output, rc = await self._local_async_exec(
728 command=command,
729 raise_exception_on_error=True,
730 show_error_log=show_error_log
731 )
732
733 if return_text:
734 return str(output)
735
736 if rc != 0:
737 return None
738
739 data = yaml.load(output, Loader=yaml.SafeLoader)
740
741 # remove field 'notes'
742 try:
743 del data.get('info').get('status')['notes']
744 except KeyError:
745 pass
746
747 # parse field 'resources'
748 try:
749 resources = str(data.get('info').get('status').get('resources'))
750 resource_table = self._output_to_table(resources)
751 data.get('info').get('status')['resources'] = resource_table
752 except Exception as e:
753 pass
754
755 return data
756
757 async def get_instance_info(
758 self,
759 cluster_uuid: str,
760 kdu_instance: str
761 ):
762 instances = await self.instances_list(cluster_uuid=cluster_uuid)
763 for instance in instances:
764 if instance.get('Name') == kdu_instance:
765 return instance
766 self.debug('Instance {} not found'.format(kdu_instance))
767 return None
768
769 @staticmethod
770 def _generate_release_name(
771 chart_name: str
772 ):
773 name = ''
774 for c in chart_name:
775 if c.isalpha() or c.isnumeric():
776 name += c
777 else:
778 name += '-'
779 if len(name) > 35:
780 name = name[0:35]
781
782 # if does not start with alpha character, prefix 'a'
783 if not name[0].isalpha():
784 name = 'a' + name
785
786 name += '-'
787
788 def get_random_number():
789 r = random.randrange(start=1, stop=99999999)
790 s = str(r)
791 s = s.rjust(10, '0')
792 return s
793
794 name = name + get_random_number()
795 return name.lower()
796
797 async def _store_status(
798 self,
799 cluster_uuid: str,
800 operation: str,
801 kdu_instance: str,
802 check_every: float = 10,
803 db_dict: dict = None,
804 run_once: bool = False
805 ):
806 while True:
807 try:
808 await asyncio.sleep(check_every)
809 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
810 status = detailed_status.get('info').get('Description')
811 print('=' * 60)
812 self.debug('STATUS:\n{}'.format(status))
813 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
814 print('=' * 60)
815 # write status to db
816 result = await self.write_app_status_to_db(
817 db_dict=db_dict,
818 status=str(status),
819 detailed_status=str(detailed_status),
820 operation=operation)
821 if not result:
822 self.info('Error writing in database. Task exiting...')
823 return
824 except asyncio.CancelledError:
825 self.debug('Task cancelled')
826 return
827 except Exception as e:
828 pass
829 finally:
830 if run_once:
831 return
832
833 async def _is_install_completed(
834 self,
835 cluster_uuid: str,
836 kdu_instance: str
837 ) -> bool:
838
839 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
840
841 # extract info.status.resources-> str
842 # format:
843 # ==> v1/Deployment
844 # NAME READY UP-TO-DATE AVAILABLE AGE
845 # halting-horse-mongodb 0/1 1 0 0s
846 # halting-petit-mongodb 1/1 1 0 0s
847 # blank line
848 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
849
850 # convert to table
851 resources = K8sHelmConnector._output_to_table(resources)
852
853 num_lines = len(resources)
854 index = 0
855 while index < num_lines:
856 try:
857 line1 = resources[index]
858 index += 1
859 # find '==>' in column 0
860 if line1[0] == '==>':
861 line2 = resources[index]
862 index += 1
863 # find READY in column 1
864 if line2[1] == 'READY':
865 # read next lines
866 line3 = resources[index]
867 index += 1
868 while len(line3) > 1 and index < num_lines:
869 ready_value = line3[1]
870 parts = ready_value.split(sep='/')
871 current = int(parts[0])
872 total = int(parts[1])
873 if current < total:
874 self.debug('NOT READY:\n {}'.format(line3))
875 ready = False
876 line3 = resources[index]
877 index += 1
878
879 except Exception as e:
880 pass
881
882 return ready
883
884 @staticmethod
885 def _get_deep(dictionary: dict, members: tuple):
886 target = dictionary
887 value = None
888 try:
889 for m in members:
890 value = target.get(m)
891 if not value:
892 return None
893 else:
894 target = value
895 except Exception as e:
896 pass
897 return value
898
899 # find key:value in several lines
900 @staticmethod
901 def _find_in_lines(p_lines: list, p_key: str) -> str:
902 for line in p_lines:
903 try:
904 if line.startswith(p_key + ':'):
905 parts = line.split(':')
906 the_value = parts[1].strip()
907 return the_value
908 except Exception as e:
909 # ignore it
910 pass
911 return None
912
913 # params for use in -f file
914 # returns values file option and filename (in order to delete it at the end)
915 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
916
917 if params and len(params) > 0:
918 kube_dir, helm_dir, config_filename, cluster_dir = \
919 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
920
921 def get_random_number():
922 r = random.randrange(start=1, stop=99999999)
923 s = str(r)
924 while len(s) < 10:
925 s = '0' + s
926 return s
927
928 params2 = dict()
929 for key in params:
930 value = params.get(key)
931 if '!!yaml' in str(value):
932 value = yaml.load(value[7:])
933 params2[key] = value
934
935 values_file = get_random_number() + '.yaml'
936 with open(values_file, 'w') as stream:
937 yaml.dump(params2, stream, indent=4, default_flow_style=False)
938
939 return '-f {}'.format(values_file), values_file
940
941 return '', None
942
943 # params for use in --set option
944 @staticmethod
945 def _params_to_set_option(params: dict) -> str:
946 params_str = ''
947 if params and len(params) > 0:
948 start = True
949 for key in params:
950 value = params.get(key, None)
951 if value is not None:
952 if start:
953 params_str += '--set '
954 start = False
955 else:
956 params_str += ','
957 params_str += '{}={}'.format(key, value)
958 return params_str
959
960 @staticmethod
961 def _output_to_lines(output: str) -> list:
962 output_lines = list()
963 lines = output.splitlines(keepends=False)
964 for line in lines:
965 line = line.strip()
966 if len(line) > 0:
967 output_lines.append(line)
968 return output_lines
969
970 @staticmethod
971 def _output_to_table(output: str) -> list:
972 output_table = list()
973 lines = output.splitlines(keepends=False)
974 for line in lines:
975 line = line.replace('\t', ' ')
976 line_list = list()
977 output_table.append(line_list)
978 cells = line.split(sep=' ')
979 for cell in cells:
980 cell = cell.strip()
981 if len(cell) > 0:
982 line_list.append(cell)
983 return output_table
984
985 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
986 """
987 Returns kube and helm directories
988
989 :param cluster_name:
990 :param create_if_not_exist:
991 :return: kube, helm directories, config filename and cluster dir.
992 Raises exception if not exist and cannot create
993 """
994
995 base = self.fs.path
996 if base.endswith("/") or base.endswith("\\"):
997 base = base[:-1]
998
999 # base dir for cluster
1000 cluster_dir = base + '/' + cluster_name
1001 if create_if_not_exist and not os.path.exists(cluster_dir):
1002 self.debug('Creating dir {}'.format(cluster_dir))
1003 os.makedirs(cluster_dir)
1004 if not os.path.exists(cluster_dir):
1005 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1006 self.error(msg)
1007 raise K8sException(msg)
1008
1009 # kube dir
1010 kube_dir = cluster_dir + '/' + '.kube'
1011 if create_if_not_exist and not os.path.exists(kube_dir):
1012 self.debug('Creating dir {}'.format(kube_dir))
1013 os.makedirs(kube_dir)
1014 if not os.path.exists(kube_dir):
1015 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1016 self.error(msg)
1017 raise K8sException(msg)
1018
1019 # helm home dir
1020 helm_dir = cluster_dir + '/' + '.helm'
1021 if create_if_not_exist and not os.path.exists(helm_dir):
1022 self.debug('Creating dir {}'.format(helm_dir))
1023 os.makedirs(helm_dir)
1024 if not os.path.exists(helm_dir):
1025 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1026 self.error(msg)
1027 raise K8sException(msg)
1028
1029 config_filename = kube_dir + '/config'
1030 return kube_dir, helm_dir, config_filename, cluster_dir
1031
1032 @staticmethod
1033 def _remove_multiple_spaces(str):
1034 str = str.strip()
1035 while ' ' in str:
1036 str = str.replace(' ', ' ')
1037 return str
1038
1039 def _local_exec(
1040 self,
1041 command: str
1042 ) -> (str, int):
1043 command = K8sHelmConnector._remove_multiple_spaces(command)
1044 self.debug('Executing sync local command: {}'.format(command))
1045 # raise exception if fails
1046 output = ''
1047 try:
1048 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1049 return_code = 0
1050 self.debug(output)
1051 except Exception as e:
1052 return_code = 1
1053
1054 return output, return_code
1055
1056 async def _local_async_exec(
1057 self,
1058 command: str,
1059 raise_exception_on_error: bool = False,
1060 show_error_log: bool = True,
1061 encode_utf8: bool = False
1062 ) -> (str, int):
1063
1064 command = K8sHelmConnector._remove_multiple_spaces(command)
1065 self.debug('Executing async local command: {}'.format(command))
1066
1067 # split command
1068 command = command.split(sep=' ')
1069
1070 try:
1071 process = await asyncio.create_subprocess_exec(
1072 *command,
1073 stdout=asyncio.subprocess.PIPE,
1074 stderr=asyncio.subprocess.PIPE
1075 )
1076
1077 # wait for command terminate
1078 stdout, stderr = await process.communicate()
1079
1080 return_code = process.returncode
1081
1082 output = ''
1083 if stdout:
1084 output = stdout.decode('utf-8').strip()
1085 # output = stdout.decode()
1086 if stderr:
1087 output = stderr.decode('utf-8').strip()
1088 # output = stderr.decode()
1089
1090 if return_code != 0 and show_error_log:
1091 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1092 else:
1093 self.debug('Return code: {}'.format(return_code))
1094
1095 if raise_exception_on_error and return_code != 0:
1096 raise Exception(output)
1097
1098 if encode_utf8:
1099 output = output.encode('utf-8').strip()
1100 output = str(output).replace('\\n', '\n')
1101
1102 return output, return_code
1103
1104 except Exception as e:
1105 msg = 'Exception executing command: {} -> {}'.format(command, e)
1106 if show_error_log:
1107 self.error(msg)
1108 return '', -1
1109
1110 def _remote_exec(
1111 self,
1112 hostname: str,
1113 username: str,
1114 password: str,
1115 command: str,
1116 timeout: int = 10
1117 ) -> (str, int):
1118
1119 command = K8sHelmConnector._remove_multiple_spaces(command)
1120 self.debug('Executing sync remote ssh command: {}'.format(command))
1121
1122 ssh = paramiko.SSHClient()
1123 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1124 ssh.connect(hostname=hostname, username=username, password=password)
1125 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1126 output = ssh_stdout.read().decode('utf-8')
1127 error = ssh_stderr.read().decode('utf-8')
1128 if error:
1129 self.error('ERROR: {}'.format(error))
1130 return_code = 1
1131 else:
1132 return_code = 0
1133 output = output.replace('\\n', '\n')
1134 self.debug('OUTPUT: {}'.format(output))
1135
1136 return output, return_code
1137
1138 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1139 self.debug('Checking if file {} exists...'.format(filename))
1140 if os.path.exists(filename):
1141 return True
1142 else:
1143 msg = 'File {} does not exist'.format(filename)
1144 if exception_if_not_exists:
1145 self.error(msg)
1146 raise K8sException(msg)