blob: f49159eeb2a20ee216edeb5833286e271510949a [file] [log] [blame]
quilesj26c78a42019-10-28 18:10:42 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import paramiko
24import subprocess
25import os
26import shutil
27import asyncio
quilesj26c78a42019-10-28 18:10:42 +010028import time
29import yaml
30from uuid import uuid4
31import random
32from n2vc.k8s_conn import K8sConnector
quilesja6748412019-12-04 07:51:26 +000033from n2vc.exceptions import K8sException
quilesj26c78a42019-10-28 18:10:42 +010034
35
36class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = '/usr/bin/kubectl',
49 helm_command: str = '/usr/bin/helm',
50 log: object = None,
51 on_update_db=None
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(
65 self,
66 db=db,
67 log=log,
68 on_update_db=on_update_db
69 )
70
71 self.info('Initializing K8S Helm connector')
72
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
quilesj1be06302019-11-29 11:17:11 +000087 # initialize helm client-only
88 self.debug('Initializing helm client-only...')
89 command = '{} init --client-only'.format(self._helm_command)
90 try:
91 asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
92 # loop = asyncio.get_event_loop()
93 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
94 except Exception as e:
95 self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
96
quilesj26c78a42019-10-28 18:10:42 +010097 self.info('K8S Helm connector initialized')
98
99 async def init_env(
100 self,
101 k8s_creds: str,
102 namespace: str = 'kube-system',
103 reuse_cluster_uuid=None
104 ) -> (str, bool):
105
106 cluster_uuid = reuse_cluster_uuid
107 if not cluster_uuid:
108 cluster_uuid = str(uuid4())
109
110 self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
111
112 # create config filename
quilesjcda5f412019-11-18 11:32:12 +0100113 kube_dir, helm_dir, config_filename, cluster_dir = \
114 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100115 f = open(config_filename, "w")
116 f.write(k8s_creds)
117 f.close()
118
119 # check if tiller pod is up in cluster
120 command = '{} --kubeconfig={} --namespace={} get deployments'\
121 .format(self.kubectl_command, config_filename, namespace)
122 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
123
124 output_table = K8sHelmConnector._output_to_table(output=output)
125
126 # find 'tiller' pod in all pods
127 already_initialized = False
128 try:
129 for row in output_table:
130 if row[0].startswith('tiller-deploy'):
131 already_initialized = True
132 break
133 except Exception as e:
134 pass
135
136 # helm init
137 n2vc_installed_sw = False
138 if not already_initialized:
139 self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
140 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
141 .format(self._helm_command, config_filename, namespace, helm_dir)
142 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
143 n2vc_installed_sw = True
144 else:
145 # check client helm installation
146 check_file = helm_dir + '/repository/repositories.yaml'
147 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
148 self.info('Initializing helm in client: {}'.format(cluster_uuid))
149 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
150 .format(self._helm_command, config_filename, namespace, helm_dir)
151 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
152 else:
153 self.info('Helm client already initialized')
154
155 self.info('Cluster initialized {}'.format(cluster_uuid))
156
157 return cluster_uuid, n2vc_installed_sw
158
159 async def repo_add(
160 self,
161 cluster_uuid: str,
162 name: str,
163 url: str,
164 repo_type: str = 'chart'
165 ):
166
167 self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
168
169 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100170 kube_dir, helm_dir, config_filename, cluster_dir = \
171 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100172
173 # helm repo update
174 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
175 self.debug('updating repo: {}'.format(command))
176 await self._local_async_exec(command=command, raise_exception_on_error=False)
177
178 # helm repo add name url
179 command = '{} --kubeconfig={} --home={} repo add {} {}'\
180 .format(self._helm_command, config_filename, helm_dir, name, url)
181 self.debug('adding repo: {}'.format(command))
182 await self._local_async_exec(command=command, raise_exception_on_error=True)
183
184 async def repo_list(
185 self,
186 cluster_uuid: str
187 ) -> list:
188 """
189 Get the list of registered repositories
190
191 :return: list of registered repositories: [ (name, url) .... ]
192 """
193
194 self.debug('list repositories for cluster {}'.format(cluster_uuid))
195
196 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100197 kube_dir, helm_dir, config_filename, cluster_dir = \
198 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100199
quilesj1be06302019-11-29 11:17:11 +0000200 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
201 .format(self._helm_command, config_filename, helm_dir)
quilesj26c78a42019-10-28 18:10:42 +0100202
203 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
204 if output and len(output) > 0:
205 return yaml.load(output, Loader=yaml.SafeLoader)
206 else:
207 return []
208
209 async def repo_remove(
210 self,
211 cluster_uuid: str,
212 name: str
213 ):
214 """
215 Remove a repository from OSM
216
217 :param cluster_uuid: the cluster
218 :param name: repo name in OSM
219 :return: True if successful
220 """
221
222 self.debug('list repositories for cluster {}'.format(cluster_uuid))
223
224 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100225 kube_dir, helm_dir, config_filename, cluster_dir = \
226 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100227
228 command = '{} --kubeconfig={} --home={} repo remove {}'\
229 .format(self._helm_command, config_filename, helm_dir, name)
230
231 await self._local_async_exec(command=command, raise_exception_on_error=True)
232
233 async def reset(
234 self,
235 cluster_uuid: str,
236 force: bool = False,
237 uninstall_sw: bool = False
238 ) -> bool:
239
240 self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
241
242 # get kube and helm directories
quilesjcda5f412019-11-18 11:32:12 +0100243 kube_dir, helm_dir, config_filename, cluster_dir = \
244 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
quilesj26c78a42019-10-28 18:10:42 +0100245
246 # uninstall releases if needed
247 releases = await self.instances_list(cluster_uuid=cluster_uuid)
248 if len(releases) > 0:
249 if force:
250 for r in releases:
251 try:
252 kdu_instance = r.get('Name')
253 chart = r.get('Chart')
254 self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
255 await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
256 except Exception as e:
257 self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
258 else:
259 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
260 .format(cluster_uuid)
261 self.error(msg)
quilesja6748412019-12-04 07:51:26 +0000262 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +0100263
264 if uninstall_sw:
265
266 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
267
268 # find namespace for tiller pod
269 command = '{} --kubeconfig={} get deployments --all-namespaces'\
270 .format(self.kubectl_command, config_filename)
271 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
272 output_table = K8sHelmConnector._output_to_table(output=output)
273 namespace = None
274 for r in output_table:
275 try:
276 if 'tiller-deploy' in r[1]:
277 namespace = r[0]
278 break
279 except Exception as e:
280 pass
281 else:
282 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
283 self.error(msg)
quilesj26c78a42019-10-28 18:10:42 +0100284
285 self.debug('namespace for tiller: {}'.format(namespace))
286
287 force_str = '--force'
288
289 if namespace:
290 # delete tiller deployment
291 self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
292 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
293 .format(self.kubectl_command, namespace, config_filename, force_str)
294 await self._local_async_exec(command=command, raise_exception_on_error=False)
295
296 # uninstall tiller from cluster
297 self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
298 command = '{} --kubeconfig={} --home={} reset'\
299 .format(self._helm_command, config_filename, helm_dir)
300 self.debug('resetting: {}'.format(command))
301 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
302 else:
303 self.debug('namespace not found')
304
305 # delete cluster directory
306 dir = self.fs.path + '/' + cluster_uuid
307 self.debug('Removing directory {}'.format(dir))
308 shutil.rmtree(dir, ignore_errors=True)
309
310 return True
311
312 async def install(
313 self,
314 cluster_uuid: str,
315 kdu_model: str,
316 atomic: bool = True,
317 timeout: float = 300,
318 params: dict = None,
319 db_dict: dict = None
320 ):
321
322 self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
323
quilesj26c78a42019-10-28 18:10:42 +0100324 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100325 kube_dir, helm_dir, config_filename, cluster_dir = \
326 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100327
328 # params to str
quilesjcda5f412019-11-18 11:32:12 +0100329 # params_str = K8sHelmConnector._params_to_set_option(params)
330 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
quilesj26c78a42019-10-28 18:10:42 +0100331
332 timeout_str = ''
333 if timeout:
334 timeout_str = '--timeout {}'.format(timeout)
335
336 # atomic
337 atomic_str = ''
338 if atomic:
339 atomic_str = '--atomic'
340
341 # version
342 version_str = ''
343 if ':' in kdu_model:
344 parts = kdu_model.split(sep=':')
345 if len(parts) == 2:
346 version_str = '--version {}'.format(parts[1])
347 kdu_model = parts[0]
348
quilesja6748412019-12-04 07:51:26 +0000349 # generate a name for the release. Then, check if already exists
quilesj26c78a42019-10-28 18:10:42 +0100350 kdu_instance = None
351 while kdu_instance is None:
352 kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
353 try:
354 result = await self._status_kdu(
355 cluster_uuid=cluster_uuid,
356 kdu_instance=kdu_instance,
357 show_error_log=False
358 )
359 if result is not None:
360 # instance already exists: generate a new one
361 kdu_instance = None
quilesj1be06302019-11-29 11:17:11 +0000362 except Exception as e:
quilesj26c78a42019-10-28 18:10:42 +0100363 kdu_instance = None
364
365 # helm repo install
366 command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
367 .format(self._helm_command, atomic_str, config_filename, helm_dir,
368 params_str, timeout_str, kdu_instance, kdu_model, version_str)
369 self.debug('installing: {}'.format(command))
370
371 if atomic:
372 # exec helm in a task
373 exec_task = asyncio.ensure_future(
374 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
375 )
376 # write status in another task
377 status_task = asyncio.ensure_future(
378 coro_or_future=self._store_status(
379 cluster_uuid=cluster_uuid,
380 kdu_instance=kdu_instance,
381 db_dict=db_dict,
382 operation='install',
383 run_once=False
384 )
385 )
386
387 # wait for execution task
388 await asyncio.wait([exec_task])
389
390 # cancel status task
391 status_task.cancel()
392
393 output, rc = exec_task.result()
394
395 else:
396
397 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
398
quilesjcda5f412019-11-18 11:32:12 +0100399 # remove temporal values yaml file
400 if file_to_delete:
401 os.remove(file_to_delete)
402
quilesj26c78a42019-10-28 18:10:42 +0100403 # write final status
404 await self._store_status(
405 cluster_uuid=cluster_uuid,
406 kdu_instance=kdu_instance,
407 db_dict=db_dict,
408 operation='install',
409 run_once=True,
410 check_every=0
411 )
412
413 if rc != 0:
414 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
415 self.error(msg)
quilesja6748412019-12-04 07:51:26 +0000416 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +0100417
418 self.debug('Returning kdu_instance {}'.format(kdu_instance))
419 return kdu_instance
420
421 async def instances_list(
422 self,
423 cluster_uuid: str
424 ) -> list:
425 """
426 returns a list of deployed releases in a cluster
427
428 :param cluster_uuid: the cluster
429 :return:
430 """
431
432 self.debug('list releases for cluster {}'.format(cluster_uuid))
433
434 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100435 kube_dir, helm_dir, config_filename, cluster_dir = \
436 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100437
438 command = '{} --kubeconfig={} --home={} list --output yaml'\
439 .format(self._helm_command, config_filename, helm_dir)
440
441 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
442
443 if output and len(output) > 0:
444 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
445 else:
446 return []
447
448 async def upgrade(
449 self,
450 cluster_uuid: str,
451 kdu_instance: str,
452 kdu_model: str = None,
453 atomic: bool = True,
454 timeout: float = 300,
455 params: dict = None,
456 db_dict: dict = None
457 ):
458
459 self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
460
quilesj26c78a42019-10-28 18:10:42 +0100461 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100462 kube_dir, helm_dir, config_filename, cluster_dir = \
463 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100464
465 # params to str
quilesjcda5f412019-11-18 11:32:12 +0100466 # params_str = K8sHelmConnector._params_to_set_option(params)
467 params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
quilesj26c78a42019-10-28 18:10:42 +0100468
469 timeout_str = ''
470 if timeout:
471 timeout_str = '--timeout {}'.format(timeout)
472
473 # atomic
474 atomic_str = ''
475 if atomic:
476 atomic_str = '--atomic'
477
478 # version
479 version_str = ''
quilesjcda5f412019-11-18 11:32:12 +0100480 if kdu_model and ':' in kdu_model:
quilesj26c78a42019-10-28 18:10:42 +0100481 parts = kdu_model.split(sep=':')
482 if len(parts) == 2:
483 version_str = '--version {}'.format(parts[1])
484 kdu_model = parts[0]
485
486 # helm repo upgrade
487 command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
488 .format(self._helm_command, atomic_str, config_filename, helm_dir,
489 params_str, timeout_str, kdu_instance, kdu_model, version_str)
490 self.debug('upgrading: {}'.format(command))
491
492 if atomic:
493
494 # exec helm in a task
495 exec_task = asyncio.ensure_future(
496 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
497 )
498 # write status in another task
499 status_task = asyncio.ensure_future(
500 coro_or_future=self._store_status(
501 cluster_uuid=cluster_uuid,
502 kdu_instance=kdu_instance,
503 db_dict=db_dict,
504 operation='upgrade',
505 run_once=False
506 )
507 )
508
509 # wait for execution task
quilesj1be06302019-11-29 11:17:11 +0000510 await asyncio.wait([exec_task])
quilesj26c78a42019-10-28 18:10:42 +0100511
512 # cancel status task
513 status_task.cancel()
514 output, rc = exec_task.result()
515
516 else:
517
518 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
519
quilesjcda5f412019-11-18 11:32:12 +0100520 # remove temporal values yaml file
521 if file_to_delete:
522 os.remove(file_to_delete)
523
quilesj26c78a42019-10-28 18:10:42 +0100524 # write final status
525 await self._store_status(
526 cluster_uuid=cluster_uuid,
527 kdu_instance=kdu_instance,
528 db_dict=db_dict,
529 operation='upgrade',
530 run_once=True,
531 check_every=0
532 )
533
534 if rc != 0:
535 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
536 self.error(msg)
quilesja6748412019-12-04 07:51:26 +0000537 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +0100538
539 # return new revision number
540 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
541 if instance:
542 revision = int(instance.get('Revision'))
543 self.debug('New revision: {}'.format(revision))
544 return revision
545 else:
546 return 0
547
548 async def rollback(
549 self,
550 cluster_uuid: str,
551 kdu_instance: str,
552 revision=0,
553 db_dict: dict = None
554 ):
555
556 self.debug('rollback kdu_instance {} to revision {} from cluster {}'
557 .format(kdu_instance, revision, cluster_uuid))
558
559 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100560 kube_dir, helm_dir, config_filename, cluster_dir = \
561 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100562
563 command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
564 .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
565
566 # exec helm in a task
567 exec_task = asyncio.ensure_future(
568 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
569 )
570 # write status in another task
571 status_task = asyncio.ensure_future(
572 coro_or_future=self._store_status(
573 cluster_uuid=cluster_uuid,
574 kdu_instance=kdu_instance,
575 db_dict=db_dict,
576 operation='rollback',
577 run_once=False
578 )
579 )
580
581 # wait for execution task
582 await asyncio.wait([exec_task])
583
584 # cancel status task
585 status_task.cancel()
586
587 output, rc = exec_task.result()
588
589 # write final status
590 await self._store_status(
591 cluster_uuid=cluster_uuid,
592 kdu_instance=kdu_instance,
593 db_dict=db_dict,
594 operation='rollback',
595 run_once=True,
596 check_every=0
597 )
598
599 if rc != 0:
600 msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
601 self.error(msg)
quilesja6748412019-12-04 07:51:26 +0000602 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +0100603
604 # return new revision number
605 instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
606 if instance:
607 revision = int(instance.get('Revision'))
608 self.debug('New revision: {}'.format(revision))
609 return revision
610 else:
611 return 0
612
613 async def uninstall(
614 self,
615 cluster_uuid: str,
616 kdu_instance: str
617 ):
618 """
619 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
620 after all _terminate-config-primitive_ of the VNF are invoked).
621
622 :param cluster_uuid: UUID of a K8s cluster known by OSM
623 :param kdu_instance: unique name for the KDU instance to be deleted
624 :return: True if successful
625 """
626
627 self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
628
629 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100630 kube_dir, helm_dir, config_filename, cluster_dir = \
631 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100632
633 command = '{} --kubeconfig={} --home={} delete --purge {}'\
634 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
635
636 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
637
638 return self._output_to_table(output)
639
640 async def inspect_kdu(
641 self,
quilesj1be06302019-11-29 11:17:11 +0000642 kdu_model: str,
643 repo_url: str = None
quilesj26c78a42019-10-28 18:10:42 +0100644 ) -> str:
645
quilesj1be06302019-11-29 11:17:11 +0000646 self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
quilesj26c78a42019-10-28 18:10:42 +0100647
quilesj1be06302019-11-29 11:17:11 +0000648 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
quilesj26c78a42019-10-28 18:10:42 +0100649
quilesj1be06302019-11-29 11:17:11 +0000650 async def values_kdu(
651 self,
652 kdu_model: str,
653 repo_url: str = None
654 ) -> str:
quilesj26c78a42019-10-28 18:10:42 +0100655
quilesj1be06302019-11-29 11:17:11 +0000656 self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
657
658 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
quilesj26c78a42019-10-28 18:10:42 +0100659
660 async def help_kdu(
661 self,
quilesj1be06302019-11-29 11:17:11 +0000662 kdu_model: str,
663 repo_url: str = None
664 ) -> str:
quilesj26c78a42019-10-28 18:10:42 +0100665
quilesj1be06302019-11-29 11:17:11 +0000666 self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
quilesj26c78a42019-10-28 18:10:42 +0100667
quilesj1be06302019-11-29 11:17:11 +0000668 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
quilesj26c78a42019-10-28 18:10:42 +0100669
670 async def status_kdu(
671 self,
672 cluster_uuid: str,
673 kdu_instance: str
quilesj1be06302019-11-29 11:17:11 +0000674 ) -> str:
quilesj26c78a42019-10-28 18:10:42 +0100675
quilesj1be06302019-11-29 11:17:11 +0000676 # call internal function
677 return await self._status_kdu(
678 cluster_uuid=cluster_uuid,
679 kdu_instance=kdu_instance,
680 show_error_log=True,
681 return_text=True
682 )
quilesj26c78a42019-10-28 18:10:42 +0100683
684 """
685 ##################################################################################################
686 ########################################## P R I V A T E #########################################
687 ##################################################################################################
688 """
689
quilesj1be06302019-11-29 11:17:11 +0000690 async def _exec_inspect_comand(
691 self,
692 inspect_command: str,
693 kdu_model: str,
694 repo_url: str = None
695 ):
696
697 repo_str = ''
698 if repo_url:
699 repo_str = ' --repo {}'.format(repo_url)
700 idx = kdu_model.find('/')
701 if idx >= 0:
702 idx += 1
703 kdu_model = kdu_model[idx:]
704
705 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
706 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
707
708 return output
709
quilesj26c78a42019-10-28 18:10:42 +0100710 async def _status_kdu(
711 self,
712 cluster_uuid: str,
713 kdu_instance: str,
quilesj1be06302019-11-29 11:17:11 +0000714 show_error_log: bool = False,
715 return_text: bool = False
quilesj26c78a42019-10-28 18:10:42 +0100716 ):
717
718 self.debug('status of kdu_instance {}'.format(kdu_instance))
719
720 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100721 kube_dir, helm_dir, config_filename, cluster_dir = \
722 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100723
724 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
725 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
726
727 output, rc = await self._local_async_exec(
728 command=command,
729 raise_exception_on_error=True,
730 show_error_log=show_error_log
731 )
732
quilesj1be06302019-11-29 11:17:11 +0000733 if return_text:
734 return str(output)
735
quilesj26c78a42019-10-28 18:10:42 +0100736 if rc != 0:
737 return None
738
739 data = yaml.load(output, Loader=yaml.SafeLoader)
740
741 # remove field 'notes'
742 try:
743 del data.get('info').get('status')['notes']
744 except KeyError:
745 pass
746
747 # parse field 'resources'
748 try:
749 resources = str(data.get('info').get('status').get('resources'))
750 resource_table = self._output_to_table(resources)
751 data.get('info').get('status')['resources'] = resource_table
752 except Exception as e:
753 pass
754
755 return data
756
quilesj26c78a42019-10-28 18:10:42 +0100757 async def get_instance_info(
758 self,
759 cluster_uuid: str,
760 kdu_instance: str
761 ):
762 instances = await self.instances_list(cluster_uuid=cluster_uuid)
763 for instance in instances:
764 if instance.get('Name') == kdu_instance:
765 return instance
766 self.debug('Instance {} not found'.format(kdu_instance))
767 return None
768
769 @staticmethod
770 def _generate_release_name(
771 chart_name: str
772 ):
quilesjbc355a12020-01-23 09:28:26 +0000773 # check embeded chart (file or dir)
774 if chart_name.startswith('/'):
775 # extract file or directory name
776 chart_name = chart_name[chart_name.rfind('/')+1:]
777 # check URL
778 elif '://' in chart_name:
779 # extract last portion of URL
780 chart_name = chart_name[chart_name.rfind('/')+1:]
781
quilesj26c78a42019-10-28 18:10:42 +0100782 name = ''
783 for c in chart_name:
784 if c.isalpha() or c.isnumeric():
785 name += c
786 else:
787 name += '-'
788 if len(name) > 35:
789 name = name[0:35]
790
791 # if does not start with alpha character, prefix 'a'
792 if not name[0].isalpha():
793 name = 'a' + name
794
795 name += '-'
796
797 def get_random_number():
798 r = random.randrange(start=1, stop=99999999)
799 s = str(r)
quilesja6748412019-12-04 07:51:26 +0000800 s = s.rjust(10, '0')
quilesj26c78a42019-10-28 18:10:42 +0100801 return s
802
803 name = name + get_random_number()
804 return name.lower()
805
806 async def _store_status(
807 self,
808 cluster_uuid: str,
809 operation: str,
810 kdu_instance: str,
811 check_every: float = 10,
812 db_dict: dict = None,
813 run_once: bool = False
814 ):
815 while True:
816 try:
817 await asyncio.sleep(check_every)
818 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
819 status = detailed_status.get('info').get('Description')
820 print('=' * 60)
821 self.debug('STATUS:\n{}'.format(status))
822 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
823 print('=' * 60)
824 # write status to db
825 result = await self.write_app_status_to_db(
826 db_dict=db_dict,
827 status=str(status),
828 detailed_status=str(detailed_status),
829 operation=operation)
830 if not result:
831 self.info('Error writing in database. Task exiting...')
832 return
833 except asyncio.CancelledError:
834 self.debug('Task cancelled')
835 return
836 except Exception as e:
837 pass
838 finally:
839 if run_once:
840 return
841
842 async def _is_install_completed(
843 self,
844 cluster_uuid: str,
845 kdu_instance: str
846 ) -> bool:
847
quilesj1be06302019-11-29 11:17:11 +0000848 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
quilesj26c78a42019-10-28 18:10:42 +0100849
850 # extract info.status.resources-> str
851 # format:
852 # ==> v1/Deployment
853 # NAME READY UP-TO-DATE AVAILABLE AGE
854 # halting-horse-mongodb 0/1 1 0 0s
855 # halting-petit-mongodb 1/1 1 0 0s
856 # blank line
857 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
858
859 # convert to table
860 resources = K8sHelmConnector._output_to_table(resources)
861
862 num_lines = len(resources)
863 index = 0
864 while index < num_lines:
865 try:
866 line1 = resources[index]
867 index += 1
868 # find '==>' in column 0
869 if line1[0] == '==>':
870 line2 = resources[index]
871 index += 1
872 # find READY in column 1
873 if line2[1] == 'READY':
874 # read next lines
875 line3 = resources[index]
876 index += 1
877 while len(line3) > 1 and index < num_lines:
878 ready_value = line3[1]
879 parts = ready_value.split(sep='/')
880 current = int(parts[0])
881 total = int(parts[1])
882 if current < total:
883 self.debug('NOT READY:\n {}'.format(line3))
884 ready = False
885 line3 = resources[index]
886 index += 1
887
888 except Exception as e:
889 pass
890
891 return ready
892
893 @staticmethod
894 def _get_deep(dictionary: dict, members: tuple):
895 target = dictionary
896 value = None
897 try:
898 for m in members:
899 value = target.get(m)
900 if not value:
901 return None
902 else:
903 target = value
904 except Exception as e:
905 pass
906 return value
907
908 # find key:value in several lines
909 @staticmethod
910 def _find_in_lines(p_lines: list, p_key: str) -> str:
911 for line in p_lines:
912 try:
913 if line.startswith(p_key + ':'):
914 parts = line.split(':')
915 the_value = parts[1].strip()
916 return the_value
917 except Exception as e:
918 # ignore it
919 pass
920 return None
921
quilesjcda5f412019-11-18 11:32:12 +0100922 # params for use in -f file
923 # returns values file option and filename (in order to delete it at the end)
924 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
quilesjcda5f412019-11-18 11:32:12 +0100925
926 if params and len(params) > 0:
927 kube_dir, helm_dir, config_filename, cluster_dir = \
928 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
929
930 def get_random_number():
931 r = random.randrange(start=1, stop=99999999)
932 s = str(r)
933 while len(s) < 10:
934 s = '0' + s
935 return s
936
937 params2 = dict()
938 for key in params:
939 value = params.get(key)
940 if '!!yaml' in str(value):
quilesj1be06302019-11-29 11:17:11 +0000941 value = yaml.load(value[7:])
quilesjcda5f412019-11-18 11:32:12 +0100942 params2[key] = value
943
944 values_file = get_random_number() + '.yaml'
945 with open(values_file, 'w') as stream:
946 yaml.dump(params2, stream, indent=4, default_flow_style=False)
947
948 return '-f {}'.format(values_file), values_file
949
950 return '', None
951
quilesj26c78a42019-10-28 18:10:42 +0100952 # params for use in --set option
953 @staticmethod
954 def _params_to_set_option(params: dict) -> str:
955 params_str = ''
956 if params and len(params) > 0:
957 start = True
958 for key in params:
959 value = params.get(key, None)
960 if value is not None:
961 if start:
962 params_str += '--set '
963 start = False
964 else:
965 params_str += ','
966 params_str += '{}={}'.format(key, value)
967 return params_str
968
969 @staticmethod
970 def _output_to_lines(output: str) -> list:
971 output_lines = list()
972 lines = output.splitlines(keepends=False)
973 for line in lines:
974 line = line.strip()
975 if len(line) > 0:
976 output_lines.append(line)
977 return output_lines
978
979 @staticmethod
980 def _output_to_table(output: str) -> list:
981 output_table = list()
982 lines = output.splitlines(keepends=False)
983 for line in lines:
984 line = line.replace('\t', ' ')
985 line_list = list()
986 output_table.append(line_list)
987 cells = line.split(sep=' ')
988 for cell in cells:
989 cell = cell.strip()
990 if len(cell) > 0:
991 line_list.append(cell)
992 return output_table
993
quilesjcda5f412019-11-18 11:32:12 +0100994 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
quilesj26c78a42019-10-28 18:10:42 +0100995 """
996 Returns kube and helm directories
997
998 :param cluster_name:
999 :param create_if_not_exist:
quilesjcda5f412019-11-18 11:32:12 +01001000 :return: kube, helm directories, config filename and cluster dir.
1001 Raises exception if not exist and cannot create
quilesj26c78a42019-10-28 18:10:42 +01001002 """
1003
1004 base = self.fs.path
1005 if base.endswith("/") or base.endswith("\\"):
1006 base = base[:-1]
1007
1008 # base dir for cluster
1009 cluster_dir = base + '/' + cluster_name
1010 if create_if_not_exist and not os.path.exists(cluster_dir):
1011 self.debug('Creating dir {}'.format(cluster_dir))
1012 os.makedirs(cluster_dir)
1013 if not os.path.exists(cluster_dir):
1014 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1015 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001016 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001017
1018 # kube dir
1019 kube_dir = cluster_dir + '/' + '.kube'
1020 if create_if_not_exist and not os.path.exists(kube_dir):
1021 self.debug('Creating dir {}'.format(kube_dir))
1022 os.makedirs(kube_dir)
1023 if not os.path.exists(kube_dir):
1024 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1025 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001026 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001027
1028 # helm home dir
1029 helm_dir = cluster_dir + '/' + '.helm'
1030 if create_if_not_exist and not os.path.exists(helm_dir):
1031 self.debug('Creating dir {}'.format(helm_dir))
1032 os.makedirs(helm_dir)
1033 if not os.path.exists(helm_dir):
1034 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1035 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001036 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001037
1038 config_filename = kube_dir + '/config'
quilesjcda5f412019-11-18 11:32:12 +01001039 return kube_dir, helm_dir, config_filename, cluster_dir
quilesj26c78a42019-10-28 18:10:42 +01001040
1041 @staticmethod
1042 def _remove_multiple_spaces(str):
1043 str = str.strip()
1044 while ' ' in str:
1045 str = str.replace(' ', ' ')
1046 return str
1047
1048 def _local_exec(
1049 self,
1050 command: str
1051 ) -> (str, int):
1052 command = K8sHelmConnector._remove_multiple_spaces(command)
1053 self.debug('Executing sync local command: {}'.format(command))
1054 # raise exception if fails
1055 output = ''
1056 try:
1057 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1058 return_code = 0
1059 self.debug(output)
1060 except Exception as e:
1061 return_code = 1
1062
1063 return output, return_code
1064
1065 async def _local_async_exec(
1066 self,
1067 command: str,
1068 raise_exception_on_error: bool = False,
quilesj1be06302019-11-29 11:17:11 +00001069 show_error_log: bool = True,
1070 encode_utf8: bool = False
quilesj26c78a42019-10-28 18:10:42 +01001071 ) -> (str, int):
1072
1073 command = K8sHelmConnector._remove_multiple_spaces(command)
1074 self.debug('Executing async local command: {}'.format(command))
1075
1076 # split command
1077 command = command.split(sep=' ')
1078
1079 try:
1080 process = await asyncio.create_subprocess_exec(
1081 *command,
1082 stdout=asyncio.subprocess.PIPE,
1083 stderr=asyncio.subprocess.PIPE
1084 )
1085
1086 # wait for command terminate
1087 stdout, stderr = await process.communicate()
1088
1089 return_code = process.returncode
1090
1091 output = ''
1092 if stdout:
1093 output = stdout.decode('utf-8').strip()
quilesj1be06302019-11-29 11:17:11 +00001094 # output = stdout.decode()
quilesj26c78a42019-10-28 18:10:42 +01001095 if stderr:
1096 output = stderr.decode('utf-8').strip()
quilesj1be06302019-11-29 11:17:11 +00001097 # output = stderr.decode()
quilesj26c78a42019-10-28 18:10:42 +01001098
1099 if return_code != 0 and show_error_log:
1100 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
1101 else:
1102 self.debug('Return code: {}'.format(return_code))
1103
1104 if raise_exception_on_error and return_code != 0:
1105 raise Exception(output)
1106
quilesj1be06302019-11-29 11:17:11 +00001107 if encode_utf8:
1108 output = output.encode('utf-8').strip()
1109 output = str(output).replace('\\n', '\n')
1110
quilesj26c78a42019-10-28 18:10:42 +01001111 return output, return_code
1112
1113 except Exception as e:
1114 msg = 'Exception executing command: {} -> {}'.format(command, e)
1115 if show_error_log:
1116 self.error(msg)
1117 return '', -1
1118
1119 def _remote_exec(
1120 self,
1121 hostname: str,
1122 username: str,
1123 password: str,
1124 command: str,
1125 timeout: int = 10
1126 ) -> (str, int):
1127
1128 command = K8sHelmConnector._remove_multiple_spaces(command)
1129 self.debug('Executing sync remote ssh command: {}'.format(command))
1130
1131 ssh = paramiko.SSHClient()
1132 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
1133 ssh.connect(hostname=hostname, username=username, password=password)
1134 ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
1135 output = ssh_stdout.read().decode('utf-8')
1136 error = ssh_stderr.read().decode('utf-8')
1137 if error:
1138 self.error('ERROR: {}'.format(error))
1139 return_code = 1
1140 else:
1141 return_code = 0
1142 output = output.replace('\\n', '\n')
1143 self.debug('OUTPUT: {}'.format(output))
1144
1145 return output, return_code
1146
1147 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1148 self.debug('Checking if file {} exists...'.format(filename))
1149 if os.path.exists(filename):
1150 return True
1151 else:
1152 msg = 'File {} does not exist'.format(filename)
1153 if exception_if_not_exists:
1154 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001155 raise K8sException(msg)