1007 Use KDU name and NS id for model names
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import paramiko
24 import subprocess
25 import os
26 import shutil
27 import asyncio
28 import time
29 import yaml
30 from uuid import uuid4
31 import random
32 from n2vc.k8s_conn import K8sConnector
33 from n2vc.exceptions import K8sException
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = '/usr/bin/kubectl',
        helm_command: str = '/usr/bin/helm',
        log: object = None,
        on_update_db=None
):
    """
    Build the connector and schedule a client-only ``helm init``.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # let the generic connector set up db, logging and callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.info('Initializing K8S Helm connector')

    # release names carry a random suffix; seed the generator once here
    random.seed(time.time())

    self.fs = fs

    # both executables are mandatory: the checks raise if a file is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # the helm client initialisation is only scheduled here, it is not awaited
    self.debug('Initializing helm client-only...')
    init_command = '{} init --client-only'.format(self._helm_command)
    try:
        asyncio.ensure_future(self._local_async_exec(command=init_command, raise_exception_on_error=False))
    except Exception as e:
        self.warning(msg='helm init failed (it was already initialized): {}'.format(e))

    self.info('K8S Helm connector initialized')
98
99 async def init_env(
100 self,
101 k8s_creds: str,
102 namespace: str = 'kube-system',
103 reuse_cluster_uuid=None
104 ) -> (str, bool):
105 """
106 It prepares a given K8s cluster environment to run Charts on both sides:
107 client (OSM)
108 server (Tiller)
109
110 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
111 :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
114 (on error, an exception will be raised)
115 """
116
117 cluster_uuid = reuse_cluster_uuid
118 if not cluster_uuid:
119 cluster_uuid = str(uuid4())
120
121 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
122
123 # create config filename
124 kube_dir, helm_dir, config_filename, cluster_dir = \
125 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
126 f = open(config_filename, "w")
127 f.write(k8s_creds)
128 f.close()
129
130 # check if tiller pod is up in cluster
131 command = '{} --kubeconfig={} --namespace={} get deployments'\
132 .format(self.kubectl_command, config_filename, namespace)
133 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
134
135 output_table = K8sHelmConnector._output_to_table(output=output)
136
137 # find 'tiller' pod in all pods
138 already_initialized = False
139 try:
140 for row in output_table:
141 if row[0].startswith('tiller-deploy'):
142 already_initialized = True
143 break
144 except Exception as e:
145 pass
146
147 # helm init
148 n2vc_installed_sw = False
149 if not already_initialized:
150 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
151 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
152 .format(self._helm_command, config_filename, namespace, helm_dir)
153 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
154 n2vc_installed_sw = True
155 else:
156 # check client helm installation
157 check_file = helm_dir + '/repository/repositories.yaml'
158 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
159 self.info('Initializing helm in client: {}'.format(cluster_uuid))
160 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
161 .format(self._helm_command, config_filename, namespace, helm_dir)
162 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
163 else:
164 self.info('Helm client already initialized')
165
166 self.info('Cluster initialized {}'.format(cluster_uuid))
167
168 return cluster_uuid, n2vc_installed_sw
169
170 async def repo_add(
171 self,
172 cluster_uuid: str,
173 name: str,
174 url: str,
175 repo_type: str = 'chart'
176 ):
177
178 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
179
180 # config filename
181 kube_dir, helm_dir, config_filename, cluster_dir = \
182 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
183
184 # helm repo update
185 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
186 self.debug('updating repo: {}'.format(command))
187 await self._local_async_exec(command=command, raise_exception_on_error=False)
188
189 # helm repo add name url
190 command = '{} --kubeconfig={} --home={} repo add {} {}'\
191 .format(self._helm_command, config_filename, helm_dir, name, url)
192 self.debug('adding repo: {}'.format(command))
193 await self._local_async_exec(command=command, raise_exception_on_error=True)
194
195 async def repo_list(
196 self,
197 cluster_uuid: str
198 ) -> list:
199 """
200 Get the list of registered repositories
201
202 :return: list of registered repositories: [ (name, url) .... ]
203 """
204
205 self.debug('list repositories for cluster {}'.format(cluster_uuid))
206
207 # config filename
208 kube_dir, helm_dir, config_filename, cluster_dir = \
209 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
210
211 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
212 .format(self._helm_command, config_filename, helm_dir)
213
214 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
215 if output and len(output) > 0:
216 return yaml.load(output, Loader=yaml.SafeLoader)
217 else:
218 return []
219
220 async def repo_remove(
221 self,
222 cluster_uuid: str,
223 name: str
224 ):
225 """
226 Remove a repository from OSM
227
228 :param cluster_uuid: the cluster
229 :param name: repo name in OSM
230 :return: True if successful
231 """
232
233 self.debug('list repositories for cluster {}'.format(cluster_uuid))
234
235 # config filename
236 kube_dir, helm_dir, config_filename, cluster_dir = \
237 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
238
239 command = '{} --kubeconfig={} --home={} repo remove {}'\
240 .format(self._helm_command, config_filename, helm_dir, name)
241
242 await self._local_async_exec(command=command, raise_exception_on_error=True)
243
244 async def reset(
245 self,
246 cluster_uuid: str,
247 force: bool = False,
248 uninstall_sw: bool = False
249 ) -> bool:
250
251 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
252
253 # get kube and helm directories
254 kube_dir, helm_dir, config_filename, cluster_dir = \
255 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
256
257 # uninstall releases if needed
258 releases = await self.instances_list(cluster_uuid=cluster_uuid)
259 if len(releases) > 0:
260 if force:
261 for r in releases:
262 try:
263 kdu_instance = r.get('Name')
264 chart = r.get('Chart')
265 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
266 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
267 except Exception as e:
268 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
269 else:
270 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
271 .format(cluster_uuid)
272 self.error(msg)
273 raise K8sException(msg)
274
275 if uninstall_sw:
276
277 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
278
279 # find namespace for tiller pod
280 command = '{} --kubeconfig={} get deployments --all-namespaces'\
281 .format(self.kubectl_command, config_filename)
282 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
283 output_table = K8sHelmConnector._output_to_table(output=output)
284 namespace = None
285 for r in output_table:
286 try:
287 if 'tiller-deploy' in r[1]:
288 namespace = r[0]
289 break
290 except Exception as e:
291 pass
292 else:
293 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
294 self.error(msg)
295
296 self.debug('namespace for tiller: {}'.format(namespace))
297
298 force_str = '--force'
299
300 if namespace:
301 # delete tiller deployment
302 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
303 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
304 .format(self.kubectl_command, namespace, config_filename, force_str)
305 await self._local_async_exec(command=command, raise_exception_on_error=False)
306
307 # uninstall tiller from cluster
308 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
309 command = '{} --kubeconfig={} --home={} reset'\
310 .format(self._helm_command, config_filename, helm_dir)
311 self.debug('resetting: {}'.format(command))
312 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
313 else:
314 self.debug('namespace not found')
315
316 # delete cluster directory
317 dir = self.fs.path + '/' + cluster_uuid
318 self.debug('Removing directory {}'.format(dir))
319 shutil.rmtree(dir, ignore_errors=True)
320
321 return True
322
323 async def install(
324 self,
325 cluster_uuid: str,
326 kdu_model: str,
327 atomic: bool = True,
328 timeout: float = 300,
329 params: dict = None,
330 db_dict: dict = None,
331 kdu_name: str = None
332 ):
333
334 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
335
336 # config filename
337 kube_dir, helm_dir, config_filename, cluster_dir = \
338 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
339
340 # params to str
341 # params_str = K8sHelmConnector._params_to_set_option(params)
342 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
343
344 timeout_str = ''
345 if timeout:
346 timeout_str = '--timeout {}'.format(timeout)
347
348 # atomic
349 atomic_str = ''
350 if atomic:
351 atomic_str = '--atomic'
352
353 # version
354 version_str = ''
355 if ':' in kdu_model:
356 parts = kdu_model.split(sep=':')
357 if len(parts) == 2:
358 version_str = '--version {}'.format(parts[1])
359 kdu_model = parts[0]
360
361 # generate a name for the release. Then, check if already exists
362 kdu_instance = None
363 while kdu_instance is None:
364 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
365 try:
366 result = await self._status_kdu(
367 cluster_uuid=cluster_uuid,
368 kdu_instance=kdu_instance,
369 show_error_log=False
370 )
371 if result is not None:
372 # instance already exists: generate a new one
373 kdu_instance = None
374 except K8sException:
375 pass
376
377 # helm repo install
378 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
379 .format(self._helm_command, atomic_str, config_filename, helm_dir,
380 params_str, timeout_str, kdu_instance, kdu_model, version_str)
381 self.debug('installing: {}'.format(command))
382
383 if atomic:
384 # exec helm in a task
385 exec_task = asyncio.ensure_future(
386 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
387 )
388 # write status in another task
389 status_task = asyncio.ensure_future(
390 coro_or_future=self._store_status(
391 cluster_uuid=cluster_uuid,
392 kdu_instance=kdu_instance,
393 db_dict=db_dict,
394 operation='install',
395 run_once=False
396 )
397 )
398
399 # wait for execution task
400 await asyncio.wait([exec_task])
401
402 # cancel status task
403 status_task.cancel()
404
405 output, rc = exec_task.result()
406
407 else:
408
409 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
410
411 # remove temporal values yaml file
412 if file_to_delete:
413 os.remove(file_to_delete)
414
415 # write final status
416 await self._store_status(
417 cluster_uuid=cluster_uuid,
418 kdu_instance=kdu_instance,
419 db_dict=db_dict,
420 operation='install',
421 run_once=True,
422 check_every=0
423 )
424
425 if rc != 0:
426 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
427 self.error(msg)
428 raise K8sException(msg)
429
430 self.debug('Returning kdu_instance {}'.format(kdu_instance))
431 return kdu_instance
432
433 async def instances_list(
434 self,
435 cluster_uuid: str
436 ) -> list:
437 """
438 returns a list of deployed releases in a cluster
439
440 :param cluster_uuid: the cluster
441 :return:
442 """
443
444 self.debug('list releases for cluster {}'.format(cluster_uuid))
445
446 # config filename
447 kube_dir, helm_dir, config_filename, cluster_dir = \
448 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
449
450 command = '{} --kubeconfig={} --home={} list --output yaml'\
451 .format(self._helm_command, config_filename, helm_dir)
452
453 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
454
455 if output and len(output) > 0:
456 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
457 else:
458 return []
459
460 async def upgrade(
461 self,
462 cluster_uuid: str,
463 kdu_instance: str,
464 kdu_model: str = None,
465 atomic: bool = True,
466 timeout: float = 300,
467 params: dict = None,
468 db_dict: dict = None
469 ):
470
471 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
472
473 # config filename
474 kube_dir, helm_dir, config_filename, cluster_dir = \
475 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
476
477 # params to str
478 # params_str = K8sHelmConnector._params_to_set_option(params)
479 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
480
481 timeout_str = ''
482 if timeout:
483 timeout_str = '--timeout {}'.format(timeout)
484
485 # atomic
486 atomic_str = ''
487 if atomic:
488 atomic_str = '--atomic'
489
490 # version
491 version_str = ''
492 if kdu_model and ':' in kdu_model:
493 parts = kdu_model.split(sep=':')
494 if len(parts) == 2:
495 version_str = '--version {}'.format(parts[1])
496 kdu_model = parts[0]
497
498 # helm repo upgrade
499 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
500 .format(self._helm_command, atomic_str, config_filename, helm_dir,
501 params_str, timeout_str, kdu_instance, kdu_model, version_str)
502 self.debug('upgrading: {}'.format(command))
503
504 if atomic:
505
506 # exec helm in a task
507 exec_task = asyncio.ensure_future(
508 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
509 )
510 # write status in another task
511 status_task = asyncio.ensure_future(
512 coro_or_future=self._store_status(
513 cluster_uuid=cluster_uuid,
514 kdu_instance=kdu_instance,
515 db_dict=db_dict,
516 operation='upgrade',
517 run_once=False
518 )
519 )
520
521 # wait for execution task
522 await asyncio.wait([exec_task])
523
524 # cancel status task
525 status_task.cancel()
526 output, rc = exec_task.result()
527
528 else:
529
530 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
531
532 # remove temporal values yaml file
533 if file_to_delete:
534 os.remove(file_to_delete)
535
536 # write final status
537 await self._store_status(
538 cluster_uuid=cluster_uuid,
539 kdu_instance=kdu_instance,
540 db_dict=db_dict,
541 operation='upgrade',
542 run_once=True,
543 check_every=0
544 )
545
546 if rc != 0:
547 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
548 self.error(msg)
549 raise K8sException(msg)
550
551 # return new revision number
552 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
553 if instance:
554 revision = int(instance.get('Revision'))
555 self.debug('New revision: {}'.format(revision))
556 return revision
557 else:
558 return 0
559
560 async def rollback(
561 self,
562 cluster_uuid: str,
563 kdu_instance: str,
564 revision=0,
565 db_dict: dict = None
566 ):
567
568 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
569 .format(kdu_instance, revision, cluster_uuid))
570
571 # config filename
572 kube_dir, helm_dir, config_filename, cluster_dir = \
573 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
574
575 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
576 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
577
578 # exec helm in a task
579 exec_task = asyncio.ensure_future(
580 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
581 )
582 # write status in another task
583 status_task = asyncio.ensure_future(
584 coro_or_future=self._store_status(
585 cluster_uuid=cluster_uuid,
586 kdu_instance=kdu_instance,
587 db_dict=db_dict,
588 operation='rollback',
589 run_once=False
590 )
591 )
592
593 # wait for execution task
594 await asyncio.wait([exec_task])
595
596 # cancel status task
597 status_task.cancel()
598
599 output, rc = exec_task.result()
600
601 # write final status
602 await self._store_status(
603 cluster_uuid=cluster_uuid,
604 kdu_instance=kdu_instance,
605 db_dict=db_dict,
606 operation='rollback',
607 run_once=True,
608 check_every=0
609 )
610
611 if rc != 0:
612 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
613 self.error(msg)
614 raise K8sException(msg)
615
616 # return new revision number
617 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
618 if instance:
619 revision = int(instance.get('Revision'))
620 self.debug('New revision: {}'.format(revision))
621 return revision
622 else:
623 return 0
624
625 async def uninstall(
626 self,
627 cluster_uuid: str,
628 kdu_instance: str
629 ):
630 """
631 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
632 after all _terminate-config-primitive_ of the VNF are invoked).
633
634 :param cluster_uuid: UUID of a K8s cluster known by OSM
635 :param kdu_instance: unique name for the KDU instance to be deleted
636 :return: True if successful
637 """
638
639 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
640
641 # config filename
642 kube_dir, helm_dir, config_filename, cluster_dir = \
643 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
644
645 command = '{} --kubeconfig={} --home={} delete --purge {}'\
646 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
647
648 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
649
650 return self._output_to_table(output)
651
652 async def inspect_kdu(
653 self,
654 kdu_model: str,
655 repo_url: str = None
656 ) -> str:
657
658 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
659
660 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
661
662 async def values_kdu(
663 self,
664 kdu_model: str,
665 repo_url: str = None
666 ) -> str:
667
668 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
669
670 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
671
672 async def help_kdu(
673 self,
674 kdu_model: str,
675 repo_url: str = None
676 ) -> str:
677
678 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
679
680 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
681
682 async def status_kdu(
683 self,
684 cluster_uuid: str,
685 kdu_instance: str
686 ) -> str:
687
688 # call internal function
689 return await self._status_kdu(
690 cluster_uuid=cluster_uuid,
691 kdu_instance=kdu_instance,
692 show_error_log=True,
693 return_text=True
694 )
695
696 """
697 ##################################################################################################
698 ########################################## P R I V A T E #########################################
699 ##################################################################################################
700 """
701
702 async def _exec_inspect_comand(
703 self,
704 inspect_command: str,
705 kdu_model: str,
706 repo_url: str = None
707 ):
708
709 repo_str = ''
710 if repo_url:
711 repo_str = ' --repo {}'.format(repo_url)
712 idx = kdu_model.find('/')
713 if idx >= 0:
714 idx += 1
715 kdu_model = kdu_model[idx:]
716
717 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
718 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
719
720 return output
721
722 async def _status_kdu(
723 self,
724 cluster_uuid: str,
725 kdu_instance: str,
726 show_error_log: bool = False,
727 return_text: bool = False
728 ):
729
730 self.debug('status of kdu_instance {}'.format(kdu_instance))
731
732 # config filename
733 kube_dir, helm_dir, config_filename, cluster_dir = \
734 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
735
736 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
737 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
738
739 output, rc = await self._local_async_exec(
740 command=command,
741 raise_exception_on_error=True,
742 show_error_log=show_error_log
743 )
744
745 if return_text:
746 return str(output)
747
748 if rc != 0:
749 return None
750
751 data = yaml.load(output, Loader=yaml.SafeLoader)
752
753 # remove field 'notes'
754 try:
755 del data.get('info').get('status')['notes']
756 except KeyError:
757 pass
758
759 # parse field 'resources'
760 try:
761 resources = str(data.get('info').get('status').get('resources'))
762 resource_table = self._output_to_table(resources)
763 data.get('info').get('status')['resources'] = resource_table
764 except Exception as e:
765 pass
766
767 return data
768
769 async def get_instance_info(
770 self,
771 cluster_uuid: str,
772 kdu_instance: str
773 ):
774 instances = await self.instances_list(cluster_uuid=cluster_uuid)
775 for instance in instances:
776 if instance.get('Name') == kdu_instance:
777 return instance
778 self.debug('Instance {} not found'.format(kdu_instance))
779 return None
780
781 @staticmethod
782 def _generate_release_name(
783 chart_name: str
784 ):
785 # check embeded chart (file or dir)
786 if chart_name.startswith('/'):
787 # extract file or directory name
788 chart_name = chart_name[chart_name.rfind('/')+1:]
789 # check URL
790 elif '://' in chart_name:
791 # extract last portion of URL
792 chart_name = chart_name[chart_name.rfind('/')+1:]
793
794 name = ''
795 for c in chart_name:
796 if c.isalpha() or c.isnumeric():
797 name += c
798 else:
799 name += '-'
800 if len(name) > 35:
801 name = name[0:35]
802
803 # if does not start with alpha character, prefix 'a'
804 if not name[0].isalpha():
805 name = 'a' + name
806
807 name += '-'
808
809 def get_random_number():
810 r = random.randrange(start=1, stop=99999999)
811 s = str(r)
812 s = s.rjust(10, '0')
813 return s
814
815 name = name + get_random_number()
816 return name.lower()
817
818 async def _store_status(
819 self,
820 cluster_uuid: str,
821 operation: str,
822 kdu_instance: str,
823 check_every: float = 10,
824 db_dict: dict = None,
825 run_once: bool = False
826 ):
827 while True:
828 try:
829 await asyncio.sleep(check_every)
830 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
831 status = detailed_status.get('info').get('Description')
832 print('=' * 60)
833 self.debug('STATUS:\n{}'.format(status))
834 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
835 print('=' * 60)
836 # write status to db
837 result = await self.write_app_status_to_db(
838 db_dict=db_dict,
839 status=str(status),
840 detailed_status=str(detailed_status),
841 operation=operation)
842 if not result:
843 self.info('Error writing in database. Task exiting...')
844 return
845 except asyncio.CancelledError:
846 self.debug('Task cancelled')
847 return
848 except Exception as e:
849 pass
850 finally:
851 if run_once:
852 return
853
854 async def _is_install_completed(
855 self,
856 cluster_uuid: str,
857 kdu_instance: str
858 ) -> bool:
859
860 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
861
862 # extract info.status.resources-> str
863 # format:
864 # ==> v1/Deployment
865 # NAME READY UP-TO-DATE AVAILABLE AGE
866 # halting-horse-mongodb 0/1 1 0 0s
867 # halting-petit-mongodb 1/1 1 0 0s
868 # blank line
869 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
870
871 # convert to table
872 resources = K8sHelmConnector._output_to_table(resources)
873
874 num_lines = len(resources)
875 index = 0
876 while index < num_lines:
877 try:
878 line1 = resources[index]
879 index += 1
880 # find '==>' in column 0
881 if line1[0] == '==>':
882 line2 = resources[index]
883 index += 1
884 # find READY in column 1
885 if line2[1] == 'READY':
886 # read next lines
887 line3 = resources[index]
888 index += 1
889 while len(line3) > 1 and index < num_lines:
890 ready_value = line3[1]
891 parts = ready_value.split(sep='/')
892 current = int(parts[0])
893 total = int(parts[1])
894 if current < total:
895 self.debug('NOT READY:\n {}'.format(line3))
896 ready = False
897 line3 = resources[index]
898 index += 1
899
900 except Exception as e:
901 pass
902
903 return ready
904
905 @staticmethod
906 def _get_deep(dictionary: dict, members: tuple):
907 target = dictionary
908 value = None
909 try:
910 for m in members:
911 value = target.get(m)
912 if not value:
913 return None
914 else:
915 target = value
916 except Exception as e:
917 pass
918 return value
919
920 # find key:value in several lines
921 @staticmethod
922 def _find_in_lines(p_lines: list, p_key: str) -> str:
923 for line in p_lines:
924 try:
925 if line.startswith(p_key + ':'):
926 parts = line.split(':')
927 the_value = parts[1].strip()
928 return the_value
929 except Exception as e:
930 # ignore it
931 pass
932 return None
933
934 # params for use in -f file
935 # returns values file option and filename (in order to delete it at the end)
936 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
937
938 if params and len(params) > 0:
939 kube_dir, helm_dir, config_filename, cluster_dir = \
940 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
941
942 def get_random_number():
943 r = random.randrange(start=1, stop=99999999)
944 s = str(r)
945 while len(s) < 10:
946 s = '0' + s
947 return s
948
949 params2 = dict()
950 for key in params:
951 value = params.get(key)
952 if '!!yaml' in str(value):
953 value = yaml.load(value[7:])
954 params2[key] = value
955
956 values_file = get_random_number() + '.yaml'
957 with open(values_file, 'w') as stream:
958 yaml.dump(params2, stream, indent=4, default_flow_style=False)
959
960 return '-f {}'.format(values_file), values_file
961
962 return '', None
963
964 # params for use in --set option
965 @staticmethod
966 def _params_to_set_option(params: dict) -> str:
967 params_str = ''
968 if params and len(params) > 0:
969 start = True
970 for key in params:
971 value = params.get(key, None)
972 if value is not None:
973 if start:
974 params_str += '--set '
975 start = False
976 else:
977 params_str += ','
978 params_str += '{}={}'.format(key, value)
979 return params_str
980
981 @staticmethod
982 def _output_to_lines(output: str) -> list:
983 output_lines = list()
984 lines = output.splitlines(keepends=False)
985 for line in lines:
986 line = line.strip()
987 if len(line) > 0:
988 output_lines.append(line)
989 return output_lines
990
991 @staticmethod
992 def _output_to_table(output: str) -> list:
993 output_table = list()
994 lines = output.splitlines(keepends=False)
995 for line in lines:
996 line = line.replace('\t', ' ')
997 line_list = list()
998 output_table.append(line_list)
999 cells = line.split(sep=' ')
1000 for cell in cells:
1001 cell = cell.strip()
1002 if len(cell) > 0:
1003 line_list.append(cell)
1004 return output_table
1005
1006 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
1007 """
1008 Returns kube and helm directories
1009
1010 :param cluster_name:
1011 :param create_if_not_exist:
1012 :return: kube, helm directories, config filename and cluster dir.
1013 Raises exception if not exist and cannot create
1014 """
1015
1016 base = self.fs.path
1017 if base.endswith("/") or base.endswith("\\"):
1018 base = base[:-1]
1019
1020 # base dir for cluster
1021 cluster_dir = base + '/' + cluster_name
1022 if create_if_not_exist and not os.path.exists(cluster_dir):
1023 self.debug('Creating dir {}'.format(cluster_dir))
1024 os.makedirs(cluster_dir)
1025 if not os.path.exists(cluster_dir):
1026 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1027 self.error(msg)
1028 raise K8sException(msg)
1029
1030 # kube dir
1031 kube_dir = cluster_dir + '/' + '.kube'
1032 if create_if_not_exist and not os.path.exists(kube_dir):
1033 self.debug('Creating dir {}'.format(kube_dir))
1034 os.makedirs(kube_dir)
1035 if not os.path.exists(kube_dir):
1036 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1037 self.error(msg)
1038 raise K8sException(msg)
1039
1040 # helm home dir
1041 helm_dir = cluster_dir + '/' + '.helm'
1042 if create_if_not_exist and not os.path.exists(helm_dir):
1043 self.debug('Creating dir {}'.format(helm_dir))
1044 os.makedirs(helm_dir)
1045 if not os.path.exists(helm_dir):
1046 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1047 self.error(msg)
1048 raise K8sException(msg)
1049
1050 config_filename = kube_dir + '/config'
1051 return kube_dir, helm_dir, config_filename, cluster_dir
1052
1053 @staticmethod
1054 def _remove_multiple_spaces(str):
1055 str = str.strip()
1056 while ' ' in str:
1057 str = str.replace(' ', ' ')
1058 return str
1059
1060 def _local_exec(
1061 self,
1062 command: str
1063 ) -> (str, int):
1064 command = K8sHelmConnector._remove_multiple_spaces(command)
1065 self.debug('Executing sync local command: {}'.format(command))
1066 # raise exception if fails
1067 output = ''
1068 try:
1069 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1070 return_code = 0
1071 self.debug(output)
1072 except Exception as e:
1073 return_code = 1
1074
1075 return output, return_code
1076
1077 async def _local_async_exec(
1078 self,
1079 command: str,
1080 raise_exception_on_error: bool = False,
1081 show_error_log: bool = True,
1082 encode_utf8: bool = False
1083 ) -> (str, int):
1084
1085 command = K8sHelmConnector._remove_multiple_spaces(command)
1086 self.debug('Executing async local command: {}'.format(command))
1087
1088 # split command
1089 command = command.split(sep=' ')
1090
1091 try:
1092 process = await asyncio.create_subprocess_exec(
1093 *command,
1094 stdout=asyncio.subprocess.PIPE,
1095 stderr=asyncio.subprocess.PIPE
1096 )
1097
1098 # wait for command terminate
1099 stdout, stderr = await process.communicate()
1100
1101 return_code = process.returncode
1102
1103 output = ''
1104 if stdout:
1105 output = stdout.decode('utf-8').strip()
1106 # output = stdout.decode()
1107 if stderr:
1108 output = stderr.decode('utf-8').strip()
1109 # output = stderr.decode()
1110
1111 if return_code != 0 and show_error_log:
1112 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1113 else:
1114 self.debug('Return code: {}'.format(return_code))
1115
1116 if raise_exception_on_error and return_code != 0:
1117 raise K8sException(output)
1118
1119 if encode_utf8:
1120 output = output.encode('utf-8').strip()
1121 output = str(output).replace('\\n', '\n')
1122
1123 return output, return_code
1124
1125 except K8sException:
1126 raise
1127 except Exception as e:
1128 msg = 'Exception executing command: {} -> {}'.format(command, e)
1129 self.error(msg)
1130 if raise_exception_on_error:
1131 raise K8sException(e) from e
1132 else:
1133 return '', -1
1134
def _remote_exec(
        self,
        hostname: str,
        username: str,
        password: str,
        command: str,
        timeout: int = 10
) -> (str, int):
    """
    Run a command on a remote host through ssh.

    :param hostname: host to connect to
    :param username: ssh user
    :param password: ssh password
    :param command: command line
    :param timeout: timeout handed to the remote execution
    :return: standard output of the command and 1 if the command wrote anything to its
        standard error, 0 otherwise
    """

    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing sync remote ssh command: {}'.format(command))

    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts host keys that are not known - confirm this is
    # acceptable for the hosts reached from here
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, username=username, password=password)
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
        output = ssh_stdout.read().decode('utf-8')
        error = ssh_stderr.read().decode('utf-8')
    finally:
        # release the connection whatever happened above
        ssh.close()

    if error:
        self.error('ERROR: {}'.format(error))
        return_code = 1
    else:
        return_code = 0
    output = output.replace('\\n', '\n')
    self.debug('OUTPUT: {}'.format(output))

    return output, return_code
1162
1163 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1164 self.debug('Checking if file {} exists...'.format(filename))
1165 if os.path.exists(filename):
1166 return True
1167 else:
1168 msg = 'File {} does not exist'.format(filename)
1169 if exception_if_not_exists:
1170 self.error(msg)
1171 raise K8sException(msg)