##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        cluster_uuid = reuse_cluster_uuid
        if not cluster_uuid:
            cluster_uuid = str(uuid4())

        self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = K8sHelmConnector._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_uuid)
            )
            command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
                self._helm_command, config_filename, namespace, helm_dir
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_uuid))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster initialized {}".format(cluster_uuid))

        return cluster_uuid, n2vc_installed_sw

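    # Illustrative usage sketch (comment only, not executed): how a caller might
    # create the connector and prepare a cluster. `fs`, `db`, `logger` and
    # `kubeconfig_str` are hypothetical objects provided by the caller:
    #
    #     helm_conn = K8sHelmConnector(fs=fs, db=db, log=logger)
    #     cluster_uuid, installed_sw = await helm_conn.init_env(
    #         k8s_creds=kubeconfig_str
    #     )
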
    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):

        self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        self.log.debug("list repositories for cluster {}".format(cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )
        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster
        :param name: repo name in OSM
        :return: True if successful
        """

        self.log.debug(
            "remove repository {} from cluster {}".format(name, cluster_uuid)
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        self.log.debug(
            "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=False
        )

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get("Name")
                        chart = r.get("Chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        self.log.error(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster has releases and not force. Cannot reset K8s "
                    "environment. Cluster uuid: {}"
                ).format(cluster_uuid)
                self.log.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))

            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, config_filename
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
            output_table = K8sHelmConnector._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

            force_str = "--force"

            if namespace:
                # delete tiller deployment
                self.log.debug(
                    "Deleting tiller deployment for cluster {}, namespace {}".format(
                        cluster_uuid, namespace
                    )
                )
                command = (
                    "{} --namespace {} --kubeconfig={} {} delete deployment "
                    "tiller-deploy"
                ).format(self.kubectl_command, namespace, config_filename, force_str)
                await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )

                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_uuid)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_uuid
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):

        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_uuid=cluster_uuid, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release, then check if it already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm install command
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

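    # Illustrative usage sketch (comment only): installing a chart with a pinned
    # version via the "name:version" convention parsed above. The repo name,
    # chart, and db_dict contents are hypothetical:
    #
    #     kdu_instance = await helm_conn.install(
    #         cluster_uuid=cluster_uuid,
    #         kdu_model="stable/mongodb:7.8.10",
    #         params={"replicaCount": 2},
    #         db_dict={"collection": "nsrs", "filter": {"_id": nsr_id}},
    #     )
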
    async def instances_list(self, cluster_uuid: str) -> list:
        """
        Returns a list of deployed releases in a cluster

        :param cluster_uuid: the cluster
        :return: list of releases (may be empty)
        """

        self.log.debug("list releases for cluster {}".format(cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):

        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_uuid=cluster_uuid, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm upgrade command
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: output of the helm delete command, parsed into a table
        """

        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_uuid
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        return self._output_to_table(output)

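    # Illustrative lifecycle sketch (comment only, hypothetical identifiers):
    # upgrade an instance, roll it back one revision, then remove it:
    #
    #     new_rev = await helm_conn.upgrade(
    #         cluster_uuid=cluster_uuid,
    #         kdu_instance=kdu_instance,
    #         kdu_model="stable/mongodb:7.8.11",
    #     )
    #     await helm_conn.rollback(
    #         cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, revision=new_rev - 1
    #     )
    #     await helm_conn.uninstall(
    #         cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    #     )
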
    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_command(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        return await self._status_kdu(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def synchronize_repos(self, cluster_uuid: str):

        self.log.debug(
            "synchronize repos for cluster helm-id: {}".format(cluster_uuid)
        )
        try:
            update_repos_timeout = (
                300  # max timeout to sync a single repo; more than this is too much
            )
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added by cluster but not in nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first, then add, because there may be
                # different repos with the same name but different id and url
                self.log.debug("repos to delete: {}".format(repos_to_delete))
                for repo_id in repos_to_delete:
                    # try to delete repos
                    try:
                        repo_delete_task = asyncio.ensure_future(
                            self.repo_remove(
                                cluster_uuid=cluster_uuid,
                                name=cluster_repo_dict[repo_id],
                            )
                        )
                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                                repo_id, cluster_repo_dict[repo_id], str(e)
                            )
                        )
                    # always add to the deleted list, even on error, because if
                    # the repo is not there the delete raises an error
                    deleted_repo_list.append(repo_id)

                # add repos
                self.log.debug("repos to add: {}".format(repos_to_add))
                for repo_id in repos_to_add:
                    # obtain the repo data from the db
                    # if there is an error getting the repo from the database, this
                    # repo is ignored and processing continues, because there is a
                    # possible race condition where the repo has been deleted while
                    # processing
                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                    self.log.debug(
                        "obtained repo: id, {}, name: {}, url: {}".format(
                            repo_id, db_repo["name"], db_repo["url"]
                        )
                    )
                    try:
                        repo_add_task = asyncio.ensure_future(
                            self.repo_add(
                                cluster_uuid=cluster_uuid,
                                name=db_repo["name"],
                                url=db_repo["url"],
                                repo_type="chart",
                            )
                        )
                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
                        added_repo_dict[repo_id] = db_repo["name"]
                        self.log.debug(
                            "added repo: id, {}, name: {}".format(
                                repo_id, db_repo["name"]
                            )
                        )
                    except Exception as e:
                        # deal with error adding repo; adding a repo that already
                        # exists does not raise any error. Do not re-raise here,
                        # because a wrong repo added by anyone could prevent
                        # instantiating any ns
                        self.log.error(
                            "Error adding repo id: {}, err_msg: {} ".format(
                                repo_id, repr(e)
                            )
                        )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8scluster with helm-id : {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

924 """
925 ####################################################################################
926 ################################### P R I V A T E ##################################
927 ####################################################################################
928 """
929
    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if it does not start with an alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()

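    # Illustrative behavior sketch: for kdu_model "stable/mongodb" the slash is
    # replaced by "-", the result is truncated to 35 characters, and a zero-padded
    # random suffix is appended, yielding something like
    # "stable-mongodb-0012345678".
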
    async def _store_status(
        self,
        cluster_uuid: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self.status_kdu(
                    cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug("STATUS:\n{}".format(status))
                self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                self.log.debug("_store_status exception: {}".format(str(e)))
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #     ==> v1/Deployment
        #     NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #     halting-horse-mongodb   0/1     1            0           0s
        #     halting-petit-mongodb   1/1     1            0           0s
        #     blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        # assume the install is complete unless a not-ready resource is found
        ready = True
        num_lines = len(resources)
        index = 0
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value

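    # Illustrative behavior sketch:
    #
    #     K8sHelmConnector._get_deep(
    #         {"info": {"status": {"resources": "table"}}},
    #         ("info", "status", "resources"),
    #     )
    #     # -> "table" (or None if any key along the path is missing)
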
    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    # strip the "!!yaml " prefix and parse the embedded yaml value
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None

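    # Illustrative behavior sketch: given params={"a": 1}, a random file such as
    # "0012345678.yaml" containing "a: 1" is written and the method returns
    # ("-f 0012345678.yaml", "0012345678.yaml") so the caller can delete the
    # file after running helm.
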
    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str

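    # Illustrative behavior sketch:
    #
    #     K8sHelmConnector._params_to_set_option({"a": 1, "b": "x"})
    #     # -> "--set a=1,b=x"
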
    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table

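    # Illustrative behavior sketch: tabular kubectl/helm output such as
    #
    #     "NAME  READY\ntiller-deploy  1/1"
    #
    # becomes [["NAME", "READY"], ["tiller-deploy", "1/1"]].
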
    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube, helm directories, config filename and cluster dir.
                Raises exception if not exist and cannot create
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

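    # Illustrative behavior sketch:
    #
    #     K8sHelmConnector._remove_multiple_spaces("helm   list  --output yaml")
    #     # -> "helm list --output yaml"
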
    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # raise exception if it fails
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command to terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            if stderr:
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                raise K8sException(msg)
            return False