blob: 01b448b7e97350823f4c7cae81bb0a4874495bea [file] [log] [blame]
quilesj26c78a42019-10-28 18:10:42 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import paramiko
24import subprocess
25import os
26import shutil
27import asyncio
quilesj26c78a42019-10-28 18:10:42 +010028import time
29import yaml
30from uuid import uuid4
31import random
32from n2vc.k8s_conn import K8sConnector
quilesja6748412019-12-04 07:51:26 +000033from n2vc.exceptions import K8sException
quilesj26c78a42019-10-28 18:10:42 +010034
35
36class K8sHelmConnector(K8sConnector):
37
38 """
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
42 """
43
def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = '/usr/bin/kubectl',
        helm_command: str = '/usr/bin/helm',
        log: object = None,
        on_update_db=None
):
    """
    Build the connector: initialise the parent connector, check that the kubectl and helm
    executables exist and schedule a client-only 'helm init'.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # parent class
    K8sConnector.__init__(
        self,
        db=db,
        log=log,
        on_update_db=on_update_db
    )

    self.info('Initializing K8S Helm connector')

    # random numbers for release name generation
    # NOTE(review): this reseeds the module-level `random` generator, which is shared
    # by every user of `random` in the process
    random.seed(time.time())

    # the file system
    self.fs = fs

    # exception if kubectl is not installed
    # (presumably raised by _check_file_exists because of exception_if_not_exists=True;
    # the helper is not visible here - TODO confirm)
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # exception if helm is not installed
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # initialize helm client-only
    self.debug('Initializing helm client-only...')
    command = '{} init --client-only'.format(self._helm_command)
    try:
        # the coroutine is only scheduled, not awaited: the constructor does not wait for
        # the command, so this 'except' can only see errors raised while scheduling it
        asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
        # blocking alternative that was left disabled:
        # loop = asyncio.get_event_loop()
        # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
    except Exception as e:
        self.warning(msg='helm init failed (it was already initialized): {}'.format(e))

    self.info('K8S Helm connector initialized')
98
async def init_env(
        self,
        k8s_creds: str,
        namespace: str = 'kube-system',
        reuse_cluster_uuid=None
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts on both sides:
        client (OSM)
        server (Tiller)

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
    :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
        (on error, an exception will be raised)
    """

    # a new uuid is generated unless the caller wants to reuse an existing one
    cluster_uuid = reuse_cluster_uuid or str(uuid4())

    self.debug('Initializing K8S environment. namespace: {}'.format(namespace))

    # create config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
    # context manager: the file is closed even if the write fails
    with open(config_filename, 'w') as config_file:
        config_file.write(k8s_creds)

    # check if tiller pod is up in cluster
    command = '{} --kubeconfig={} --namespace={} get deployments'\
        .format(self.kubectl_command, config_filename, namespace)
    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    output_table = K8sHelmConnector._output_to_table(output=output)

    # find the 'tiller-deploy' deployment. Empty rows (blank output lines) are skipped
    # instead of aborting the whole search
    already_initialized = any(
        row and row[0].startswith('tiller-deploy')
        for row in output_table
    )

    # helm init
    n2vc_installed_sw = False
    if not already_initialized:
        self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
        command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
            .format(self._helm_command, config_filename, namespace, helm_dir)
        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
        n2vc_installed_sw = True
    else:
        # check client helm installation
        check_file = helm_dir + '/repository/repositories.yaml'
        if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
            self.info('Initializing helm in client: {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                .format(self._helm_command, config_filename, namespace, helm_dir)
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.info('Helm client already initialized')

    self.info('Cluster initialized {}'.format(cluster_uuid))

    return cluster_uuid, n2vc_installed_sw
169
async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = 'chart'
):
    """
    Register a helm repository in the helm home of a cluster.

    A 'repo update' command is executed first (with raise_exception_on_error=False) and
    then the 'repo add' command (with raise_exception_on_error=True).

    :param cluster_uuid: cluster whose kube config and helm home are used
    :param name: name given to the repository
    :param url: URL of the repository
    :param repo_type: only used in the debug message
    """

    self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))

    # only the helm home and the kube config file are needed here
    _, helm_home, kubeconfig, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # helm repo update
    update_cmd = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, kubeconfig, helm_home)
    self.debug('updating repo: {}'.format(update_cmd))
    await self._local_async_exec(command=update_cmd, raise_exception_on_error=False)

    # helm repo add name url
    add_cmd = '{} --kubeconfig={} --home={} repo add {} {}'\
        .format(self._helm_command, kubeconfig, helm_home, name, url)
    self.debug('adding repo: {}'.format(add_cmd))
    await self._local_async_exec(command=add_cmd, raise_exception_on_error=True)
194
async def repo_list(
        self,
        cluster_uuid: str
) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: cluster whose kube config and helm home are used
    :return: the parsed YAML output of 'helm repo list', or an empty list when helm prints nothing
    """

    self.debug('list repositories for cluster {}'.format(cluster_uuid))

    # only the helm home and the kube config file are needed here
    _, helm_home, kubeconfig, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    list_cmd = '{} --kubeconfig={} --home={} repo list --output yaml'\
        .format(self._helm_command, kubeconfig, helm_home)

    raw_yaml, _ = await self._local_async_exec(command=list_cmd, raise_exception_on_error=True)

    # nothing printed: no repositories
    if not raw_yaml:
        return []
    return yaml.load(raw_yaml, Loader=yaml.SafeLoader)
219
async def repo_remove(
        self,
        cluster_uuid: str,
        name: str
):
    """
    Remove a repository from OSM

    :param cluster_uuid: the cluster
    :param name: repo name in OSM
    :return: nothing. The command is executed with raise_exception_on_error=True, so a
        failure is expected to surface as an exception from _local_async_exec
    """

    # the message used to say 'list repositories' (copy-paste from repo_list)
    self.debug('remove {} repositories for cluster {}'.format(name, cluster_uuid))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo remove {}'\
        .format(self._helm_command, config_filename, helm_dir, name)

    await self._local_async_exec(command=command, raise_exception_on_error=True)
243
async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False
) -> bool:
    """
    Remove the local environment of a cluster and, optionally, tiller from the cluster.

    :param cluster_uuid: the cluster
    :param force: when the cluster still has releases, uninstall them (True) or refuse to
        reset (False)
    :param uninstall_sw: also delete the tiller deployment and run 'helm reset'
    :return: True. The local cluster directory is removed in any case
    :raises K8sException: if the cluster has releases and force is False
    """

    self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

    # get kube and helm directories
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

    # uninstall releases if needed ('or []' guards against a None release list)
    releases = (await self.instances_list(cluster_uuid=cluster_uuid)) or []
    if releases:
        if not force:
            msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                .format(cluster_uuid)
            self.error(msg)
            raise K8sException(msg)

        for release in releases:
            # bound before the try so the error message below can always be built
            kdu_instance = None
            try:
                kdu_instance = release.get('Name')
                chart = release.get('Chart')
                self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
            except Exception as e:
                # best effort: a failing release does not stop the reset
                self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))

    if uninstall_sw:

        self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

        # find namespace for tiller pod
        command = '{} --kubeconfig={} get deployments --all-namespaces'\
            .format(self.kubectl_command, config_filename)
        output, _ = await self._local_async_exec(command=command, raise_exception_on_error=False)
        output_table = K8sHelmConnector._output_to_table(output=output)
        namespace = None
        for row in output_table:
            # with --all-namespaces column 0 is the namespace and column 1 the deployment
            if len(row) > 1 and 'tiller-deploy' in row[1]:
                namespace = row[0]
                break
        else:
            # loop finished without 'break': tiller was not found
            msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
            self.error(msg)

        self.debug('namespace for tiller: {}'.format(namespace))

        force_str = '--force'

        if namespace:
            # delete tiller deployment
            self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
            command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                .format(self.kubectl_command, namespace, config_filename, force_str)
            await self._local_async_exec(command=command, raise_exception_on_error=False)

            # uninstall tiller from cluster
            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --home={} reset'\
                .format(self._helm_command, config_filename, helm_dir)
            self.debug('resetting: {}'.format(command))
            await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.debug('namespace not found')

    # delete cluster directory
    cluster_path = self.fs.path + '/' + cluster_uuid
    self.debug('Removing directory {}'.format(cluster_path))
    shutil.rmtree(cluster_path, ignore_errors=True)

    return True
322
async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None
):
    """
    Install a chart in a cluster under a newly generated release name.

    :param cluster_uuid: the cluster
    :param kdu_model: chart reference. 'chart:version' selects a version (passed to helm
        as --version); URLs are never split
    :param atomic: add --atomic to the helm command and write the status to the database
        periodically while helm is running
    :param timeout: value for the helm --timeout option (omitted if falsy)
    :param params: values for the chart, passed to helm through a temporary values file
    :param db_dict: passed through to the status writer
    :return: the generated release name (kdu_instance)
    :raises K8sException: if the helm command returns a non-zero code
    """

    self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    try:
        timeout_str = '--timeout {}'.format(timeout) if timeout else ''

        # atomic
        atomic_str = '--atomic' if atomic else ''

        # version. A URL ('http://...') also contains ':' but carries no version
        version_str = ''
        if ':' in kdu_model and '://' not in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            candidate = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=candidate,
                    show_error_log=False
                )
            except asyncio.CancelledError:
                raise
            except Exception:
                # The status command is executed with raise_exception_on_error=True, so a
                # failure is presumably how an unknown release shows up. Treat the name as
                # free: retrying on every failure would loop forever
                result = None
            if result is None:
                kdu_instance = candidate
            # otherwise the instance already exists: generate a new one

        # helm repo install
        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also when this coroutine is cancelled while waiting
                status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, whatever happened above
        if file_to_delete and os.path.exists(file_to_delete):
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='install',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise K8sException(msg)

    self.debug('Returning kdu_instance {}'.format(kdu_instance))
    return kdu_instance
431
async def instances_list(
        self,
        cluster_uuid: str
) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the cluster
    :return: the 'Releases' entry of the YAML printed by 'helm list'. An empty list is
        returned when helm prints nothing, when the output is not a mapping or when it has
        no releases, so callers can always iterate the result
    """

    self.debug('list releases for cluster {}'.format(cluster_uuid))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} list --output yaml'\
        .format(self._helm_command, config_filename, helm_dir)

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    if not output:
        return []

    data = yaml.load(output, Loader=yaml.SafeLoader)
    if not isinstance(data, dict):
        return []

    # .get() yields None when the key is missing: never hand None to the callers
    return data.get('Releases') or []
458
async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None
):
    """
    Upgrade an existing release.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference. 'chart:version' selects a version (passed to helm
        as --version); URLs are never split.
        NOTE(review): the default None ends up as the literal text 'None' in the helm
        command - confirm whether callers are expected to always provide it
    :param atomic: add --atomic to the helm command and write the status to the database
        periodically while helm is running
    :param timeout: value for the helm --timeout option (omitted if falsy)
    :param params: values for the chart, passed to helm through a temporary values file
    :param db_dict: passed through to the status writer
    :return: the revision reported by 'helm list' for the release after the upgrade, or 0
        if the release is not listed
    :raises K8sException: if the helm command returns a non-zero code
    """

    self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    try:
        timeout_str = '--timeout {}'.format(timeout) if timeout else ''

        # atomic
        atomic_str = '--atomic' if atomic else ''

        # version. A URL ('http://...') also contains ':' but carries no version
        version_str = ''
        if kdu_model and ':' in kdu_model and '://' not in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.debug('upgrading: {}'.format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also when this coroutine is cancelled while waiting
                status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, whatever happened above
        if file_to_delete and os.path.exists(file_to_delete):
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='upgrade',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise K8sException(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if instance:
        revision = int(instance.get('Revision'))
        self.debug('New revision: {}'.format(revision))
        return revision
    else:
        return 0
558
async def rollback(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        revision=0,
        db_dict: dict = None
):
    """
    Roll a release back with 'helm rollback ... --wait', writing the status to the database
    periodically while helm is running.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release
    :param revision: revision passed as-is to 'helm rollback'
    :param db_dict: passed through to the status writer
    :return: the revision reported by 'helm list' for the release after the rollback, or 0
        if the release is not listed
    :raises K8sException: if the helm command returns a non-zero code
    """

    self.debug('rollback kdu_instance {} to revision {} from cluster {}'
               .format(kdu_instance, revision, cluster_uuid))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
        .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=False
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # cancel status task, also when this coroutine is cancelled while waiting;
        # otherwise the polling task would keep running
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='rollback',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.error(msg)
        raise K8sException(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if instance:
        revision = int(instance.get('Revision'))
        self.debug('New revision: {}'.format(revision))
        return revision
    else:
        return 0
623
async def uninstall(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
    after all _terminate-config-primitive_ of the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: the output of 'helm delete --purge' converted to a table (list of rows, each
        row a list of cells)
    """

    self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))

    # only the helm home and the kube config file are needed here
    _, helm_home, kubeconfig, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    delete_cmd = '{} --kubeconfig={} --home={} delete --purge {}'\
        .format(self._helm_command, kubeconfig, helm_home, kdu_instance)

    helm_output, _ = await self._local_async_exec(command=delete_cmd, raise_exception_on_error=True)

    table = self._output_to_table(helm_output)
    return table
650
async def inspect_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run 'helm inspect' (no sub-command) for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: the output of the command
    """

    self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))

    chart_info = await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
    return chart_info
quilesj26c78a42019-10-28 18:10:42 +0100660
async def values_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run 'helm inspect values' for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: the output of the command
    """

    self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))

    chart_values = await self._exec_inspect_comand(
        inspect_command='values',
        kdu_model=kdu_model,
        repo_url=repo_url
    )
    return chart_values
quilesj26c78a42019-10-28 18:10:42 +0100670
async def help_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run 'helm inspect readme' for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: the output of the command
    """

    self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))

    chart_readme = await self._exec_inspect_comand(
        inspect_command='readme',
        kdu_model=kdu_model,
        repo_url=repo_url
    )
    return chart_readme
quilesj26c78a42019-10-28 18:10:42 +0100680
async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str
) -> str:
    """
    Public status query: the text form of the release status.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release
    :return: whatever the internal _status_kdu returns with return_text=True and error
        logging enabled
    """

    # delegate to the internal function, asking for text and for error logging
    status_text = await self._status_kdu(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        show_error_log=True,
        return_text=True
    )
    return status_text
quilesj26c78a42019-10-28 18:10:42 +0100694
695 """
696 ##################################################################################################
697 ########################################## P R I V A T E #########################################
698 ##################################################################################################
699 """
700
async def _exec_inspect_comand(
        self,
        inspect_command: str,
        kdu_model: str,
        repo_url: str = None
):
    """
    Build and execute a 'helm inspect' command.

    :param inspect_command: sub-command of 'helm inspect' ('' for none)
    :param kdu_model: chart reference. When repo_url is given, everything up to and
        including the first '/' is dropped from it
    :param repo_url: optional repository URL, passed to helm with --repo
    :return: the output of the command
    """

    repo_option = ''
    if repo_url:
        repo_option = ' --repo {}'.format(repo_url)
        # keep only what follows the first '/', if there is one
        _, slash, remainder = kdu_model.partition('/')
        if slash:
            kdu_model = remainder

    full_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_option)
    helm_output, _ = await self._local_async_exec(command=full_command, encode_utf8=True)

    return helm_output
720
async def _status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False
):
    """
    Query the status of a release with 'helm status --output yaml'.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release
    :param show_error_log: passed through to the command executor
    :param return_text: return the raw output as text instead of the parsed data
    :return: the raw output (return_text=True); None if the command returned a non-zero
        code; otherwise the parsed YAML, where info.status.notes has been removed and
        info.status.resources has been converted to a table (only if those fields exist)
    """

    self.debug('status of kdu_instance {}'.format(kdu_instance))

    # config filename
    _, helm_dir, config_filename, _ = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} status {} --output yaml'\
        .format(self._helm_command, config_filename, helm_dir, kdu_instance)

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log
    )

    if return_text:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # locate info.status without assuming that every level exists
    # (empty output parses to None, and 'info' or 'status' may be absent)
    status = None
    if isinstance(data, dict):
        info = data.get('info')
        if isinstance(info, dict):
            status = info.get('status')

    if isinstance(status, dict):
        # remove field 'notes'
        status.pop('notes', None)

        # parse field 'resources'. A missing field is left alone instead of being
        # turned into a table containing the text 'None'
        resources = status.get('resources')
        if resources is not None:
            status['resources'] = self._output_to_table(str(resources))

    return data
767
async def get_instance_info(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Look a release up in the list of deployed releases of a cluster.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release
    :return: the entry of the release list whose 'Name' equals kdu_instance, or None if
        there is no such entry
    """
    # 'or []': the release list is taken from a parsed YAML document and could be None
    instances = (await self.instances_list(cluster_uuid=cluster_uuid)) or []
    for instance in instances:
        if instance.get('Name') == kdu_instance:
            return instance
    self.debug('Instance {} not found'.format(kdu_instance))
    return None
779
@staticmethod
def _generate_release_name(
        chart_name: str
):
    """
    Build a release name from a chart name plus a random numeric suffix.

    The chart name is reduced to its last path component when it is an absolute path or a
    URL (a trailing '/' is ignored), every character that is not a letter or a number is
    replaced by '-', the result is cut to 35 characters and prefixed with 'a' if it is
    empty or does not start with a letter. A '-' and a random number padded with zeros to
    10 digits are appended, and the whole name is lower-cased.

    :param chart_name: chart name, absolute path of an embedded chart, or URL
    :return: the generated name
    """
    # embedded chart (file or dir) or URL: keep only the last portion.
    # The trailing '/' is dropped first, otherwise 'dir/' would yield an empty name
    if chart_name.startswith('/') or '://' in chart_name:
        chart_name = chart_name.rstrip('/')
        chart_name = chart_name[chart_name.rfind('/') + 1:]

    name = ''.join(
        c if (c.isalpha() or c.isnumeric()) else '-'
        for c in chart_name
    )
    if len(name) > 35:
        name = name[0:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = 'a' + name

    suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, '0')

    return (name + '-' + suffix).lower()
816
async def _store_status(
        self,
        cluster_uuid: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False
):
    """
    Read the status of a release and write it to the database, once or periodically.

    The loop ends when run_once is True (after the first iteration), when the task is
    cancelled, or when the database write reports a failure. Any other error is logged
    and the loop goes on.

    :param cluster_uuid: the cluster
    :param operation: name of the operation in progress, written along with the status
    :param kdu_instance: name of the release
    :param check_every: seconds to sleep before each status query
    :param db_dict: passed through to write_app_status_to_db
    :param run_once: do a single iteration instead of looping
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            # the parsed status is needed here: the public status_kdu() returns text,
            # which has no .get() and made every iteration fail silently
            detailed_status = await self._status_kdu(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                return_text=False
            )
            if detailed_status is None:
                self.debug('No status available for kdu_instance {}'.format(kdu_instance))
            else:
                status = detailed_status.get('info').get('Description')
                self.debug('STATUS:\n{}'.format(status))
                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation)
                if not result:
                    self.info('Error writing in database. Task exiting...')
                    return
        except asyncio.CancelledError:
            self.debug('Task cancelled')
            return
        except Exception as e:
            # best effort: keep polling, but leave a trace of what went wrong
            self.debug('Error storing status of kdu_instance {}: {}'.format(kdu_instance, e))

        if run_once:
            return
852
async def _is_install_completed(
        self,
        cluster_uuid: str,
        kdu_instance: str
) -> bool:
    """
    Check the READY column of the resources reported by the status of a release.

    :param cluster_uuid: the cluster
    :param kdu_instance: name of the release
    :return: False if no status could be obtained or if some resource reports fewer ready
        items than expected ('current/total' with current < total); True otherwise.
        NOTE(review): returning True when no resource reports a READY column is an
        assumption - confirm the intended result for that case
    """

    status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
    if status is None:
        return False

    # info.status.resources, text format:
    #       ==> v1/Deployment
    #       NAME                    READY  UP-TO-DATE   AVAILABLE   AGE
    #       halting-horse-mongodb   0/1    1            0           0s
    #       halting-petit-mongodb   1/1    1            0           0s
    #       blank line
    resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))

    # the status may carry the resources already converted to a table: convert only text
    if isinstance(resources, str):
        resources = K8sHelmConnector._output_to_table(resources)
    elif not isinstance(resources, list):
        resources = []

    ready = True
    header_expected = False     # the previous row was a '==>' section title
    in_ready_section = False    # rows of a section whose second column is READY
    for row in resources:
        if row and row[0] == '==>':
            header_expected = True
            in_ready_section = False
            continue
        if header_expected:
            header_expected = False
            in_ready_section = len(row) > 1 and row[1] == 'READY'
            continue
        if not in_ready_section:
            continue
        if len(row) <= 1:
            # blank line: end of the section
            in_ready_section = False
            continue
        current, _, total = row[1].partition('/')
        try:
            if int(current) < int(total):
                self.debug('NOT READY:\n    {}'.format(row))
                ready = False
        except ValueError:
            # not a 'current/total' value: ignore the row
            continue

    return ready
903
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
    """
    Walk nested dictionaries following a path of keys.

    :param dictionary: the outermost dictionary
    :param members: keys to follow, outermost first
    :return: the value found at the end of the path, including falsy values such as 0,
        '' or []. None if the path is empty, if a key is missing, or if a value on the
        way is not a dictionary
    """
    if not members:
        return None

    target = dictionary
    for m in members:
        if not isinstance(target, dict):
            # cannot go deeper: the path does not exist
            return None
        target = target.get(m)
        if target is None:
            return None
    return target
918
# find key:value in several lines
@staticmethod
def _find_in_lines(p_lines: list, p_key: str) -> str:
    """
    Return the value of the first line of the form 'key:value'.

    :param p_lines: lines to search. Items that are not strings are ignored
    :param p_key: the key. The line must start with it, immediately followed by ':'
    :return: everything after the first ':' of the matching line, stripped (the value may
        itself contain ':'), or None if no line matches
    """
    prefix = p_key + ':'
    for line in p_lines or []:
        if isinstance(line, str) and line.startswith(prefix):
            # cut at the first ':' only, so values such as URLs are kept whole
            return line[len(prefix):].strip()
    return None
932
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
    """
    Dump the parameters to a YAML values file and build the helm option that uses it.

    String values containing '!!yaml' are parsed as YAML (after dropping their first 7
    characters) with the safe loader; every other value is dumped as it is.

    :param cluster_uuid: the cluster
    :param params: values for the chart
    :return: ('-f <file>', <file>), or ('', None) if there are no params. The caller is
        responsible for deleting the file
    """

    if not params:
        return '', None

    # NOTE(review): the paths are resolved (and created if missing) but not used: the
    # values file is written relative to the current working directory - confirm intended
    self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        # only strings can carry the '!!yaml' marker. An explicit safe loader is used:
        # yaml.load() without Loader is unsafe and is rejected by recent PyYAML versions
        if isinstance(value, str) and '!!yaml' in value:
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    # random file name: a number padded with zeros to 10 digits
    values_file = str(random.randrange(start=1, stop=99999999)).rjust(10, '0') + '.yaml'
    with open(values_file, 'w') as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return '-f {}'.format(values_file), values_file
962
quilesj26c78a42019-10-28 18:10:42 +0100963 # params for use in --set option
964 @staticmethod
965 def _params_to_set_option(params: dict) -> str:
966 params_str = ''
967 if params and len(params) > 0:
968 start = True
969 for key in params:
970 value = params.get(key, None)
971 if value is not None:
972 if start:
973 params_str += '--set '
974 start = False
975 else:
976 params_str += ','
977 params_str += '{}={}'.format(key, value)
978 return params_str
979
980 @staticmethod
981 def _output_to_lines(output: str) -> list:
982 output_lines = list()
983 lines = output.splitlines(keepends=False)
984 for line in lines:
985 line = line.strip()
986 if len(line) > 0:
987 output_lines.append(line)
988 return output_lines
989
990 @staticmethod
991 def _output_to_table(output: str) -> list:
992 output_table = list()
993 lines = output.splitlines(keepends=False)
994 for line in lines:
995 line = line.replace('\t', ' ')
996 line_list = list()
997 output_table.append(line_list)
998 cells = line.split(sep=' ')
999 for cell in cells:
1000 cell = cell.strip()
1001 if len(cell) > 0:
1002 line_list.append(cell)
1003 return output_table
1004
quilesjcda5f412019-11-18 11:32:12 +01001005 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
quilesj26c78a42019-10-28 18:10:42 +01001006 """
1007 Returns kube and helm directories
1008
1009 :param cluster_name:
1010 :param create_if_not_exist:
quilesjcda5f412019-11-18 11:32:12 +01001011 :return: kube, helm directories, config filename and cluster dir.
1012 Raises exception if not exist and cannot create
quilesj26c78a42019-10-28 18:10:42 +01001013 """
1014
1015 base = self.fs.path
1016 if base.endswith("/") or base.endswith("\\"):
1017 base = base[:-1]
1018
1019 # base dir for cluster
1020 cluster_dir = base + '/' + cluster_name
1021 if create_if_not_exist and not os.path.exists(cluster_dir):
1022 self.debug('Creating dir {}'.format(cluster_dir))
1023 os.makedirs(cluster_dir)
1024 if not os.path.exists(cluster_dir):
1025 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1026 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001027 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001028
1029 # kube dir
1030 kube_dir = cluster_dir + '/' + '.kube'
1031 if create_if_not_exist and not os.path.exists(kube_dir):
1032 self.debug('Creating dir {}'.format(kube_dir))
1033 os.makedirs(kube_dir)
1034 if not os.path.exists(kube_dir):
1035 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1036 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001037 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001038
1039 # helm home dir
1040 helm_dir = cluster_dir + '/' + '.helm'
1041 if create_if_not_exist and not os.path.exists(helm_dir):
1042 self.debug('Creating dir {}'.format(helm_dir))
1043 os.makedirs(helm_dir)
1044 if not os.path.exists(helm_dir):
1045 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1046 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001047 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001048
1049 config_filename = kube_dir + '/config'
quilesjcda5f412019-11-18 11:32:12 +01001050 return kube_dir, helm_dir, config_filename, cluster_dir
quilesj26c78a42019-10-28 18:10:42 +01001051
1052 @staticmethod
1053 def _remove_multiple_spaces(str):
1054 str = str.strip()
1055 while ' ' in str:
1056 str = str.replace(' ', ' ')
1057 return str
1058
def _local_exec(
        self,
        command: str
) -> (str, int):
    """
    Run a command synchronously through the local shell.

    :param command: command line; surrounding and repeated spaces are normalised first
    :return: (output, return_code). On success: the captured standard output and 0.
        On any failure (command not runnable or non-zero exit status): '' and 1.
        No exception is propagated to the caller.
    """
    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing sync local command: {}'.format(command))

    try:
        # NOTE(review): shell=True hands the whole string to the shell, so the command
        # must be built from trusted values only
        output = subprocess.check_output(command, shell=True, universal_newlines=True)
    except Exception as e:
        # the failure is reported through the return code only; leave a trace of the
        # cause in the log instead of discarding it silently
        self.debug('Sync local command failed: {} -> {}'.format(command, e))
        return '', 1

    self.debug(output)
    return output, 0
1075
async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False
) -> (str, int):
    """
    Run a local command asynchronously, without a shell, and collect its output.

    :param command: command line; it is space-normalised and then split on single spaces
        to obtain the argument list
    :param raise_exception_on_error: raise instead of returning when the command fails
        (non-zero return code) or cannot be executed
    :param show_error_log: log failures (output of a failed command, or the exception)
    :param encode_utf8: post-process the output through encode('utf-8') / str()
    :return: (output, return_code); ('', -1) when an exception occurred and
        ``raise_exception_on_error`` is False
    """
    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing async local command: {}'.format(command))

    # from here on 'command' is the argument list, also in the error message below
    command = command.split(sep=' ')

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # wait until the command finishes
        out_bytes, err_bytes = await process.communicate()
        return_code = process.returncode
        failed = return_code != 0

        # stderr is looked at last, so whenever it is not empty it replaces stdout
        # NOTE(review): this also happens when the command succeeded - confirm intended
        output = ''
        for captured in (out_bytes, err_bytes):
            if captured:
                output = captured.decode('utf-8').strip()

        if failed and show_error_log:
            self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
        else:
            self.debug('Return code: {}'.format(return_code))

        # raised inside the try on purpose: it goes through the handler below
        if failed and raise_exception_on_error:
            raise Exception(output)

        if encode_utf8:
            # NOTE(review): str() of a bytes object is its repr (b'...') - confirm that
            # callers expect that form
            encoded = output.encode('utf-8').strip()
            output = str(encoded).replace('\\n', '\n')

        return output, return_code

    except Exception as e:
        msg = 'Exception executing command: {} -> {}'.format(command, e)
        if show_error_log:
            self.error(msg)
        if not raise_exception_on_error:
            return '', -1
        raise e
quilesj26c78a42019-10-28 18:10:42 +01001132
def _remote_exec(
        self,
        hostname: str,
        username: str,
        password: str,
        command: str,
        timeout: int = 10
) -> (str, int):
    """
    Run a command on a remote host through ssh (password authentication).

    :param hostname: host to connect to
    :param username: ssh user
    :param password: ssh password
    :param command: command line; surrounding and repeated spaces are normalised first
    :param timeout: timeout passed to the remote command execution
    :return: (output, return_code): standard output of the command, and 1 if the command
        wrote anything to standard error, 0 otherwise
    """
    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.debug('Executing sync remote ssh command: {}'.format(command))

    ssh = paramiko.SSHClient()
    try:
        # NOTE(review): AutoAddPolicy accepts unknown host keys - confirm acceptable
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname=hostname, username=username, password=password)
        _stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
        # both streams must be read before the connection is closed
        output = ssh_stdout.read().decode('utf-8')
        error = ssh_stderr.read().decode('utf-8')
    finally:
        # always release the connection, also when connecting or executing fails
        ssh.close()

    # NOTE(review): the return code reflects "something on stderr", not the exit status
    # of the remote command - confirm callers rely on this
    if error:
        self.error('ERROR: {}'.format(error))
        return_code = 1
    else:
        return_code = 0

    output = output.replace('\\n', '\n')
    self.debug('OUTPUT: {}'.format(output))

    return output, return_code
1160
1161 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1162 self.debug('Checking if file {} exists...'.format(filename))
1163 if os.path.exists(filename):
1164 return True
1165 else:
1166 msg = 'File {} does not exist'.format(filename)
1167 if exception_if_not_exists:
1168 self.error(msg)
quilesja6748412019-12-04 07:51:26 +00001169 raise K8sException(msg)