blob: cd15d73b1d8589260df63bec1fb813619ec1d0df [file] [log] [blame]
quilesj26c78a42019-10-28 18:10:42 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import paramiko
24import subprocess
25import os
26import shutil
27import asyncio
quilesj26c78a42019-10-28 18:10:42 +010028import time
29import yaml
30from uuid import uuid4
31import random
32from n2vc.k8s_conn import K8sConnector
33
34
35class K8sHelmConnector(K8sConnector):
36
37 """
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
41 """
42
def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = '/usr/bin/kubectl',
        helm_command: str = '/usr/bin/helm',
        log: object = None,
        on_update_db=None
):
    """
    Create the Helm connector and verify that the required executables exist.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :raises Exception: if the kubectl or helm executable does not exist
    """

    # initialise the parent connector first: logging helpers come from it
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.info('Initializing K8S Helm connector')

    # seed the generator used for release name suffixes
    random.seed(time.time())

    self.fs = fs

    # store each executable path, then fail fast if it is not installed
    executables = (
        ('kubectl_command', kubectl_command),
        ('_helm_command', helm_command),
    )
    for attribute, path in executables:
        setattr(self, attribute, path)
        self._check_file_exists(filename=path, exception_if_not_exists=True)

    self.info('K8S Helm connector initialized')
87
async def init_env(
        self,
        k8s_creds: str,
        namespace: str = 'kube-system',
        reuse_cluster_uuid=None
) -> (str, bool):
    """
    Prepare a cluster for use with helm: store its credentials and run
    'helm init' (server and/or client side) when needed.

    :param k8s_creds: contents of the kubeconfig file for the cluster
    :param namespace: namespace where the tiller deployment is looked up / installed
    :param reuse_cluster_uuid: existing cluster uuid to reuse; a new one is generated if empty
    :return: tuple (cluster uuid, True if this call installed tiller in the cluster)
    """

    cluster_uuid = reuse_cluster_uuid or str(uuid4())

    self.debug('Initializing K8S environment. namespace: {}'.format(namespace))

    # create config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
    # context manager guarantees the file is closed even if the write fails
    with open(config_filename, 'w') as f:
        f.write(k8s_creds)

    # check if tiller pod is up in cluster
    command = '{} --kubeconfig={} --namespace={} get deployments'\
        .format(self.kubectl_command, config_filename, namespace)
    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    output_table = K8sHelmConnector._output_to_table(output=output)

    # look for a 'tiller-deploy...' deployment; empty rows (blank output lines)
    # are skipped instead of aborting the whole search
    already_initialized = any(
        row and row[0].startswith('tiller-deploy')
        for row in output_table
    )

    # helm init
    n2vc_installed_sw = False
    if not already_initialized:
        self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
        command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
            .format(self._helm_command, config_filename, namespace, helm_dir)
        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
        n2vc_installed_sw = True
    else:
        # tiller is already there: only the local helm home may need initialization
        check_file = helm_dir + '/repository/repositories.yaml'
        if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
            self.info('Initializing helm in client: {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                .format(self._helm_command, config_filename, namespace, helm_dir)
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.info('Helm client already initialized')

    self.info('Cluster initialized {}'.format(cluster_uuid))

    return cluster_uuid, n2vc_installed_sw
147
async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = 'chart'
):
    """
    Register a helm repository for a cluster, refreshing the known repositories first.

    :param cluster_uuid: the cluster
    :param name: repository name
    :param url: repository URL
    :param repo_type: only used in the log message
    """

    self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))

    # paths for this cluster
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # helm repo update (a failure here is requested not to abort the operation)
    update_cmd = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
    self.debug('updating repo: {}'.format(update_cmd))
    await self._local_async_exec(command=update_cmd, raise_exception_on_error=False)

    # helm repo add name url
    add_cmd = '{} --kubeconfig={} --home={} repo add {} {}'.format(
        self._helm_command, config_filename, helm_dir, name, url
    )
    self.debug('adding repo: {}'.format(add_cmd))
    await self._local_async_exec(command=add_cmd, raise_exception_on_error=True)
172
async def repo_list(
        self,
        cluster_uuid: str
) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the cluster
    :return: parsed YAML output of 'helm repo list', or an empty list when there is no output
    """

    self.debug('list repositories for cluster {}'.format(cluster_uuid))

    # paths for this cluster
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(
        self._helm_command, config_filename, helm_dir
    )

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    # nothing printed: no repositories
    if not output:
        return []
    return yaml.load(output, Loader=yaml.SafeLoader)
196
async def repo_remove(
        self,
        cluster_uuid: str,
        name: str
):
    """
    Remove a repository from OSM

    :param cluster_uuid: the cluster
    :param name: repo name in OSM
    :return: None
    """

    # the log message used to be a copy of the one in repo_list
    self.debug('remove repository {} from cluster {}'.format(name, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo remove {}'\
        .format(self._helm_command, config_filename, helm_dir, name)

    await self._local_async_exec(command=command, raise_exception_on_error=True)
220
async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False
) -> bool:
    """
    Remove the local environment of a cluster and, optionally, tiller from the cluster.

    :param cluster_uuid: the cluster
    :param force: uninstall existing releases; without it, existing releases abort the reset
    :param uninstall_sw: also delete the tiller deployment and run 'helm reset'
    :return: True
    :raises Exception: if the cluster has releases and force is False
    """

    self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

    # get kube and helm directories
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

    # uninstall releases if needed ('or []' guards against a missing release list)
    releases = await self.instances_list(cluster_uuid=cluster_uuid) or []
    if releases:
        if not force:
            msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                .format(cluster_uuid)
            self.error(msg)
            raise Exception(msg)

        for r in releases:
            # bound before the try block so the error message below can always use it
            kdu_instance = None
            try:
                kdu_instance = r.get('Name')
                chart = r.get('Chart')
                self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
            except Exception as e:
                # best effort: keep uninstalling the remaining releases
                self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))

    if uninstall_sw:

        self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

        # find namespace for tiller pod: column 0 is the namespace, column 1 the deployment name
        command = '{} --kubeconfig={} get deployments --all-namespaces'\
            .format(self.kubectl_command, config_filename)
        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
        output_table = K8sHelmConnector._output_to_table(output=output)
        namespace = next(
            (row[0] for row in output_table if len(row) > 1 and 'tiller-deploy' in row[1]),
            None
        )
        if namespace is None:
            # logged only: the reset continues with the local clean-up
            msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
            self.error(msg)

        self.debug('namespace for tiller: {}'.format(namespace))

        force_str = '--force'

        if namespace:
            # delete tiller deployment
            self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
            command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                .format(self.kubectl_command, namespace, config_filename, force_str)
            await self._local_async_exec(command=command, raise_exception_on_error=False)

            # uninstall tiller from cluster
            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --home={} reset'\
                .format(self._helm_command, config_filename, helm_dir)
            self.debug('resetting: {}'.format(command))
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.debug('namespace not found')

    # delete cluster directory (the same directory _get_paths resolved above)
    self.debug('Removing directory {}'.format(cluster_dir))
    shutil.rmtree(cluster_dir, ignore_errors=True)

    return True
300
async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None
):
    """
    Install a chart in a cluster under a newly generated release name.

    :param cluster_uuid: the cluster
    :param kdu_model: chart reference, optionally suffixed with ':<version>'
    :param atomic: pass --atomic to helm and report status to the database while it runs
    :param timeout: value for the helm --timeout option (omitted when falsy)
    :param params: values for the chart, passed through a temporary values file
    :param db_dict: where to write the status in the database
    :return: the generated release name (kdu_instance)
    :raises Exception: if the helm command ends with a non-zero return code
    """

    self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to a values file
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    try:
        timeout_str = '--timeout {}'.format(timeout) if timeout else ''
        atomic_str = '--atomic' if atomic else ''

        # version: 'chart:version' is split into chart and --version option
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except Exception:
                # 'Exception' rather than a bare except, so that task
                # cancellation is not swallowed by the retry loop
                kdu_instance = None

        # helm repo install
        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, also when the helm execution fails
        if file_to_delete and os.path.exists(file_to_delete):
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='install',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise Exception(msg)

    self.debug('Returning kdu_instance {}'.format(kdu_instance))
    return kdu_instance
412
async def instances_list(
        self,
        cluster_uuid: str
) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the cluster
    :return: content of the 'Releases' entry of 'helm list --output yaml';
             an empty list when there is no output or no such entry
    """

    self.debug('list releases for cluster {}'.format(cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} list --output yaml'\
        .format(self._helm_command, config_filename, helm_dir)

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    if not output:
        return []

    parsed = yaml.load(output, Loader=yaml.SafeLoader)
    if not isinstance(parsed, dict):
        # output that is not a YAML mapping carries no release list
        return []
    # callers iterate / take len() of the result: never hand back None
    return parsed.get('Releases') or []
439
async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None
):
    """
    Upgrade an existing release to a chart (and optionally a version and new values).

    :param cluster_uuid: the cluster
    :param kdu_instance: release name to upgrade
    :param kdu_model: chart reference, optionally suffixed with ':<version>'
    :param atomic: pass --atomic to helm and report status to the database while it runs
    :param timeout: value for the helm --timeout option (omitted when falsy)
    :param params: values for the chart, passed through a temporary values file
    :param db_dict: where to write the status in the database
    :return: revision number of the release after the upgrade, 0 if the release is not found
    :raises Exception: if no kdu_model is given or the helm command fails
    """

    self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

    # without a chart the command used to be built with the literal text 'None'
    # as chart name; fail with an explicit message instead
    if not kdu_model:
        msg = 'Cannot upgrade {}: no kdu_model (chart) provided'.format(kdu_instance)
        self.error(msg)
        raise Exception(msg)

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to a values file
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    try:
        timeout_str = '--timeout {}'.format(timeout) if timeout else ''
        atomic_str = '--atomic' if atomic else ''

        # version: 'chart:version' is split into chart and --version option
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('upgrading: {}'.format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, also when the helm execution fails
        if file_to_delete and os.path.exists(file_to_delete):
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='upgrade',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise Exception(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if instance:
        revision = int(instance.get('Revision'))
        self.debug('New revision: {}'.format(revision))
        return revision
    else:
        return 0
542
async def rollback(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        revision=0,
        db_dict: dict = None
):
    """
    Roll a release back to a given revision, reporting status to the database meanwhile.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :param revision: revision passed to 'helm rollback'
    :param db_dict: where to write the status in the database
    :return: revision number of the release after the rollback, 0 if the release is not found
    :raises Exception: if the helm command ends with a non-zero return code
    """

    self.debug('rollback kdu_instance {} to revision {} from cluster {}'
               .format(kdu_instance, revision, cluster_uuid))

    # paths for this cluster
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} rollback --kubeconfig={} --home={} {} {} --wait'.format(
        self._helm_command, config_filename, helm_dir, kdu_instance, revision
    )

    # helm runs in one task while a second one keeps writing the status
    helm_future = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
    )
    watcher_future = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=False
        )
    )

    # once helm is done the periodic status writer is no longer needed
    await asyncio.wait([helm_future])
    watcher_future.cancel()

    output, rc = helm_future.result()

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='rollback',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise Exception(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if not instance:
        return 0
    revision = int(instance.get('Revision'))
    self.debug('New revision: {}'.format(revision))
    return revision
607
async def uninstall(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Removes an existing KDU instance by running 'helm delete --purge' on it.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: the helm output split into a table (list of rows, each a list of cells)
    """

    self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))

    # paths for this cluster
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} delete --purge {}'.format(
        self._helm_command, config_filename, helm_dir, kdu_instance
    )

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    table = self._output_to_table(output)
    return table
634
async def inspect_kdu(
        self,
        kdu_model: str
) -> str:
    """
    Run 'helm inspect values' on a chart.

    :param kdu_model: chart reference
    :return: output of the command
    """

    self.debug('inspect kdu_model {}'.format(kdu_model))

    command = '{} inspect values {}'.format(self._helm_command, kdu_model)

    # only the text is of interest; the return code is discarded
    output, _ = await self._local_async_exec(command=command)

    return output
648
async def help_kdu(
        self,
        kdu_model: str
):
    """
    Run 'helm inspect readme' on a chart.

    :param kdu_model: chart reference
    :return: output of the command
    """

    self.debug('help kdu_model {}'.format(kdu_model))

    command = '{} inspect readme {}'.format(self._helm_command, kdu_model)

    # only the text is of interest; the return code is discarded
    output, _ = await self._local_async_exec(command=command, raise_exception_on_error=True)

    return output
662
async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Public status query: same as _status_kdu with error logging enabled.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :return: whatever _status_kdu returns for the release
    """

    status = await self._status_kdu(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        show_error_log=True
    )
    return status
670
671
672 """
673 ##################################################################################################
674 ########################################## P R I V A T E #########################################
675 ##################################################################################################
676 """
677
async def _status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        show_error_log: bool = False
):
    """
    Get the status of a release from 'helm status --output yaml'.

    The 'notes' entry of info.status is removed and its 'resources' text is
    replaced by a table (list of rows, each a list of cells).

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :param show_error_log: log command failures
    :return: parsed status, or None if the command did not end with return code 0
    """

    self.debug('status of kdu_instance {}'.format(kdu_instance))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} status {} --output yaml'\
        .format(self._helm_command, config_filename, helm_dir, kdu_instance)

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log
    )

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # walk down to info.status defensively: any missing or non-mapping level
    # leaves the parsed data untouched instead of raising AttributeError
    info = data.get('info') if isinstance(data, dict) else None
    status = info.get('status') if isinstance(info, dict) else None
    if isinstance(status, dict):
        # remove field 'notes'
        status.pop('notes', None)

        # parse field 'resources' (only when present, so that a missing entry
        # is not turned into the table [['None']])
        resources = status.get('resources')
        if resources is not None:
            status['resources'] = self._output_to_table(str(resources))

    return data
720
async def get_instance_info(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Find a release in the list of deployed releases of a cluster.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :return: the entry of the release list whose 'Name' equals kdu_instance, or None
    """
    # 'or []' tolerates a missing release list
    instances = await self.instances_list(cluster_uuid=cluster_uuid) or []
    for instance in instances:
        if instance.get('Name') == kdu_instance:
            return instance
    self.debug('Instance {} not found'.format(kdu_instance))
    return None
732
733 @staticmethod
734 def _generate_release_name(
735 chart_name: str
736 ):
737 name = ''
738 for c in chart_name:
739 if c.isalpha() or c.isnumeric():
740 name += c
741 else:
742 name += '-'
743 if len(name) > 35:
744 name = name[0:35]
745
746 # if does not start with alpha character, prefix 'a'
747 if not name[0].isalpha():
748 name = 'a' + name
749
750 name += '-'
751
752 def get_random_number():
753 r = random.randrange(start=1, stop=99999999)
754 s = str(r)
quilesjcda5f412019-11-18 11:32:12 +0100755 s = s.rjust(width=10, fillchar=' ')
quilesj26c78a42019-10-28 18:10:42 +0100756 return s
757
758 name = name + get_random_number()
759 return name.lower()
760
async def _store_status(
        self,
        cluster_uuid: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False
):
    """
    Read the status of a release and write it to the database, once or periodically.

    The loop ends when run_once is set, when the task is cancelled or when the
    database write reports a failure. Any other error is logged and the next
    iteration is attempted.

    :param cluster_uuid: the cluster
    :param operation: operation name stored with the status
    :param kdu_instance: release name
    :param check_every: seconds to sleep before each status read
    :param db_dict: where to write the status in the database
    :param run_once: do a single iteration
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
            status = detailed_status.get('info').get('Description')
            self.debug('STATUS:\n{}'.format(status))
            self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation)
            if not result:
                self.info('Error writing in database. Task exiting...')
                return
        except asyncio.CancelledError:
            self.debug('Task cancelled')
            return
        except Exception as e:
            # e.g. the release does not exist yet and status_kdu returned None:
            # keep polling, but leave a trace of what happened
            self.debug('Could not store {} status of {}: {}'.format(operation, kdu_instance, e))

        if run_once:
            return
796
async def _is_install_completed(
        self,
        cluster_uuid: str,
        kdu_instance: str
) -> bool:
    """
    Check the READY column of the resources reported by 'helm status'.

    Resources are listed in sections like:
        ==> v1/Deployment
        NAME                   READY  UP-TO-DATE  AVAILABLE  AGE
        halting-horse-mongodb  0/1    1           0          0s
        <blank line>
    Only sections whose header has READY as second column are checked.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :return: False if the status cannot be read or any checked row has fewer
             ready than total items; True otherwise
    """

    status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if status is None:
        # NOTE(review): no status available is reported as 'not completed' - confirm
        return False

    resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
    if not resources:
        # nothing listed, so nothing is reported as pending
        return True

    # status_kdu may already deliver the resources as a table
    if isinstance(resources, str):
        rows = K8sHelmConnector._output_to_table(resources)
    else:
        rows = resources

    ready = True
    expect_header = False       # previous row was a '==>' section title
    in_ready_section = False    # current section has READY as second column
    for row in rows:
        if not row:
            # a blank line closes the section
            expect_header = False
            in_ready_section = False
            continue
        if row[0] == '==>':
            expect_header = True
            in_ready_section = False
            continue
        if expect_header:
            expect_header = False
            in_ready_section = len(row) > 1 and row[1] == 'READY'
            continue
        if not in_ready_section or len(row) < 2:
            continue

        # READY values look like '<current>/<total>'
        current, _, total = str(row[1]).partition('/')
        try:
            pending = int(current) < int(total)
        except ValueError:
            # not a '<n>/<m>' value: this row cannot be evaluated
            continue
        if pending:
            self.debug('NOT READY:\n {}'.format(row))
            ready = False

    return ready
847
848 @staticmethod
849 def _get_deep(dictionary: dict, members: tuple):
850 target = dictionary
851 value = None
852 try:
853 for m in members:
854 value = target.get(m)
855 if not value:
856 return None
857 else:
858 target = value
859 except Exception as e:
860 pass
861 return value
862
863 # find key:value in several lines
864 @staticmethod
865 def _find_in_lines(p_lines: list, p_key: str) -> str:
866 for line in p_lines:
867 try:
868 if line.startswith(p_key + ':'):
869 parts = line.split(':')
870 the_value = parts[1].strip()
871 return the_value
872 except Exception as e:
873 # ignore it
874 pass
875 return None
876
quilesjcda5f412019-11-18 11:32:12 +0100877 # params for use in -f file
878 # returns values file option and filename (in order to delete it at the end)
879 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
880 params_str = ''
881
882 if params and len(params) > 0:
883 kube_dir, helm_dir, config_filename, cluster_dir = \
884 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
885
886 def get_random_number():
887 r = random.randrange(start=1, stop=99999999)
888 s = str(r)
889 while len(s) < 10:
890 s = '0' + s
891 return s
892
893 params2 = dict()
894 for key in params:
895 value = params.get(key)
896 if '!!yaml' in str(value):
897 value = yaml.load(value[7:])
898 params2[key] = value
899
900 values_file = get_random_number() + '.yaml'
901 with open(values_file, 'w') as stream:
902 yaml.dump(params2, stream, indent=4, default_flow_style=False)
903
904 return '-f {}'.format(values_file), values_file
905
906 return '', None
907
quilesj26c78a42019-10-28 18:10:42 +0100908 # params for use in --set option
909 @staticmethod
910 def _params_to_set_option(params: dict) -> str:
911 params_str = ''
912 if params and len(params) > 0:
913 start = True
914 for key in params:
915 value = params.get(key, None)
916 if value is not None:
917 if start:
918 params_str += '--set '
919 start = False
920 else:
921 params_str += ','
922 params_str += '{}={}'.format(key, value)
923 return params_str
924
925 @staticmethod
926 def _output_to_lines(output: str) -> list:
927 output_lines = list()
928 lines = output.splitlines(keepends=False)
929 for line in lines:
930 line = line.strip()
931 if len(line) > 0:
932 output_lines.append(line)
933 return output_lines
934
935 @staticmethod
936 def _output_to_table(output: str) -> list:
937 output_table = list()
938 lines = output.splitlines(keepends=False)
939 for line in lines:
940 line = line.replace('\t', ' ')
941 line_list = list()
942 output_table.append(line_list)
943 cells = line.split(sep=' ')
944 for cell in cells:
945 cell = cell.strip()
946 if len(cell) > 0:
947 line_list.append(cell)
948 return output_table
949
quilesjcda5f412019-11-18 11:32:12 +0100950 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
quilesj26c78a42019-10-28 18:10:42 +0100951 """
952 Returns kube and helm directories
953
954 :param cluster_name:
955 :param create_if_not_exist:
quilesjcda5f412019-11-18 11:32:12 +0100956 :return: kube, helm directories, config filename and cluster dir.
957 Raises exception if not exist and cannot create
quilesj26c78a42019-10-28 18:10:42 +0100958 """
959
960 base = self.fs.path
961 if base.endswith("/") or base.endswith("\\"):
962 base = base[:-1]
963
964 # base dir for cluster
965 cluster_dir = base + '/' + cluster_name
966 if create_if_not_exist and not os.path.exists(cluster_dir):
967 self.debug('Creating dir {}'.format(cluster_dir))
968 os.makedirs(cluster_dir)
969 if not os.path.exists(cluster_dir):
970 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
971 self.error(msg)
972 raise Exception(msg)
973
974 # kube dir
975 kube_dir = cluster_dir + '/' + '.kube'
976 if create_if_not_exist and not os.path.exists(kube_dir):
977 self.debug('Creating dir {}'.format(kube_dir))
978 os.makedirs(kube_dir)
979 if not os.path.exists(kube_dir):
980 msg = 'Kube config dir {} does not exist'.format(kube_dir)
981 self.error(msg)
982 raise Exception(msg)
983
984 # helm home dir
985 helm_dir = cluster_dir + '/' + '.helm'
986 if create_if_not_exist and not os.path.exists(helm_dir):
987 self.debug('Creating dir {}'.format(helm_dir))
988 os.makedirs(helm_dir)
989 if not os.path.exists(helm_dir):
990 msg = 'Helm config dir {} does not exist'.format(helm_dir)
991 self.error(msg)
992 raise Exception(msg)
993
994 config_filename = kube_dir + '/config'
quilesjcda5f412019-11-18 11:32:12 +0100995 return kube_dir, helm_dir, config_filename, cluster_dir
quilesj26c78a42019-10-28 18:10:42 +0100996
997 @staticmethod
998 def _remove_multiple_spaces(str):
999 str = str.strip()
1000 while ' ' in str:
1001 str = str.replace(' ', ' ')
1002 return str
1003
def _local_exec(
        self,
        command: str
) -> (str, int):
    """
    Run a command synchronously through the shell.

    :param command: command line; it is handed to the shell as is, so it must
                    not be built from untrusted input
    :return: tuple (standard output, return code); the return code is 0 on
             success and 1 on any failure
    """
    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing sync local command: {}'.format(command))

    output = ''
    try:
        output = subprocess.check_output(command, shell=True, universal_newlines=True)
        return_code = 0
        self.debug(output)
    except subprocess.CalledProcessError as e:
        # the command ran but failed: keep what it printed and log the failure
        output = e.output or ''
        return_code = 1
        self.error('Command failed with return code {}: {}'.format(e.returncode, command))
    except Exception as e:
        # the command could not be run at all
        return_code = 1
        self.error('Exception executing command: {} -> {}'.format(command, e))

    return output, return_code
1020
async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True
) -> (str, int):
    """
    Run a command asynchronously (without a shell) and collect its output.

    The command line is split on single spaces, so arguments containing
    spaces are not supported.

    :param command: command line to execute
    :param raise_exception_on_error: see the NOTE in the body: a failure ends up
                                     as the result ('', -1), not as an exception
    :param show_error_log: log failures
    :return: tuple (output, return code). On success the output is the standard
             output (standard error only if nothing was written to standard
             output); on failure it is the standard error (standard output only
             if standard error is empty). ('', -1) if the execution raised.
    """

    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing async local command: {}'.format(command))

    # split command
    command = command.split(sep=' ')

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode('utf-8').strip() if stdout else ''
        err_text = stderr.decode('utf-8').strip() if stderr else ''
        if return_code == 0:
            # warnings on stderr must not replace the real result of a
            # successful command (callers parse it as YAML / tables)
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
        else:
            self.debug('Return code: {}'.format(return_code))

        # NOTE(review): this exception is caught by the handler below, so callers
        # get ('', -1) instead of an exception. Callers in this class (e.g. the
        # release name check in install through _status_kdu) rely on that, so
        # the behavior is kept on purpose.
        if raise_exception_on_error and return_code != 0:
            raise Exception(output)

        return output, return_code

    except Exception as e:
        msg = 'Exception executing command: {} -> {}'.format(command, e)
        if show_error_log:
            self.error(msg)
        return '', -1
1067
def _remote_exec(
        self,
        hostname: str,
        username: str,
        password: str,
        command: str,
        timeout: int = 10
) -> (str, int):
    """
    Run a command on a remote host through ssh.

    :param hostname: host to connect to
    :param username: ssh user
    :param password: ssh password
    :param command: command line to execute
    :param timeout: timeout passed to the remote command execution
    :return: tuple (standard output, return code); the return code is 1 if the
             command wrote anything to standard error, 0 otherwise
    """

    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing sync remote ssh command: {}'.format(command))

    ssh = paramiko.SSHClient()
    # NOTE(review): unknown host keys are accepted automatically - confirm this
    # is acceptable for the hosts this is used against
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, username=username, password=password)
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
        output = ssh_stdout.read().decode('utf-8')
        error = ssh_stderr.read().decode('utf-8')
    finally:
        # always release the connection, also when connect/exec/read fails
        ssh.close()

    if error:
        self.error('ERROR: {}'.format(error))
        return_code = 1
    else:
        return_code = 0
    output = output.replace('\\n', '\n')
    self.debug('OUTPUT: {}'.format(output))

    return output, return_code
1095
1096 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1097 self.debug('Checking if file {} exists...'.format(filename))
1098 if os.path.exists(filename):
1099 return True
1100 else:
1101 msg = 'File {} does not exist'.format(filename)
1102 if exception_if_not_exists:
1103 self.error(msg)
1104 raise Exception(msg)
1105