b367e7421cc02a54b8e06d632ca67d279f3616e3
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import subprocess
24 import os
25 import shutil
26 import asyncio
27 import time
28 import yaml
29 from uuid import uuid4
30 import random
31 from n2vc.k8s_conn import K8sConnector
32 from n2vc.exceptions import K8sException
33
34
35 class K8sHelmConnector(K8sConnector):
36
37 """
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
41 """
42
43 def __init__(
44 self,
45 fs: object,
46 db: object,
47 kubectl_command: str = '/usr/bin/kubectl',
48 helm_command: str = '/usr/bin/helm',
49 log: object = None,
50 on_update_db=None
51 ):
52 """
53
54 :param fs: file system for kubernetes and helm configuration
55 :param db: database object to write current operation status
56 :param kubectl_command: path to kubectl executable
57 :param helm_command: path to helm executable
58 :param log: logger
59 :param on_update_db: callback called when k8s connector updates database
60 """
61
62 # parent class
63 K8sConnector.__init__(
64 self,
65 db=db,
66 log=log,
67 on_update_db=on_update_db
68 )
69
70 self.log.info('Initializing K8S Helm connector')
71
72 # random numbers for release name generation
73 random.seed(time.time())
74
75 # the file system
76 self.fs = fs
77
78 # exception if kubectl is not installed
79 self.kubectl_command = kubectl_command
80 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
81
82 # exception if helm is not installed
83 self._helm_command = helm_command
84 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
85
86 # initialize helm client-only
87 self.log.debug('Initializing helm client-only...')
88 command = '{} init --client-only'.format(self._helm_command)
89 try:
90 asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
91 # loop = asyncio.get_event_loop()
92 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
93 except Exception as e:
94 self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
95
96 self.log.info('K8S Helm connector initialized')
97
98 async def init_env(
99 self,
100 k8s_creds: str,
101 namespace: str = 'kube-system',
102 reuse_cluster_uuid=None
103 ) -> (str, bool):
104 """
105 It prepares a given K8s cluster environment to run Charts on both sides:
106 client (OSM)
107 server (Tiller)
108
109 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
110 :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
111 :param reuse_cluster_uuid: existing cluster uuid for reuse
112 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
113 (on error, an exception will be raised)
114 """
115
116 cluster_uuid = reuse_cluster_uuid
117 if not cluster_uuid:
118 cluster_uuid = str(uuid4())
119
120 self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))
121
122 # create config filename
123 kube_dir, helm_dir, config_filename, cluster_dir = \
124 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
125 f = open(config_filename, "w")
126 f.write(k8s_creds)
127 f.close()
128
129 # check if tiller pod is up in cluster
130 command = '{} --kubeconfig={} --namespace={} get deployments'\
131 .format(self.kubectl_command, config_filename, namespace)
132 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
133
134 output_table = K8sHelmConnector._output_to_table(output=output)
135
136 # find 'tiller' pod in all pods
137 already_initialized = False
138 try:
139 for row in output_table:
140 if row[0].startswith('tiller-deploy'):
141 already_initialized = True
142 break
143 except Exception as e:
144 pass
145
146 # helm init
147 n2vc_installed_sw = False
148 if not already_initialized:
149 self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
150 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
151 .format(self._helm_command, config_filename, namespace, helm_dir)
152 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
153 n2vc_installed_sw = True
154 else:
155 # check client helm installation
156 check_file = helm_dir + '/repository/repositories.yaml'
157 if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
158 self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
159 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
160 .format(self._helm_command, config_filename, namespace, helm_dir)
161 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
162 else:
163 self.log.info('Helm client already initialized')
164
165 self.log.info('Cluster initialized {}'.format(cluster_uuid))
166
167 return cluster_uuid, n2vc_installed_sw
168
169 async def repo_add(
170 self,
171 cluster_uuid: str,
172 name: str,
173 url: str,
174 repo_type: str = 'chart'
175 ):
176
177 self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
178
179 # config filename
180 kube_dir, helm_dir, config_filename, cluster_dir = \
181 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
182
183 # helm repo update
184 command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
185 self.log.debug('updating repo: {}'.format(command))
186 await self._local_async_exec(command=command, raise_exception_on_error=False)
187
188 # helm repo add name url
189 command = '{} --kubeconfig={} --home={} repo add {} {}'\
190 .format(self._helm_command, config_filename, helm_dir, name, url)
191 self.log.debug('adding repo: {}'.format(command))
192 await self._local_async_exec(command=command, raise_exception_on_error=True)
193
194 async def repo_list(
195 self,
196 cluster_uuid: str
197 ) -> list:
198 """
199 Get the list of registered repositories
200
201 :return: list of registered repositories: [ (name, url) .... ]
202 """
203
204 self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
205
206 # config filename
207 kube_dir, helm_dir, config_filename, cluster_dir = \
208 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
209
210 command = '{} --kubeconfig={} --home={} repo list --output yaml'\
211 .format(self._helm_command, config_filename, helm_dir)
212
213 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
214 if output and len(output) > 0:
215 return yaml.load(output, Loader=yaml.SafeLoader)
216 else:
217 return []
218
219 async def repo_remove(
220 self,
221 cluster_uuid: str,
222 name: str
223 ):
224 """
225 Remove a repository from OSM
226
227 :param cluster_uuid: the cluster
228 :param name: repo name in OSM
229 :return: True if successful
230 """
231
232 self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
233
234 # config filename
235 kube_dir, helm_dir, config_filename, cluster_dir = \
236 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
237
238 command = '{} --kubeconfig={} --home={} repo remove {}'\
239 .format(self._helm_command, config_filename, helm_dir, name)
240
241 await self._local_async_exec(command=command, raise_exception_on_error=True)
242
    async def reset(
            self,
            cluster_uuid: str,
            force: bool = False,
            uninstall_sw: bool = False
    ) -> bool:
        """
        Destroy the helm environment of a cluster.

        Uninstalls every deployed release (only when force is True), optionally
        removes tiller from the cluster, and finally deletes the local cluster
        directory.

        :param cluster_uuid: the cluster to reset
        :param force: uninstall existing releases instead of failing
        :param uninstall_sw: also remove tiller from the cluster
        :return: True on success
        :raises K8sException: if releases exist and force is False
        """

        self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

        # get kube and helm directories
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get('Name')
                        chart = r.get('Chart')
                        self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                        await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                    except Exception as e:
                        self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
            else:
                msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                    .format(cluster_uuid)
                self.log.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

            # find namespace for tiller pod
            command = '{} --kubeconfig={} get deployments --all-namespaces'\
                .format(self.kubectl_command, config_filename)
            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
            output_table = K8sHelmConnector._output_to_table(output=output)
            namespace = None
            # for/else: the else branch runs only when no 'tiller-deploy' row was
            # found (i.e. the loop completed without break)
            for r in output_table:
                try:
                    if 'tiller-deploy' in r[1]:
                        namespace = r[0]
                        break
                except Exception as e:
                    # row too short (e.g. header/blank line): skip it
                    pass
            else:
                msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
                self.log.error(msg)

            self.log.debug('namespace for tiller: {}'.format(namespace))

            force_str = '--force'

            if namespace:
                # delete tiller deployment
                self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
                command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                    .format(self.kubectl_command, namespace, config_filename, force_str)
                await self._local_async_exec(command=command, raise_exception_on_error=False)

                # uninstall tiller from cluster
                self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
                command = '{} --kubeconfig={} --home={} reset'\
                    .format(self._helm_command, config_filename, helm_dir)
                self.log.debug('resetting: {}'.format(command))
                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            else:
                self.log.debug('namespace not found')

        # delete cluster directory (note: 'dir' shadows the builtin; kept as-is)
        dir = self.fs.path + '/' + cluster_uuid
        self.log.debug('Removing directory {}'.format(dir))
        shutil.rmtree(dir, ignore_errors=True)

        return True
321
    async def install(
            self,
            cluster_uuid: str,
            kdu_model: str,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None,
            kdu_name: str = None,
            namespace: str = None
    ):
        """
        Deploy a chart as a new release in the given cluster.

        :param cluster_uuid: cluster where the chart will be installed
        :param kdu_model: chart reference, optionally suffixed with ':<version>'
        :param atomic: pass helm '--atomic' and report status while installing
        :param timeout: helm '--timeout' value
        :param params: chart values, written to a temporal '-f' values file
        :param db_dict: database location where periodic status is written
        :param kdu_name: not used in this implementation (release name is
            generated) — TODO confirm intent with callers
        :param namespace: optional kubernetes namespace for the release
        :return: the generated release name (kdu_instance)
        :raises K8sException: if the helm command returns a non-zero code
        """

        self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'
        # namespace
        namespace_str = ''
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version: split a trailing ':<version>' from the chart reference
        version_str = ''
        if ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                # status lookup failed: the generated name is free to use
                pass

        # helm repo install
        command = '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \
                  '--name={name} {ns} {model} {ver}'.format(helm=self._helm_command, atomic=atomic_str,
                                                            config=config_filename, dir=helm_dir, params=params_str,
                                                            timeout=timeout_str, name=kdu_instance, ns=namespace_str,
                                                            model=kdu_model, ver=version_str)
        self.log.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )

            # write status in another task, polling while the install runs
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single pass, no delay)
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='install',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
        return kdu_instance
439
440 async def instances_list(
441 self,
442 cluster_uuid: str
443 ) -> list:
444 """
445 returns a list of deployed releases in a cluster
446
447 :param cluster_uuid: the cluster
448 :return:
449 """
450
451 self.log.debug('list releases for cluster {}'.format(cluster_uuid))
452
453 # config filename
454 kube_dir, helm_dir, config_filename, cluster_dir = \
455 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
456
457 command = '{} --kubeconfig={} --home={} list --output yaml'\
458 .format(self._helm_command, config_filename, helm_dir)
459
460 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
461
462 if output and len(output) > 0:
463 return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
464 else:
465 return []
466
    async def upgrade(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            kdu_model: str = None,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None
    ):
        """
        Upgrade an existing release to a new chart/version/values.

        :param cluster_uuid: cluster of the release
        :param kdu_instance: release to upgrade
        :param kdu_model: chart reference, optionally suffixed with ':<version>'
        :param atomic: pass helm '--atomic' and report status while upgrading
        :param timeout: helm '--timeout' value
        :param params: chart values, written to a temporal '-f' values file
        :param db_dict: database location where periodic status is written
        :return: new revision number, or 0 if it cannot be read
        :raises K8sException: if the helm command returns a non-zero code
        """

        self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

        timeout_str = ''
        if timeout:
            timeout_str = '--timeout {}'.format(timeout)

        # atomic
        atomic_str = ''
        if atomic:
            atomic_str = '--atomic'

        # version: split a trailing ':<version>' from the chart reference
        version_str = ''
        if kdu_model and ':' in kdu_model:
            parts = kdu_model.split(sep=':')
            if len(parts) == 2:
                version_str = '--version {}'.format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.log.debug('upgrading: {}'.format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task, polling while the upgrade runs
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single pass, no delay)
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='upgrade',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.log.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
566
    async def rollback(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            revision=0,
            db_dict: dict = None
    ):
        """
        Roll a release back to a given revision.

        :param cluster_uuid: cluster of the release
        :param kdu_instance: release to roll back
        :param revision: target revision number (passed straight to helm)
        :param db_dict: database location where periodic status is written
        :return: new revision number after the rollback, or 0 if it cannot be read
        :raises K8sException: if the helm command returns a non-zero code
        """

        self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
                       .format(kdu_instance, revision, cluster_uuid))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
        )
        # write status in another task, polling while the rollback runs
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation='rollback',
                run_once=False
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status (single pass, no delay)
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=True,
            check_every=0
        )

        if rc != 0:
            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
        if instance:
            revision = int(instance.get('Revision'))
            self.log.debug('New revision: {}'.format(revision))
            return revision
        else:
            return 0
631
632 async def uninstall(
633 self,
634 cluster_uuid: str,
635 kdu_instance: str
636 ):
637 """
638 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
639 after all _terminate-config-primitive_ of the VNF are invoked).
640
641 :param cluster_uuid: UUID of a K8s cluster known by OSM
642 :param kdu_instance: unique name for the KDU instance to be deleted
643 :return: True if successful
644 """
645
646 self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
647
648 # config filename
649 kube_dir, helm_dir, config_filename, cluster_dir = \
650 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
651
652 command = '{} --kubeconfig={} --home={} delete --purge {}'\
653 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
654
655 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
656
657 return self._output_to_table(output)
658
659 async def inspect_kdu(
660 self,
661 kdu_model: str,
662 repo_url: str = None
663 ) -> str:
664
665 self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
666
667 return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
668
669 async def values_kdu(
670 self,
671 kdu_model: str,
672 repo_url: str = None
673 ) -> str:
674
675 self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
676
677 return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
678
679 async def help_kdu(
680 self,
681 kdu_model: str,
682 repo_url: str = None
683 ) -> str:
684
685 self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
686
687 return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
688
689 async def status_kdu(
690 self,
691 cluster_uuid: str,
692 kdu_instance: str
693 ) -> str:
694
695 # call internal function
696 return await self._status_kdu(
697 cluster_uuid=cluster_uuid,
698 kdu_instance=kdu_instance,
699 show_error_log=True,
700 return_text=True
701 )
702
703 async def synchronize_repos(self, cluster_uuid: str):
704
705 self.log.debug("syncronize repos for cluster helm-id: {}",)
706 try:
707 update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much
708 db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
709 if db_k8scluster:
710 nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
711 cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
712 # elements that must be deleted
713 deleted_repo_list = []
714 added_repo_dict = {}
715 self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
716 self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
717
718 # obtain repos to add: registered by nbi but not added
719 repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
720
721 # obtain repos to delete: added by cluster but not in nbi list
722 repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]
723
724 # delete repos: must delete first then add because there may be different repos with same name but
725 # different id and url
726 self.log.debug("repos to delete: {}".format(repos_to_delete))
727 for repo_id in repos_to_delete:
728 # try to delete repos
729 try:
730 repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid,
731 name=cluster_repo_dict[repo_id]))
732 await asyncio.wait_for(repo_delete_task, update_repos_timeout)
733 except Exception as e:
734 self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id,
735 cluster_repo_dict[repo_id], str(e)))
736 # always add to the list of to_delete if there is an error because if is not there deleting raises error
737 deleted_repo_list.append(repo_id)
738
739 # add repos
740 self.log.debug("repos to add: {}".format(repos_to_add))
741 add_task_list = []
742 for repo_id in repos_to_add:
743 # obtain the repo data from the db
744 # if there is an error getting the repo in the database we will ignore this repo and continue
745 # because there is a possible race condition where the repo has been deleted while processing
746 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
747 self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
748 try:
749 repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
750 name=db_repo["name"], url=db_repo["url"],
751 repo_type="chart"))
752 await asyncio.wait_for(repo_add_task, update_repos_timeout)
753 added_repo_dict[repo_id] = db_repo["name"]
754 self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
755 except Exception as e:
756 # deal with error adding repo, adding a repo that already exists does not raise any error
757 # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
758 self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
759
760 return deleted_repo_list, added_repo_dict
761
762 else: # else db_k8scluster does not exist
763 raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
764
765 except Exception as e:
766 self.log.error("Error synchronizing repos: {}".format(str(e)))
767 raise K8sException("Error synchronizing repos")
768
769 """
770 ##################################################################################################
771 ########################################## P R I V A T E #########################################
772 ##################################################################################################
773 """
774
775 async def _exec_inspect_comand(
776 self,
777 inspect_command: str,
778 kdu_model: str,
779 repo_url: str = None
780 ):
781
782 repo_str = ''
783 if repo_url:
784 repo_str = ' --repo {}'.format(repo_url)
785 idx = kdu_model.find('/')
786 if idx >= 0:
787 idx += 1
788 kdu_model = kdu_model[idx:]
789
790 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
791 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
792
793 return output
794
    async def _status_kdu(
            self,
            cluster_uuid: str,
            kdu_instance: str,
            show_error_log: bool = False,
            return_text: bool = False
    ):
        """
        Run 'helm status' for a release and return its parsed yaml (or raw text).

        :param cluster_uuid: cluster where the release lives
        :param kdu_instance: release name
        :param show_error_log: log helm errors at error level
        :param return_text: return the raw helm output instead of a parsed dict
        :return: parsed status dict (with 'notes' removed and 'resources'
            converted to a table), the raw text if return_text is True, or None
            on a non-zero return code
        """

        self.log.debug('status of kdu_instance {}'.format(kdu_instance))

        # config filename
        kube_dir, helm_dir, config_filename, cluster_dir = \
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

        command = '{} --kubeconfig={} --home={} status {} --output yaml'\
            .format(self._helm_command, config_filename, helm_dir, kdu_instance)

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log
        )

        if return_text:
            return str(output)

        # NOTE(review): _local_async_exec is invoked with
        # raise_exception_on_error=True, so this branch looks unreachable —
        # confirm before relying on the None return
        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get('info').get('status')['notes']
        except KeyError:
            pass

        # parse field 'resources' into a table for easier consumption
        try:
            resources = str(data.get('info').get('status').get('resources'))
            resource_table = self._output_to_table(resources)
            data.get('info').get('status')['resources'] = resource_table
        except Exception as e:
            pass

        return data
841
842 async def get_instance_info(
843 self,
844 cluster_uuid: str,
845 kdu_instance: str
846 ):
847 instances = await self.instances_list(cluster_uuid=cluster_uuid)
848 for instance in instances:
849 if instance.get('Name') == kdu_instance:
850 return instance
851 self.log.debug('Instance {} not found'.format(kdu_instance))
852 return None
853
854 @staticmethod
855 def _generate_release_name(
856 chart_name: str
857 ):
858 # check embeded chart (file or dir)
859 if chart_name.startswith('/'):
860 # extract file or directory name
861 chart_name = chart_name[chart_name.rfind('/')+1:]
862 # check URL
863 elif '://' in chart_name:
864 # extract last portion of URL
865 chart_name = chart_name[chart_name.rfind('/')+1:]
866
867 name = ''
868 for c in chart_name:
869 if c.isalpha() or c.isnumeric():
870 name += c
871 else:
872 name += '-'
873 if len(name) > 35:
874 name = name[0:35]
875
876 # if does not start with alpha character, prefix 'a'
877 if not name[0].isalpha():
878 name = 'a' + name
879
880 name += '-'
881
882 def get_random_number():
883 r = random.randrange(start=1, stop=99999999)
884 s = str(r)
885 s = s.rjust(10, '0')
886 return s
887
888 name = name + get_random_number()
889 return name.lower()
890
    async def _store_status(
            self,
            cluster_uuid: str,
            operation: str,
            kdu_instance: str,
            check_every: float = 10,
            db_dict: dict = None,
            run_once: bool = False
    ):
        """
        Periodically poll the status of a release and write it to the database.

        :param cluster_uuid: cluster of the release
        :param operation: operation label stored with the status (e.g. 'install')
        :param kdu_instance: release whose status is polled
        :param check_every: seconds to sleep before each poll
        :param db_dict: database location passed to write_app_status_to_db
        :param run_once: perform exactly one iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                # NOTE(review): status_kdu returns text (return_text=True), so
                # .get() here looks like it would raise AttributeError and be
                # swallowed by the broad except below — confirm intended behavior
                status = detailed_status.get('info').get('Description')
                self.log.debug('STATUS:\n{}'.format(status))
                self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation)
                if not result:
                    self.log.info('Error writing in database. Task exiting...')
                    return
            except asyncio.CancelledError:
                self.log.debug('Task cancelled')
                return
            except Exception as e:
                # best-effort: status polling must never kill the caller
                self.log.debug('_store_status exception: {}'.format(str(e)))
                pass
            finally:
                # run_once callers want a single poll regardless of outcome
                if run_once:
                    return
925
926 async def _is_install_completed(
927 self,
928 cluster_uuid: str,
929 kdu_instance: str
930 ) -> bool:
931
932 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
933
934 # extract info.status.resources-> str
935 # format:
936 # ==> v1/Deployment
937 # NAME READY UP-TO-DATE AVAILABLE AGE
938 # halting-horse-mongodb 0/1 1 0 0s
939 # halting-petit-mongodb 1/1 1 0 0s
940 # blank line
941 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
942
943 # convert to table
944 resources = K8sHelmConnector._output_to_table(resources)
945
946 num_lines = len(resources)
947 index = 0
948 while index < num_lines:
949 try:
950 line1 = resources[index]
951 index += 1
952 # find '==>' in column 0
953 if line1[0] == '==>':
954 line2 = resources[index]
955 index += 1
956 # find READY in column 1
957 if line2[1] == 'READY':
958 # read next lines
959 line3 = resources[index]
960 index += 1
961 while len(line3) > 1 and index < num_lines:
962 ready_value = line3[1]
963 parts = ready_value.split(sep='/')
964 current = int(parts[0])
965 total = int(parts[1])
966 if current < total:
967 self.log.debug('NOT READY:\n {}'.format(line3))
968 ready = False
969 line3 = resources[index]
970 index += 1
971
972 except Exception as e:
973 pass
974
975 return ready
976
977 @staticmethod
978 def _get_deep(dictionary: dict, members: tuple):
979 target = dictionary
980 value = None
981 try:
982 for m in members:
983 value = target.get(m)
984 if not value:
985 return None
986 else:
987 target = value
988 except Exception as e:
989 pass
990 return value
991
992 # find key:value in several lines
993 @staticmethod
994 def _find_in_lines(p_lines: list, p_key: str) -> str:
995 for line in p_lines:
996 try:
997 if line.startswith(p_key + ':'):
998 parts = line.split(':')
999 the_value = parts[1].strip()
1000 return the_value
1001 except Exception as e:
1002 # ignore it
1003 pass
1004 return None
1005
1006 # params for use in -f file
1007 # returns values file option and filename (in order to delete it at the end)
1008 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
1009
1010 if params and len(params) > 0:
1011 kube_dir, helm_dir, config_filename, cluster_dir = \
1012 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
1013
1014 def get_random_number():
1015 r = random.randrange(start=1, stop=99999999)
1016 s = str(r)
1017 while len(s) < 10:
1018 s = '0' + s
1019 return s
1020
1021 params2 = dict()
1022 for key in params:
1023 value = params.get(key)
1024 if '!!yaml' in str(value):
1025 value = yaml.load(value[7:])
1026 params2[key] = value
1027
1028 values_file = get_random_number() + '.yaml'
1029 with open(values_file, 'w') as stream:
1030 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1031
1032 return '-f {}'.format(values_file), values_file
1033
1034 return '', None
1035
1036 # params for use in --set option
1037 @staticmethod
1038 def _params_to_set_option(params: dict) -> str:
1039 params_str = ''
1040 if params and len(params) > 0:
1041 start = True
1042 for key in params:
1043 value = params.get(key, None)
1044 if value is not None:
1045 if start:
1046 params_str += '--set '
1047 start = False
1048 else:
1049 params_str += ','
1050 params_str += '{}={}'.format(key, value)
1051 return params_str
1052
1053 @staticmethod
1054 def _output_to_lines(output: str) -> list:
1055 output_lines = list()
1056 lines = output.splitlines(keepends=False)
1057 for line in lines:
1058 line = line.strip()
1059 if len(line) > 0:
1060 output_lines.append(line)
1061 return output_lines
1062
1063 @staticmethod
1064 def _output_to_table(output: str) -> list:
1065 output_table = list()
1066 lines = output.splitlines(keepends=False)
1067 for line in lines:
1068 line = line.replace('\t', ' ')
1069 line_list = list()
1070 output_table.append(line_list)
1071 cells = line.split(sep=' ')
1072 for cell in cells:
1073 cell = cell.strip()
1074 if len(cell) > 0:
1075 line_list.append(cell)
1076 return output_table
1077
1078 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
1079 """
1080 Returns kube and helm directories
1081
1082 :param cluster_name:
1083 :param create_if_not_exist:
1084 :return: kube, helm directories, config filename and cluster dir.
1085 Raises exception if not exist and cannot create
1086 """
1087
1088 base = self.fs.path
1089 if base.endswith("/") or base.endswith("\\"):
1090 base = base[:-1]
1091
1092 # base dir for cluster
1093 cluster_dir = base + '/' + cluster_name
1094 if create_if_not_exist and not os.path.exists(cluster_dir):
1095 self.log.debug('Creating dir {}'.format(cluster_dir))
1096 os.makedirs(cluster_dir)
1097 if not os.path.exists(cluster_dir):
1098 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
1099 self.log.error(msg)
1100 raise K8sException(msg)
1101
1102 # kube dir
1103 kube_dir = cluster_dir + '/' + '.kube'
1104 if create_if_not_exist and not os.path.exists(kube_dir):
1105 self.log.debug('Creating dir {}'.format(kube_dir))
1106 os.makedirs(kube_dir)
1107 if not os.path.exists(kube_dir):
1108 msg = 'Kube config dir {} does not exist'.format(kube_dir)
1109 self.log.error(msg)
1110 raise K8sException(msg)
1111
1112 # helm home dir
1113 helm_dir = cluster_dir + '/' + '.helm'
1114 if create_if_not_exist and not os.path.exists(helm_dir):
1115 self.log.debug('Creating dir {}'.format(helm_dir))
1116 os.makedirs(helm_dir)
1117 if not os.path.exists(helm_dir):
1118 msg = 'Helm config dir {} does not exist'.format(helm_dir)
1119 self.log.error(msg)
1120 raise K8sException(msg)
1121
1122 config_filename = kube_dir + '/config'
1123 return kube_dir, helm_dir, config_filename, cluster_dir
1124
1125 @staticmethod
1126 def _remove_multiple_spaces(str):
1127 str = str.strip()
1128 while ' ' in str:
1129 str = str.replace(' ', ' ')
1130 return str
1131
1132 def _local_exec(
1133 self,
1134 command: str
1135 ) -> (str, int):
1136 command = K8sHelmConnector._remove_multiple_spaces(command)
1137 self.log.debug('Executing sync local command: {}'.format(command))
1138 # raise exception if fails
1139 output = ''
1140 try:
1141 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1142 return_code = 0
1143 self.log.debug(output)
1144 except Exception as e:
1145 return_code = 1
1146
1147 return output, return_code
1148
    async def _local_async_exec(
            self,
            command: str,
            raise_exception_on_error: bool = False,
            show_error_log: bool = True,
            encode_utf8: bool = False
    ) -> (str, int):
        """
        Execute a local command asynchronously (no shell) and return its output.

        :param command: command line; whitespace is collapsed, then split on
            single spaces (no shell quoting support)
        :param raise_exception_on_error: raise K8sException on non-zero exit
            or execution failure instead of returning an error code
        :param show_error_log: on non-zero exit, log return code and output
        :param encode_utf8: post-process output through an encode/str round
            trip (see inline note)
        :return: (output, return_code); ('', -1) on unexpected errors when
            raise_exception_on_error is False
        :raises K8sException: when raise_exception_on_error is True and the
            command fails
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug('Executing async local command: {}'.format(command))

        # split command
        command = command.split(sep=' ')

        try:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ''
            if stdout:
                output = stdout.decode('utf-8').strip()
                # output = stdout.decode()
            if stderr:
                # NOTE(review): stderr REPLACES any stdout content here rather
                # than being appended — confirm callers never need both streams
                output = stderr.decode('utf-8').strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
            else:
                self.log.debug('Return code: {}'.format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                # str() of a bytes object yields "b'...'"; the replace restores
                # literal newlines inside that representation
                output = output.encode('utf-8').strip()
                output = str(output).replace('\\n', '\n')

            return output, return_code

        except asyncio.CancelledError:
            # propagate task cancellation untouched
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = 'Exception executing command: {} -> {}'.format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return '', -1
1208
1209 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1210 # self.log.debug('Checking if file {} exists...'.format(filename))
1211 if os.path.exists(filename):
1212 return True
1213 else:
1214 msg = 'File {} does not exist'.format(filename)
1215 if exception_if_not_exists:
1216 # self.log.error(msg)
1217 raise K8sException(msg)
1218
1219