[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33 from n2vc.exceptions import K8sException
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = '/usr/bin/kubectl',
49 helm_command: str = '/usr/bin/helm',
50 log: object = None,
51 on_update_db=None
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(
65 self,
66 db=db,
67 log=log,
68 on_update_db=on_update_db
69 )
70
71 self.info('Initializing K8S Helm connector')
72
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 # initialize helm client-only
88 self.debug('Initializing helm client-only...')
89 command = '{} init --client-only'.format(self._helm_command)
90 try:
91 asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
92 # loop = asyncio.get_event_loop()
93 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
94 except Exception as e:
self.warning(msg='helm init failed (it may be already initialized): {}'.format(e))
96
97 self.info('K8S Helm connector initialized')
98
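# Illustrative construction sketch; assumes `fs` exposes a local `path` attribute and
# `db` implements the database interface expected by K8sConnector:
#
#     import logging
#
#     connector = K8sHelmConnector(
#         fs=fs,
#         db=db,
#         kubectl_command='/usr/bin/kubectl',
#         helm_command='/usr/bin/helm',
#         log=logging.getLogger('k8s_helm')
#     )
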
99 async def init_env(
100 self,
101 k8s_creds: str,
102 namespace: str = 'kube-system',
103 reuse_cluster_uuid=None
104 ) -> (str, bool):
105 """
106 It prepares a given K8s cluster environment to run Charts on both sides:
107 client (OSM)
108 server (Tiller)
109
110 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
111 :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
114 (on error, an exception will be raised)
115 """
116
117 cluster_uuid = reuse_cluster_uuid
118 if not cluster_uuid:
119 cluster_uuid = str(uuid4())
120
121 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
122
123 # create config filename
124 kube_dir, helm_dir, config_filename, cluster_dir = \
125 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
with open(config_filename, 'w') as f:
    f.write(k8s_creds)
129
130 # check if tiller pod is up in cluster
131 command = '{} --kubeconfig={} --namespace={} get deployments'\
132 .format(self.kubectl_command, config_filename, namespace)
133 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
134
135 output_table = K8sHelmConnector._output_to_table(output=output)
136
137 # find 'tiller' pod in all pods
138 already_initialized = False
139 try:
140 for row in output_table:
141 if row[0].startswith('tiller-deploy'):
142 already_initialized = True
143 break
144 except Exception as e:
145 pass
146
147 # helm init
148 n2vc_installed_sw = False
149 if not already_initialized:
150 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
151 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
152 .format(self._helm_command, config_filename, namespace, helm_dir)
153 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
154 n2vc_installed_sw = True
155 else:
156 # check client helm installation
157 check_file = helm_dir + '/repository/repositories.yaml'
158 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
159 self.info('Initializing helm in client: {}'.format(cluster_uuid))
160 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
161 .format(self._helm_command, config_filename, namespace, helm_dir)
162 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
163 else:
164 self.info('Helm client already initialized')
165
166 self.info('Cluster initialized {}'.format(cluster_uuid))
167
168 return cluster_uuid, n2vc_installed_sw
169
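# Illustrative sketch; `kubeconfig_text` is assumed to hold the contents of a valid
# '.kube/config' file for the target cluster:
#
#     cluster_uuid, installed_sw = await connector.init_env(
#         k8s_creds=kubeconfig_text,
#         namespace='kube-system'
#     )
#     # installed_sw is True only if this call deployed Tiller in the cluster
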
170 async def repo_add(
171 self,
172 cluster_uuid: str,
173 name: str,
174 url: str,
175 repo_type: str = 'chart'
176 ):
177
178 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
179
180 # config filename
181 kube_dir, helm_dir, config_filename, cluster_dir = \
182 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
183
184 # helm repo update
185 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
186 self.debug('updating repo: {}'.format(command))
187 await self._local_async_exec(command=command, raise_exception_on_error=False)
188
189 # helm repo add name url
190 command = '{} --kubeconfig={} --home={} repo add {} {}'\
191 .format(self._helm_command, config_filename, helm_dir, name, url)
192 self.debug('adding repo: {}'.format(command))
193 await self._local_async_exec(command=command, raise_exception_on_error=True)
194
195 async def repo_list(
196 self,
197 cluster_uuid: str
198 ) -> list:
199 """
200 Get the list of registered repositories
201
202 :return: list of registered repositories: [ (name, url) .... ]
203 """
204
205 self.debug('list repositories for cluster {}'.format(cluster_uuid))
206
207 # config filename
208 kube_dir, helm_dir, config_filename, cluster_dir = \
209 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
210
211 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
212 .format(self._helm_command, config_filename, helm_dir)
213
214 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
215 if output and len(output) > 0:
216 return yaml.load(output, Loader=yaml.SafeLoader)
217 else:
218 return []
219
220 async def repo_remove(
221 self,
222 cluster_uuid: str,
223 name: str
224 ):
225 """
226 Remove a repository from OSM
227
228 :param cluster_uuid: the cluster
229 :param name: repo name in OSM
230 :return: True if successful
231 """
232
self.debug('removing repository {} from cluster {}'.format(name, cluster_uuid))
234
235 # config filename
236 kube_dir, helm_dir, config_filename, cluster_dir = \
237 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
238
239 command = '{} --kubeconfig={} --home={} repo remove {}'\
240 .format(self._helm_command, config_filename, helm_dir, name)
241
242 await self._local_async_exec(command=command, raise_exception_on_error=True)
243
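# Illustrative repository workflow; the repository name and URL are examples only:
#
#     await connector.repo_add(
#         cluster_uuid=cluster_uuid,
#         name='stable',
#         url='https://kubernetes-charts.storage.googleapis.com/'
#     )
#     repos = await connector.repo_list(cluster_uuid=cluster_uuid)
#     await connector.repo_remove(cluster_uuid=cluster_uuid, name='stable')
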
244 async def reset(
245 self,
246 cluster_uuid: str,
247 force: bool = False,
248 uninstall_sw: bool = False
249 ) -> bool:
250
251 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
252
253 # get kube and helm directories
254 kube_dir, helm_dir, config_filename, cluster_dir = \
255 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
256
257 # uninstall releases if needed
258 releases = await self.instances_list(cluster_uuid=cluster_uuid)
259 if len(releases) > 0:
260 if force:
261 for r in releases:
262 try:
263 kdu_instance = r.get('Name')
264 chart = r.get('Chart')
265 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
266 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
267 except Exception as e:
268 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
269 else:
msg = 'Cluster has releases and force is not enabled. Cannot reset K8s environment. Cluster uuid: {}'\
    .format(cluster_uuid)
272 self.error(msg)
273 raise K8sException(msg)
274
275 if uninstall_sw:
276
277 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
278
279 # find namespace for tiller pod
280 command = '{} --kubeconfig={} get deployments --all-namespaces'\
281 .format(self.kubectl_command, config_filename)
282 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
283 output_table = K8sHelmConnector._output_to_table(output=output)
284 namespace = None
285 for r in output_table:
286 try:
287 if 'tiller-deploy' in r[1]:
288 namespace = r[0]
289 break
290 except Exception as e:
291 pass
292 else:
293 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
294 self.error(msg)
295
296 self.debug('namespace for tiller: {}'.format(namespace))
297
298 force_str = '--force'
299
300 if namespace:
301 # delete tiller deployment
302 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
303 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
304 .format(self.kubectl_command, namespace, config_filename, force_str)
305 await self._local_async_exec(command=command, raise_exception_on_error=False)
306
307 # uninstall tiller from cluster
308 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
309 command = '{} --kubeconfig={} --home={} reset'\
310 .format(self._helm_command, config_filename, helm_dir)
311 self.debug('resetting: {}'.format(command))
312 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
313 else:
314 self.debug('namespace not found')
315
316 # delete cluster directory
cluster_folder = self.fs.path + '/' + cluster_uuid
self.debug('Removing directory {}'.format(cluster_folder))
shutil.rmtree(cluster_folder, ignore_errors=True)
320
321 return True
322
323 async def install(
324 self,
325 cluster_uuid: str,
326 kdu_model: str,
327 atomic: bool = True,
328 timeout: float = 300,
329 params: dict = None,
330 db_dict: dict = None
331 ):
332
333 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
334
335 # config filename
336 kube_dir, helm_dir, config_filename, cluster_dir = \
337 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
338
339 # params to str
340 # params_str = K8sHelmConnector._params_to_set_option(params)
341 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
342
343 timeout_str = ''
344 if timeout:
345 timeout_str = '--timeout {}'.format(timeout)
346
347 # atomic
348 atomic_str = ''
349 if atomic:
350 atomic_str = '--atomic'
351
352 # version
353 version_str = ''
354 if ':' in kdu_model:
355 parts = kdu_model.split(sep=':')
356 if len(parts) == 2:
357 version_str = '--version {}'.format(parts[1])
358 kdu_model = parts[0]
359
360 # generate a name for the release. Then, check if already exists
361 kdu_instance = None
362 while kdu_instance is None:
363 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
364 try:
365 result = await self._status_kdu(
366 cluster_uuid=cluster_uuid,
367 kdu_instance=kdu_instance,
368 show_error_log=False
369 )
370 if result is not None:
371 # instance already exists: generate a new one
372 kdu_instance = None
except Exception:
    # helm status failed, so the generated name is not in use: keep it
    pass
375
376 # helm repo install
377 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
378 .format(self._helm_command, atomic_str, config_filename, helm_dir,
379 params_str, timeout_str, kdu_instance, kdu_model, version_str)
380 self.debug('installing: {}'.format(command))
381
382 if atomic:
383 # exec helm in a task
384 exec_task = asyncio.ensure_future(
385 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
386 )
387 # write status in another task
388 status_task = asyncio.ensure_future(
389 coro_or_future=self._store_status(
390 cluster_uuid=cluster_uuid,
391 kdu_instance=kdu_instance,
392 db_dict=db_dict,
393 operation='install',
394 run_once=False
395 )
396 )
397
398 # wait for execution task
399 await asyncio.wait([exec_task])
400
401 # cancel status task
402 status_task.cancel()
403
404 output, rc = exec_task.result()
405
406 else:
407
408 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
409
# remove temporary values yaml file
411 if file_to_delete:
412 os.remove(file_to_delete)
413
414 # write final status
415 await self._store_status(
416 cluster_uuid=cluster_uuid,
417 kdu_instance=kdu_instance,
418 db_dict=db_dict,
419 operation='install',
420 run_once=True,
421 check_every=0
422 )
423
424 if rc != 0:
425 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
426 self.error(msg)
427 raise K8sException(msg)
428
429 self.debug('Returning kdu_instance {}'.format(kdu_instance))
430 return kdu_instance
431
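# Illustrative install sketch; the chart, values and db_dict contents are examples
# only (db_dict keys are assumed to match what write_app_status_to_db expects):
#
#     kdu_instance = await connector.install(
#         cluster_uuid=cluster_uuid,
#         kdu_model='stable/mongodb:7.2.10',      # '<chart>:<version>' pins a version
#         atomic=True,
#         timeout=300,
#         params={'replicaSet.enabled': True},
#         db_dict={'collection': 'nsrs', 'filter': {'_id': '<nsr-id>'}, 'path': 'status'}
#     )
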
432 async def instances_list(
433 self,
434 cluster_uuid: str
435 ) -> list:
436 """
437 returns a list of deployed releases in a cluster
438
439 :param cluster_uuid: the cluster
440 :return:
441 """
442
443 self.debug('list releases for cluster {}'.format(cluster_uuid))
444
445 # config filename
446 kube_dir, helm_dir, config_filename, cluster_dir = \
447 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
448
449 command = '{} --kubeconfig={} --home={} list --output yaml'\
450 .format(self._helm_command, config_filename, helm_dir)
451
452 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
453
454 if output and len(output) > 0:
455 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
456 else:
457 return []
458
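# Sketch of the returned structure; only the keys used elsewhere in this module
# (Name, Chart, Revision) are shown:
#
#     [{'Name': 'stable-mongodb-0000012345', 'Chart': 'mongodb-7.2.10', 'Revision': 1, ...}]
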
459 async def upgrade(
460 self,
461 cluster_uuid: str,
462 kdu_instance: str,
463 kdu_model: str = None,
464 atomic: bool = True,
465 timeout: float = 300,
466 params: dict = None,
467 db_dict: dict = None
468 ):
469
470 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
471
472 # config filename
473 kube_dir, helm_dir, config_filename, cluster_dir = \
474 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
475
476 # params to str
477 # params_str = K8sHelmConnector._params_to_set_option(params)
478 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
479
480 timeout_str = ''
481 if timeout:
482 timeout_str = '--timeout {}'.format(timeout)
483
484 # atomic
485 atomic_str = ''
486 if atomic:
487 atomic_str = '--atomic'
488
489 # version
490 version_str = ''
491 if kdu_model and ':' in kdu_model:
492 parts = kdu_model.split(sep=':')
493 if len(parts) == 2:
494 version_str = '--version {}'.format(parts[1])
495 kdu_model = parts[0]
496
497 # helm repo upgrade
498 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
499 .format(self._helm_command, atomic_str, config_filename, helm_dir,
500 params_str, timeout_str, kdu_instance, kdu_model, version_str)
501 self.debug('upgrading: {}'.format(command))
502
503 if atomic:
504
505 # exec helm in a task
506 exec_task = asyncio.ensure_future(
507 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
508 )
509 # write status in another task
510 status_task = asyncio.ensure_future(
511 coro_or_future=self._store_status(
512 cluster_uuid=cluster_uuid,
513 kdu_instance=kdu_instance,
514 db_dict=db_dict,
515 operation='upgrade',
516 run_once=False
517 )
518 )
519
520 # wait for execution task
521 await asyncio.wait([exec_task])
522
523 # cancel status task
524 status_task.cancel()
525 output, rc = exec_task.result()
526
527 else:
528
529 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
530
# remove temporary values yaml file
532 if file_to_delete:
533 os.remove(file_to_delete)
534
535 # write final status
536 await self._store_status(
537 cluster_uuid=cluster_uuid,
538 kdu_instance=kdu_instance,
539 db_dict=db_dict,
540 operation='upgrade',
541 run_once=True,
542 check_every=0
543 )
544
545 if rc != 0:
546 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
547 self.error(msg)
548 raise K8sException(msg)
549
550 # return new revision number
551 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
552 if instance:
553 revision = int(instance.get('Revision'))
554 self.debug('New revision: {}'.format(revision))
555 return revision
556 else:
557 return 0
558
559 async def rollback(
560 self,
561 cluster_uuid: str,
562 kdu_instance: str,
563 revision=0,
564 db_dict: dict = None
565 ):
566
567 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
568 .format(kdu_instance, revision, cluster_uuid))
569
570 # config filename
571 kube_dir, helm_dir, config_filename, cluster_dir = \
572 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
573
574 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
575 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
576
577 # exec helm in a task
578 exec_task = asyncio.ensure_future(
579 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
580 )
581 # write status in another task
582 status_task = asyncio.ensure_future(
583 coro_or_future=self._store_status(
584 cluster_uuid=cluster_uuid,
585 kdu_instance=kdu_instance,
586 db_dict=db_dict,
587 operation='rollback',
588 run_once=False
589 )
590 )
591
592 # wait for execution task
593 await asyncio.wait([exec_task])
594
595 # cancel status task
596 status_task.cancel()
597
598 output, rc = exec_task.result()
599
600 # write final status
601 await self._store_status(
602 cluster_uuid=cluster_uuid,
603 kdu_instance=kdu_instance,
604 db_dict=db_dict,
605 operation='rollback',
606 run_once=True,
607 check_every=0
608 )
609
610 if rc != 0:
611 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
612 self.error(msg)
613 raise K8sException(msg)
614
615 # return new revision number
616 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
617 if instance:
618 revision = int(instance.get('Revision'))
619 self.debug('New revision: {}'.format(revision))
620 return revision
621 else:
622 return 0
623
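# Illustrative upgrade/rollback sketch; both calls return the new revision number
# (0 if it cannot be read back):
#
#     revision = await connector.upgrade(
#         cluster_uuid=cluster_uuid,
#         kdu_instance=kdu_instance,
#         kdu_model='stable/mongodb:7.3.0',
#         params={'replicaSet.enabled': True}
#     )
#     revision = await connector.rollback(
#         cluster_uuid=cluster_uuid,
#         kdu_instance=kdu_instance,
#         revision=1
#     )
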
624 async def uninstall(
625 self,
626 cluster_uuid: str,
627 kdu_instance: str
628 ):
629 """
Removes an existing KDU instance. It implicitly uses the helm `delete --purge` call
(invoked after all _terminate-config-primitive_ actions of the VNF have been executed).
632
633 :param cluster_uuid: UUID of a K8s cluster known by OSM
634 :param kdu_instance: unique name for the KDU instance to be deleted
635 :return: True if successful
636 """
637
638 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
639
640 # config filename
641 kube_dir, helm_dir, config_filename, cluster_dir = \
642 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
643
644 command = '{} --kubeconfig={} --home={} delete --purge {}'\
645 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
646
647 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
648
649 return self._output_to_table(output)
650
651 async def inspect_kdu(
652 self,
653 kdu_model: str,
654 repo_url: str = None
655 ) -> str:
656
657 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
658
return await self._exec_inspect_command(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
660
661 async def values_kdu(
662 self,
663 kdu_model: str,
664 repo_url: str = None
665 ) -> str:
666
667 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
668
return await self._exec_inspect_command(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
670
671 async def help_kdu(
672 self,
673 kdu_model: str,
674 repo_url: str = None
675 ) -> str:
676
677 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
678
return await self._exec_inspect_command(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
680
681 async def status_kdu(
682 self,
683 cluster_uuid: str,
684 kdu_instance: str
685 ) -> str:
686
687 # call internal function
688 return await self._status_kdu(
689 cluster_uuid=cluster_uuid,
690 kdu_instance=kdu_instance,
691 show_error_log=True,
692 return_text=True
693 )
694
695 """
696 ##################################################################################################
697 ########################################## P R I V A T E #########################################
698 ##################################################################################################
699 """
700
async def _exec_inspect_command(
702 self,
703 inspect_command: str,
704 kdu_model: str,
705 repo_url: str = None
706 ):
707
708 repo_str = ''
709 if repo_url:
710 repo_str = ' --repo {}'.format(repo_url)
711 idx = kdu_model.find('/')
712 if idx >= 0:
713 idx += 1
714 kdu_model = kdu_model[idx:]
715
716 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
717 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
718
719 return output
720
721 async def _status_kdu(
722 self,
723 cluster_uuid: str,
724 kdu_instance: str,
725 show_error_log: bool = False,
726 return_text: bool = False
727 ):
728
729 self.debug('status of kdu_instance {}'.format(kdu_instance))
730
731 # config filename
732 kube_dir, helm_dir, config_filename, cluster_dir = \
733 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
734
735 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
736 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
737
738 output, rc = await self._local_async_exec(
739 command=command,
740 raise_exception_on_error=True,
741 show_error_log=show_error_log
742 )
743
744 if return_text:
745 return str(output)
746
747 if rc != 0:
748 return None
749
750 data = yaml.load(output, Loader=yaml.SafeLoader)
751
752 # remove field 'notes'
753 try:
754 del data.get('info').get('status')['notes']
755 except KeyError:
756 pass
757
758 # parse field 'resources'
759 try:
760 resources = str(data.get('info').get('status').get('resources'))
761 resource_table = self._output_to_table(resources)
762 data.get('info').get('status')['resources'] = resource_table
763 except Exception as e:
764 pass
765
766 return data
767
768 async def get_instance_info(
769 self,
770 cluster_uuid: str,
771 kdu_instance: str
772 ):
773 instances = await self.instances_list(cluster_uuid=cluster_uuid)
774 for instance in instances:
775 if instance.get('Name') == kdu_instance:
776 return instance
777 self.debug('Instance {} not found'.format(kdu_instance))
778 return None
779
780 @staticmethod
781 def _generate_release_name(
782 chart_name: str
783 ):
# check embedded chart (file or dir)
785 if chart_name.startswith('/'):
786 # extract file or directory name
787 chart_name = chart_name[chart_name.rfind('/')+1:]
788 # check URL
789 elif '://' in chart_name:
790 # extract last portion of URL
791 chart_name = chart_name[chart_name.rfind('/')+1:]
792
793 name = ''
794 for c in chart_name:
795 if c.isalpha() or c.isnumeric():
796 name += c
797 else:
798 name += '-'
799 if len(name) > 35:
800 name = name[0:35]
801
# if the name does not start with an alpha character, prefix it with 'a'
803 if not name[0].isalpha():
804 name = 'a' + name
805
806 name += '-'
807
808 def get_random_number():
809 r = random.randrange(start=1, stop=99999999)
810 s = str(r)
811 s = s.rjust(10, '0')
812 return s
813
814 name = name + get_random_number()
815 return name.lower()
816
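# Examples of the mapping performed above (the 10-digit random suffix varies per call):
#     'stable/mongodb'           ->  'stable-mongodb-0000012345'
#     '/tmp/charts/my_chart'     ->  'my-chart-0000012345'
#     'https://repo/chart.tgz'   ->  'chart-tgz-0000012345'
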
817 async def _store_status(
818 self,
819 cluster_uuid: str,
820 operation: str,
821 kdu_instance: str,
822 check_every: float = 10,
823 db_dict: dict = None,
824 run_once: bool = False
825 ):
826 while True:
827 try:
828 await asyncio.sleep(check_every)
detailed_status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
830 status = detailed_status.get('info').get('Description')
831 print('=' * 60)
832 self.debug('STATUS:\n{}'.format(status))
833 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
834 print('=' * 60)
835 # write status to db
836 result = await self.write_app_status_to_db(
837 db_dict=db_dict,
838 status=str(status),
839 detailed_status=str(detailed_status),
840 operation=operation)
841 if not result:
self.info('Error writing to database. Task exiting...')
843 return
844 except asyncio.CancelledError:
845 self.debug('Task cancelled')
846 return
847 except Exception as e:
848 pass
849 finally:
850 if run_once:
851 return
852
853 async def _is_install_completed(
854 self,
855 cluster_uuid: str,
856 kdu_instance: str
857 ) -> bool:
858
859 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
860
861 # extract info.status.resources-> str
862 # format:
863 # ==> v1/Deployment
864 # NAME READY UP-TO-DATE AVAILABLE AGE
865 # halting-horse-mongodb 0/1 1 0 0s
866 # halting-petit-mongodb 1/1 1 0 0s
867 # blank line
868 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
869
870 # convert to table
871 resources = K8sHelmConnector._output_to_table(resources)
872
num_lines = len(resources)
index = 0
ready = True
while index < num_lines:
876 try:
877 line1 = resources[index]
878 index += 1
879 # find '==>' in column 0
880 if line1[0] == '==>':
881 line2 = resources[index]
882 index += 1
883 # find READY in column 1
884 if line2[1] == 'READY':
885 # read next lines
886 line3 = resources[index]
887 index += 1
888 while len(line3) > 1 and index < num_lines:
889 ready_value = line3[1]
890 parts = ready_value.split(sep='/')
891 current = int(parts[0])
892 total = int(parts[1])
893 if current < total:
894 self.debug('NOT READY:\n {}'.format(line3))
895 ready = False
896 line3 = resources[index]
897 index += 1
898
899 except Exception as e:
900 pass
901
902 return ready
903
904 @staticmethod
905 def _get_deep(dictionary: dict, members: tuple):
906 target = dictionary
907 value = None
908 try:
909 for m in members:
910 value = target.get(m)
911 if not value:
912 return None
913 else:
914 target = value
915 except Exception as e:
916 pass
917 return value
918
919 # find key:value in several lines
920 @staticmethod
921 def _find_in_lines(p_lines: list, p_key: str) -> str:
922 for line in p_lines:
923 try:
924 if line.startswith(p_key + ':'):
925 parts = line.split(':')
926 the_value = parts[1].strip()
927 return the_value
928 except Exception as e:
929 # ignore it
930 pass
931 return None
932
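# Example of the lookup performed above:
#     _find_in_lines(['apiVersion: v1', 'kind: Deployment'], 'kind')  ->  'Deployment'
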
933 # params for use in -f file
934 # returns values file option and filename (in order to delete it at the end)
935 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
936
937 if params and len(params) > 0:
938 kube_dir, helm_dir, config_filename, cluster_dir = \
939 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
940
941 def get_random_number():
942 r = random.randrange(start=1, stop=99999999)
943 s = str(r)
944 while len(s) < 10:
945 s = '0' + s
946 return s
947
948 params2 = dict()
949 for key in params:
950 value = params.get(key)
951 if '!!yaml' in str(value):
value = yaml.load(value[7:], Loader=yaml.SafeLoader)
953 params2[key] = value
954
955 values_file = get_random_number() + '.yaml'
956 with open(values_file, 'w') as stream:
957 yaml.dump(params2, stream, indent=4, default_flow_style=False)
958
959 return '-f {}'.format(values_file), values_file
960
961 return '', None
962
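# Example of the conversion performed above: params {'replicas': 2, 'tag': '!!yaml "1.16"'}
# are dumped to a randomly named '<random>.yaml' file ('!!yaml' values are parsed first)
# and the method returns ('-f <random>.yaml', '<random>.yaml').
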
963 # params for use in --set option
964 @staticmethod
965 def _params_to_set_option(params: dict) -> str:
966 params_str = ''
967 if params and len(params) > 0:
968 start = True
969 for key in params:
970 value = params.get(key, None)
971 if value is not None:
972 if start:
973 params_str += '--set '
974 start = False
975 else:
976 params_str += ','
977 params_str += '{}={}'.format(key, value)
978 return params_str
979
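# Example of the rendering performed above:
#     _params_to_set_option({'mongodbRootPassword': 'secret', 'replicas': 2})
#     ->  '--set mongodbRootPassword=secret,replicas=2'
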
980 @staticmethod
981 def _output_to_lines(output: str) -> list:
982 output_lines = list()
983 lines = output.splitlines(keepends=False)
984 for line in lines:
985 line = line.strip()
986 if len(line) > 0:
987 output_lines.append(line)
988 return output_lines
989
990 @staticmethod
991 def _output_to_table(output: str) -> list:
992 output_table = list()
993 lines = output.splitlines(keepends=False)
994 for line in lines:
995 line = line.replace('\t', ' ')
996 line_list = list()
997 output_table.append(line_list)
998 cells = line.split(sep=' ')
999 for cell in cells:
1000 cell = cell.strip()
1001 if len(cell) > 0:
1002 line_list.append(cell)
1003 return output_table
1004
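# Example of the parsing performed above (tabs and repeated spaces act as separators):
#     'NAME  READY\ntiller-deploy  1/1'  ->  [['NAME', 'READY'], ['tiller-deploy', '1/1']]
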
1005 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
1006 """
1007 Returns kube and helm directories
1008
1009 :param cluster_name:
1010 :param create_if_not_exist:
1011 :return: kube, helm directories, config filename and cluster dir.
1012 Raises exception if not exist and cannot create
1013 """
1014
1015 base = self.fs.path
1016 if base.endswith("/") or base.endswith("\\"):
1017 base = base[:-1]
1018
1019 # base dir for cluster
1020 cluster_dir = base + '/' + cluster_name
1021 if create_if_not_exist and not os.path.exists(cluster_dir):
1022 self.debug('Creating dir {}'.format(cluster_dir))
1023 os.makedirs(cluster_dir)
1024 if not os.path.exists(cluster_dir):
1025 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1026 self.error(msg)
1027 raise K8sException(msg)
1028
1029 # kube dir
1030 kube_dir = cluster_dir + '/' + '.kube'
1031 if create_if_not_exist and not os.path.exists(kube_dir):
1032 self.debug('Creating dir {}'.format(kube_dir))
1033 os.makedirs(kube_dir)
1034 if not os.path.exists(kube_dir):
1035 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1036 self.error(msg)
1037 raise K8sException(msg)
1038
1039 # helm home dir
1040 helm_dir = cluster_dir + '/' + '.helm'
1041 if create_if_not_exist and not os.path.exists(helm_dir):
1042 self.debug('Creating dir {}'.format(helm_dir))
1043 os.makedirs(helm_dir)
1044 if not os.path.exists(helm_dir):
1045 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1046 self.error(msg)
1047 raise K8sException(msg)
1048
1049 config_filename = kube_dir + '/config'
1050 return kube_dir, helm_dir, config_filename, cluster_dir
1051
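# Example of the layout handled above, assuming self.fs.path is '/app/storage':
#     cluster_dir       /app/storage/<cluster_name>
#     kube_dir          /app/storage/<cluster_name>/.kube
#     helm_dir          /app/storage/<cluster_name>/.helm
#     config_filename   /app/storage/<cluster_name>/.kube/config
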
1052 @staticmethod
1053 def _remove_multiple_spaces(str):
1054 str = str.strip()
1055 while ' ' in str:
1056 str = str.replace(' ', ' ')
1057 return str
1058
1059 def _local_exec(
1060 self,
1061 command: str
1062 ) -> (str, int):
1063 command = K8sHelmConnector._remove_multiple_spaces(command)
1064 self.debug('Executing sync local command: {}'.format(command))
1065 # raise exception if fails
1066 output = ''
1067 try:
1068 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1069 return_code = 0
1070 self.debug(output)
1071 except Exception as e:
1072 return_code = 1
1073
1074 return output, return_code
1075
1076 async def _local_async_exec(
1077 self,
1078 command: str,
1079 raise_exception_on_error: bool = False,
1080 show_error_log: bool = True,
1081 encode_utf8: bool = False
1082 ) -> (str, int):
1083
1084 command = K8sHelmConnector._remove_multiple_spaces(command)
1085 self.debug('Executing async local command: {}'.format(command))
1086
1087 # split command
1088 command = command.split(sep=' ')
1089
1090 try:
1091 process = await asyncio.create_subprocess_exec(
1092 *command,
1093 stdout=asyncio.subprocess.PIPE,
1094 stderr=asyncio.subprocess.PIPE
1095 )
1096
1097 # wait for command terminate
1098 stdout, stderr = await process.communicate()
1099
1100 return_code = process.returncode
1101
1102 output = ''
1103 if stdout:
1104 output = stdout.decode('utf-8').strip()
1105 # output = stdout.decode()
1106 if stderr:
1107 output = stderr.decode('utf-8').strip()
1108 # output = stderr.decode()
1109
1110 if return_code != 0 and show_error_log:
1111 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1112 else:
1113 self.debug('Return code: {}'.format(return_code))
1114
1115 if raise_exception_on_error and return_code != 0:
1116 raise Exception(output)
1117
1118 if encode_utf8:
1119 output = output.encode('utf-8').strip()
1120 output = str(output).replace('\\n', '\n')
1121
1122 return output, return_code
1123
1124 except Exception as e:
1125 msg = 'Exception executing command: {} -> {}'.format(command, e)
1126 if show_error_log:
1127 self.error(msg)
1128 if raise_exception_on_error:
1129 raise e
1130 else:
1131 return '', -1
1132
1133 def _remote_exec(
1134 self,
1135 hostname: str,
1136 username: str,
1137 password: str,
1138 command: str,
1139 timeout: int = 10
1140 ) -> (str, int):
1141
1142 command = K8sHelmConnector._remove_multiple_spaces(command)
1143 self.debug('Executing sync remote ssh command: {}'.format(command))
1144
1145 ssh = paramiko.SSHClient()
1146 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1147 ssh.connect(hostname=hostname, username=username, password=password)
1148 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1149 output = ssh_stdout.read().decode('utf-8')
1150 error = ssh_stderr.read().decode('utf-8')
1151 if error:
1152 self.error('ERROR: {}'.format(error))
1153 return_code = 1
1154 else:
1155 return_code = 0
1156 output = output.replace('\\n', '\n')
1157 self.debug('OUTPUT: {}'.format(output))
1158
1159 return output, return_code
1160
1161 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1162 self.debug('Checking if file {} exists...'.format(filename))
1163 if os.path.exists(filename):
1164 return True
else:
    msg = 'File {} does not exist'.format(filename)
    if exception_if_not_exists:
        self.error(msg)
        raise K8sException(msg)
    return False