Generate names for K8s pods when file or url
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33 from n2vc.exceptions import K8sException
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
    def __init__(
            self,
            fs: object,
            db: object,
            kubectl_command: str = '/usr/bin/kubectl',
            helm_command: str = '/usr/bin/helm',
            log: object = None,
            on_update_db=None
    ):
        """
        Initialize the Helm connector: verify the tool binaries exist and
        schedule a client-only `helm init` in the background.

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :raises K8sException: if the kubectl or helm binary is not found
        """

        # parent class
        K8sConnector.__init__(
            self,
            db=db,
            log=log,
            on_update_db=on_update_db
        )

        self.info('Initializing K8S Helm connector')

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        # NOTE(review): the command is scheduled as a fire-and-forget task on
        # the running event loop; the try/except below only catches scheduling
        # errors, not failures of the command itself (those are ignored via
        # raise_exception_on_error=False)
        self.debug('Initializing helm client-only...')
        command = '{} init --client-only'.format(self._helm_command)
        try:
            asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
        except Exception as e:
            self.warning(msg='helm init failed (it was already initialized): {}'.format(e))

        self.info('K8S Helm connector initialized')
98
99 async def init_env(
100 self,
101 k8s_creds: str,
102 namespace: str = 'kube-system',
103 reuse_cluster_uuid=None
104 ) -> (str, bool):
105
106 cluster_uuid = reuse_cluster_uuid
107 if not cluster_uuid:
108 cluster_uuid = str(uuid4())
109
110 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
111
112 # create config filename
113 kube_dir, helm_dir, config_filename, cluster_dir = \
114 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
115 f = open(config_filename, "w")
116 f.write(k8s_creds)
117 f.close()
118
119 # check if tiller pod is up in cluster
120 command = '{} --kubeconfig={} --namespace={} get deployments'\
121 .format(self.kubectl_command, config_filename, namespace)
122 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
123
124 output_table = K8sHelmConnector._output_to_table(output=output)
125
126 # find 'tiller' pod in all pods
127 already_initialized = False
128 try:
129 for row in output_table:
130 if row[0].startswith('tiller-deploy'):
131 already_initialized = True
132 break
133 except Exception as e:
134 pass
135
136 # helm init
137 n2vc_installed_sw = False
138 if not already_initialized:
139 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
140 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
141 .format(self._helm_command, config_filename, namespace, helm_dir)
142 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
143 n2vc_installed_sw = True
144 else:
145 # check client helm installation
146 check_file = helm_dir + '/repository/repositories.yaml'
147 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
148 self.info('Initializing helm in client: {}'.format(cluster_uuid))
149 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
150 .format(self._helm_command, config_filename, namespace, helm_dir)
151 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
152 else:
153 self.info('Helm client already initialized')
154
155 self.info('Cluster initialized {}'.format(cluster_uuid))
156
157 return cluster_uuid, n2vc_installed_sw
158
159 async def repo_add(
160 self,
161 cluster_uuid: str,
162 name: str,
163 url: str,
164 repo_type: str = 'chart'
165 ):
166
167 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
168
169 # config filename
170 kube_dir, helm_dir, config_filename, cluster_dir = \
171 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
172
173 # helm repo update
174 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
175 self.debug('updating repo: {}'.format(command))
176 await self._local_async_exec(command=command, raise_exception_on_error=False)
177
178 # helm repo add name url
179 command = '{} --kubeconfig={} --home={} repo add {} {}'\
180 .format(self._helm_command, config_filename, helm_dir, name, url)
181 self.debug('adding repo: {}'.format(command))
182 await self._local_async_exec(command=command, raise_exception_on_error=True)
183
184 async def repo_list(
185 self,
186 cluster_uuid: str
187 ) -> list:
188 """
189 Get the list of registered repositories
190
191 :return: list of registered repositories: [ (name, url) .... ]
192 """
193
194 self.debug('list repositories for cluster {}'.format(cluster_uuid))
195
196 # config filename
197 kube_dir, helm_dir, config_filename, cluster_dir = \
198 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
199
200 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
201 .format(self._helm_command, config_filename, helm_dir)
202
203 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
204 if output and len(output) > 0:
205 return yaml.load(output, Loader=yaml.SafeLoader)
206 else:
207 return []
208
209 async def repo_remove(
210 self,
211 cluster_uuid: str,
212 name: str
213 ):
214 """
215 Remove a repository from OSM
216
217 :param cluster_uuid: the cluster
218 :param name: repo name in OSM
219 :return: True if successful
220 """
221
222 self.debug('list repositories for cluster {}'.format(cluster_uuid))
223
224 # config filename
225 kube_dir, helm_dir, config_filename, cluster_dir = \
226 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
227
228 command = '{} --kubeconfig={} --home={} repo remove {}'\
229 .format(self._helm_command, config_filename, helm_dir, name)
230
231 await self._local_async_exec(command=command, raise_exception_on_error=True)
232
    async def reset(
            self,
            cluster_uuid: str,
            force: bool = False,
            uninstall_sw: bool = False
    ) -> bool:
        """
        Destroy the local environment of a cluster, optionally uninstalling
        its releases and the tiller deployment.

        :param cluster_uuid: the cluster to reset
        :param force: uninstall existing releases first; if False and releases
               exist, K8sException is raised
        :param uninstall_sw: also remove tiller from the remote cluster
        :return: True on success
        :raises K8sException: when releases exist and force is False
        """

        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

        # get kube and helm directories
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get('Name')
                        chart = r.get('Chart')
                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                        await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                    except Exception as e:
                        # best effort: log and continue with remaining releases
                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
            else:
                msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                    .format(cluster_uuid)
                self.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

            # find namespace for tiller pod
            command = '{} --kubeconfig={} get deployments --all-namespaces'\
                .format(self.kubectl_command, config_filename)
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
            output_table = K8sHelmConnector._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    # column 0 = namespace, column 1 = deployment name
                    if 'tiller-deploy' in r[1]:
                        namespace = r[0]
                        break
                except Exception as e:
                    # short rows (blank lines) are skipped
                    pass
            else:
                # for/else: runs only when the loop did NOT break, i.e. no
                # tiller deployment was found
                msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
                self.error(msg)

            self.debug('namespace for tiller: {}'.format(namespace))

            force_str = '--force'

            if namespace:
                # delete tiller deployment
                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
                command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                    .format(self.kubectl_command, namespace, config_filename, force_str)
                await self._local_async_exec(command=command, raise_exception_on_error=False)

                # uninstall tiller from cluster
                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
                command = '{} --kubeconfig={} --home={} reset'\
                    .format(self._helm_command, config_filename, helm_dir)
                self.debug('resetting: {}'.format(command))
                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            else:
                self.debug('namespace not found')

        # delete cluster directory
        dir = self.fs.path + '/' + cluster_uuid
        self.debug('Removing directory {}'.format(dir))
        shutil.rmtree(dir, ignore_errors=True)

        return True
311
    async def install(
            self,
            cluster_uuid: str,
            kdu_model: str,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None
    ):
        """
        Deploy a chart in the cluster under a freshly generated release name.

        :param cluster_uuid: target cluster
        :param kdu_model: chart reference, optionally suffixed ':<version>'
        :param atomic: pass --atomic to helm and track status while installing
        :param timeout: helm --timeout value
        :param params: chart values, written to a temporal values file
        :param db_dict: target for periodic status updates
        :return: the generated kdu_instance (release name)
        :raises K8sException: if the helm command fails
        """

        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'

        # version: a trailing ':<version>' selects a specific chart version
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        # NOTE(review): this loop relies on _status_kdu returning None (not
        # raising) when the candidate name is unused — verify if the error
        # handling in _local_async_exec is ever changed
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except Exception as e:
                kdu_instance = None

        # helm repo install
        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='install',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise K8sException(msg)

        self.debug('Returning kdu_instance {}'.format(kdu_instance))
        return kdu_instance
420
421 async def instances_list(
422 self,
423 cluster_uuid: str
424 ) -> list:
425 """
426 returns a list of deployed releases in a cluster
427
428 :param cluster_uuid: the cluster
429 :return:
430 """
431
432 self.debug('list releases for cluster {}'.format(cluster_uuid))
433
434 # config filename
435 kube_dir, helm_dir, config_filename, cluster_dir = \
436 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
437
438 command = '{} --kubeconfig={} --home={} list --output yaml'\
439 .format(self._helm_command, config_filename, helm_dir)
440
441 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
442
443 if output and len(output) > 0:
444 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
445 else:
446 return []
447
    async def upgrade(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            kdu_model: str = None,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None
    ):
        """
        Upgrade a deployed release, optionally to a new chart/version.

        :param cluster_uuid: target cluster
        :param kdu_instance: release to upgrade
        :param kdu_model: chart reference, optionally suffixed ':<version>'
        :param atomic: pass --atomic to helm and track status while upgrading
        :param timeout: helm --timeout value
        :param params: chart values, written to a temporal values file
        :param db_dict: target for periodic status updates
        :return: new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the helm command fails
        """

        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'

        # version: a trailing ':<version>' selects a specific chart version
        version_str = ''
        if kdu_model and ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('upgrading: {}'.format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='upgrade',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
547
    async def rollback(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            revision=0,
            db_dict: dict = None
    ):
        """
        Roll back a release to a given revision (helm rollback --wait).

        :param cluster_uuid: target cluster
        :param kdu_instance: release to roll back
        :param revision: revision to roll back to (0 = previous revision,
               per helm semantics)
        :param db_dict: target for periodic status updates
        :return: new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the helm command fails
        """

        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
                   .format(kdu_instance, revision, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation='rollback',
                run_once=False
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
612
613 async def uninstall(
614 self,
615 cluster_uuid: str,
616 kdu_instance: str
617 ):
618 """
619 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
620 after all _terminate-config-primitive_ of the VNF are invoked).
621
622 :param cluster_uuid: UUID of a K8s cluster known by OSM
623 :param kdu_instance: unique name for the KDU instance to be deleted
624 :return: True if successful
625 """
626
627 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
628
629 # config filename
630 kube_dir, helm_dir, config_filename, cluster_dir = \
631 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
632
633 command = '{} --kubeconfig={} --home={} delete --purge {}'\
634 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
635
636 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
637
638 return self._output_to_table(output)
639
640 async def inspect_kdu(
641 self,
642 kdu_model: str,
643 repo_url: str = None
644 ) -> str:
645
646 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
647
648 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
649
650 async def values_kdu(
651 self,
652 kdu_model: str,
653 repo_url: str = None
654 ) -> str:
655
656 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
657
658 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
659
660 async def help_kdu(
661 self,
662 kdu_model: str,
663 repo_url: str = None
664 ) -> str:
665
666 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
667
668 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
669
670 async def status_kdu(
671 self,
672 cluster_uuid: str,
673 kdu_instance: str
674 ) -> str:
675
676 # call internal function
677 return await self._status_kdu(
678 cluster_uuid=cluster_uuid,
679 kdu_instance=kdu_instance,
680 show_error_log=True,
681 return_text=True
682 )
683
684 """
685 ##################################################################################################
686 ########################################## P R I V A T E #########################################
687 ##################################################################################################
688 """
689
690 async def _exec_inspect_comand(
691 self,
692 inspect_command: str,
693 kdu_model: str,
694 repo_url: str = None
695 ):
696
697 repo_str = ''
698 if repo_url:
699 repo_str = ' --repo {}'.format(repo_url)
700 idx = kdu_model.find('/')
701 if idx >= 0:
702 idx += 1
703 kdu_model = kdu_model[idx:]
704
705 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
706 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
707
708 return output
709
    async def _status_kdu(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            show_error_log: bool = False,
            return_text: bool = False
    ):
        """
        Run `helm status` for a release and return its raw or parsed output.

        :param cluster_uuid: cluster of the instance
        :param kdu_instance: release name
        :param show_error_log: log failures as errors (kept False while
               probing candidate names in install())
        :param return_text: return the raw text instead of a parsed dict
        :return: status text, a parsed dict, or None when the command failed
        """

        self.debug('status of kdu_instance {}'.format(kdu_instance))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} --kubeconfig={} --home={} status {} --output yaml'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance)

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log
        )

        if return_text:
            return str(output)

        # NOTE(review): a failed command reaches here as rc == -1 because
        # _local_async_exec swallows its own raise and returns ('', -1);
        # this check is therefore load-bearing, not dead code
        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get('info').get('status')['notes']
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get('info').get('status').get('resources'))
            resource_table = self._output_to_table(resources)
            data.get('info').get('status')['resources'] = resource_table
        except Exception as e:
            # resources may be missing or not text: leave the field as-is
            pass

        return data
756
757 async def get_instance_info(
758 self,
759 cluster_uuid: str,
760 kdu_instance: str
761 ):
762 instances = await self.instances_list(cluster_uuid=cluster_uuid)
763 for instance in instances:
764 if instance.get('Name') == kdu_instance:
765 return instance
766 self.debug('Instance {} not found'.format(kdu_instance))
767 return None
768
769 @staticmethod
770 def _generate_release_name(
771 chart_name: str
772 ):
773 # check embeded chart (file or dir)
774 if chart_name.startswith('/'):
775 # extract file or directory name
776 chart_name = chart_name[chart_name.rfind('/')+1:]
777 # check URL
778 elif '://' in chart_name:
779 # extract last portion of URL
780 chart_name = chart_name[chart_name.rfind('/')+1:]
781
782 name = ''
783 for c in chart_name:
784 if c.isalpha() or c.isnumeric():
785 name += c
786 else:
787 name += '-'
788 if len(name) > 35:
789 name = name[0:35]
790
791 # if does not start with alpha character, prefix 'a'
792 if not name[0].isalpha():
793 name = 'a' + name
794
795 name += '-'
796
797 def get_random_number():
798 r = random.randrange(start=1, stop=99999999)
799 s = str(r)
800 s = s.rjust(10, '0')
801 return s
802
803 name = name + get_random_number()
804 return name.lower()
805
    async def _store_status(
            self,
            cluster_uuid: str,
            operation: str,
            kdu_instance: str,
            check_every: float = 10,
            db_dict: dict = None,
            run_once: bool = False
    ):
        """
        Periodically query the status of a release and write it to the db.

        Loops forever, sleeping *check_every* seconds per iteration, until
        cancelled — unless run_once is True (single pass, used for the final
        status write with check_every=0).

        :param cluster_uuid: cluster of the instance
        :param operation: operation tag written with the status ('install'...)
        :param kdu_instance: release being tracked
        :param check_every: seconds to sleep before each query
        :param db_dict: database target passed to write_app_status_to_db
        :param run_once: perform a single iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                # NOTE(review): status_kdu is called with return_text=True and
                # returns a str, but .get() here expects a dict — verify
                status = detailed_status.get('info').get('Description')
                print('=' * 60)
                self.debug('STATUS:\n{}'.format(status))
                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                print('=' * 60)
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation)
                if not result:
                    self.info('Error writing in database. Task exiting...')
                    return
            except asyncio.CancelledError:
                self.debug('Task cancelled')
                return
            except Exception as e:
                # best effort: a failed query/write is retried next iteration
                pass
            finally:
                # the finally clause runs on every path, so run_once exits
                # even when the iteration raised
                if run_once:
                    return
841
842 async def _is_install_completed(
843 self,
844 cluster_uuid: str,
845 kdu_instance: str
846 ) -> bool:
847
848 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
849
850 # extract info.status.resources-> str
851 # format:
852 # ==> v1/Deployment
853 # NAME READY UP-TO-DATE AVAILABLE AGE
854 # halting-horse-mongodb 0/1 1 0 0s
855 # halting-petit-mongodb 1/1 1 0 0s
856 # blank line
857 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
858
859 # convert to table
860 resources = K8sHelmConnector._output_to_table(resources)
861
862 num_lines = len(resources)
863 index = 0
864 while index < num_lines:
865 try:
866 line1 = resources[index]
867 index += 1
868 # find '==>' in column 0
869 if line1[0] == '==>':
870 line2 = resources[index]
871 index += 1
872 # find READY in column 1
873 if line2[1] == 'READY':
874 # read next lines
875 line3 = resources[index]
876 index += 1
877 while len(line3) > 1 and index < num_lines:
878 ready_value = line3[1]
879 parts = ready_value.split(sep='/')
880 current = int(parts[0])
881 total = int(parts[1])
882 if current < total:
883 self.debug('NOT READY:\n {}'.format(line3))
884 ready = False
885 line3 = resources[index]
886 index += 1
887
888 except Exception as e:
889 pass
890
891 return ready
892
893 @staticmethod
894 def _get_deep(dictionary: dict, members: tuple):
895 target = dictionary
896 value = None
897 try:
898 for m in members:
899 value = target.get(m)
900 if not value:
901 return None
902 else:
903 target = value
904 except Exception as e:
905 pass
906 return value
907
908 # find key:value in several lines
909 @staticmethod
910 def _find_in_lines(p_lines: list, p_key: str) -> str:
911 for line in p_lines:
912 try:
913 if line.startswith(p_key + ':'):
914 parts = line.split(':')
915 the_value = parts[1].strip()
916 return the_value
917 except Exception as e:
918 # ignore it
919 pass
920 return None
921
922 # params for use in -f file
923 # returns values file option and filename (in order to delete it at the end)
924 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
925
926 if params and len(params) > 0:
927 kube_dir, helm_dir, config_filename, cluster_dir = \
928 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
929
930 def get_random_number():
931 r = random.randrange(start=1, stop=99999999)
932 s = str(r)
933 while len(s) < 10:
934 s = '0' + s
935 return s
936
937 params2 = dict()
938 for key in params:
939 value = params.get(key)
940 if '!!yaml' in str(value):
941 value = yaml.load(value[7:])
942 params2[key] = value
943
944 values_file = get_random_number() + '.yaml'
945 with open(values_file, 'w') as stream:
946 yaml.dump(params2, stream, indent=4, default_flow_style=False)
947
948 return '-f {}'.format(values_file), values_file
949
950 return '', None
951
952 # params for use in --set option
953 @staticmethod
954 def _params_to_set_option(params: dict) -> str:
955 params_str = ''
956 if params and len(params) > 0:
957 start = True
958 for key in params:
959 value = params.get(key, None)
960 if value is not None:
961 if start:
962 params_str += '--set '
963 start = False
964 else:
965 params_str += ','
966 params_str += '{}={}'.format(key, value)
967 return params_str
968
969 @staticmethod
970 def _output_to_lines(output: str) -> list:
971 output_lines = list()
972 lines = output.splitlines(keepends=False)
973 for line in lines:
974 line = line.strip()
975 if len(line) > 0:
976 output_lines.append(line)
977 return output_lines
978
979 @staticmethod
980 def _output_to_table(output: str) -> list:
981 output_table = list()
982 lines = output.splitlines(keepends=False)
983 for line in lines:
984 line = line.replace('\t', ' ')
985 line_list = list()
986 output_table.append(line_list)
987 cells = line.split(sep=' ')
988 for cell in cells:
989 cell = cell.strip()
990 if len(cell) > 0:
991 line_list.append(cell)
992 return output_table
993
994 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
995 """
996 Returns kube and helm directories
997
998 :param cluster_name:
999 :param create_if_not_exist:
1000 :return: kube, helm directories, config filename and cluster dir.
1001 Raises exception if not exist and cannot create
1002 """
1003
1004 base = self.fs.path
1005 if base.endswith("/") or base.endswith("\\"):
1006 base = base[:-1]
1007
1008 # base dir for cluster
1009 cluster_dir = base + '/' + cluster_name
1010 if create_if_not_exist and not os.path.exists(cluster_dir):
1011 self.debug('Creating dir {}'.format(cluster_dir))
1012 os.makedirs(cluster_dir)
1013 if not os.path.exists(cluster_dir):
1014 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1015 self.error(msg)
1016 raise K8sException(msg)
1017
1018 # kube dir
1019 kube_dir = cluster_dir + '/' + '.kube'
1020 if create_if_not_exist and not os.path.exists(kube_dir):
1021 self.debug('Creating dir {}'.format(kube_dir))
1022 os.makedirs(kube_dir)
1023 if not os.path.exists(kube_dir):
1024 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1025 self.error(msg)
1026 raise K8sException(msg)
1027
1028 # helm home dir
1029 helm_dir = cluster_dir + '/' + '.helm'
1030 if create_if_not_exist and not os.path.exists(helm_dir):
1031 self.debug('Creating dir {}'.format(helm_dir))
1032 os.makedirs(helm_dir)
1033 if not os.path.exists(helm_dir):
1034 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1035 self.error(msg)
1036 raise K8sException(msg)
1037
1038 config_filename = kube_dir + '/config'
1039 return kube_dir, helm_dir, config_filename, cluster_dir
1040
1041 @staticmethod
1042 def _remove_multiple_spaces(str):
1043 str = str.strip()
1044 while ' ' in str:
1045 str = str.replace(' ', ' ')
1046 return str
1047
    def _local_exec(
            self,
            command: str
    ) -> (str, int):
        """
        Execute a command synchronously on the local host.

        :param command: full command line to run
        :return: (output, return_code); return_code is 1 on any failure
        """
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.debug('Executing sync local command: {}'.format(command))
        # raise exception if fails
        output = ''
        try:
            # NOTE(review): shell=True — 'command' must never contain
            # untrusted input
            output = subprocess.check_output(command, shell=True, universal_newlines=True)
            return_code = 0
            self.debug(output)
        except Exception as e:
            # any failure (including non-zero exit) maps to return code 1
            return_code = 1

        return output, return_code
1064
1065 async def _local_async_exec(
1066 self,
1067 command: str,
1068 raise_exception_on_error: bool = False,
1069 show_error_log: bool = True,
1070 encode_utf8: bool = False
1071 ) -> (str, int):
1072
1073 command = K8sHelmConnector._remove_multiple_spaces(command)
1074 self.debug('Executing async local command: {}'.format(command))
1075
1076 # split command
1077 command = command.split(sep=' ')
1078
1079 try:
1080 process = await asyncio.create_subprocess_exec(
1081 *command,
1082 stdout=asyncio.subprocess.PIPE,
1083 stderr=asyncio.subprocess.PIPE
1084 )
1085
1086 # wait for command terminate
1087 stdout, stderr = await process.communicate()
1088
1089 return_code = process.returncode
1090
1091 output = ''
1092 if stdout:
1093 output = stdout.decode('utf-8').strip()
1094 # output = stdout.decode()
1095 if stderr:
1096 output = stderr.decode('utf-8').strip()
1097 # output = stderr.decode()
1098
1099 if return_code != 0 and show_error_log:
1100 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1101 else:
1102 self.debug('Return code: {}'.format(return_code))
1103
1104 if raise_exception_on_error and return_code != 0:
1105 raise Exception(output)
1106
1107 if encode_utf8:
1108 output = output.encode('utf-8').strip()
1109 output = str(output).replace('\\n', '\n')
1110
1111 return output, return_code
1112
1113 except Exception as e:
1114 msg = 'Exception executing command: {} -> {}'.format(command, e)
1115 if show_error_log:
1116 self.error(msg)
1117 return '', -1
1118
1119 def _remote_exec(
1120 self,
1121 hostname: str,
1122 username: str,
1123 password: str,
1124 command: str,
1125 timeout: int = 10
1126 ) -> (str, int):
1127
1128 command = K8sHelmConnector._remove_multiple_spaces(command)
1129 self.debug('Executing sync remote ssh command: {}'.format(command))
1130
1131 ssh = paramiko.SSHClient()
1132 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1133 ssh.connect(hostname=hostname, username=username, password=password)
1134 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1135 output = ssh_stdout.read().decode('utf-8')
1136 error = ssh_stderr.read().decode('utf-8')
1137 if error:
1138 self.error('ERROR: {}'.format(error))
1139 return_code = 1
1140 else:
1141 return_code = 0
1142 output = output.replace('\\n', '\n')
1143 self.debug('OUTPUT: {}'.format(output))
1144
1145 return output, return_code
1146
1147 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1148 self.debug('Checking if file {} exists...'.format(filename))
1149 if os.path.exists(filename):
1150 return True
1151 else:
1152 msg = 'File {} does not exist'.format(filename)
1153 if exception_if_not_exists:
1154 self.error(msg)
1155 raise K8sException(msg)