# Source: n2vc/k8s_helm_conn.py (osm/N2VC.git)
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import os
25 import random
26 import shutil
27 import subprocess
28 import time
29 from uuid import uuid4
30
31 from n2vc.exceptions import K8sException
32 from n2vc.k8s_conn import K8sConnector
33 import yaml
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ####################################################################################
40 ################################### P U B L I C ####################################
41 ####################################################################################
42 """
43
44 def __init__(
45 self,
46 fs: object,
47 db: object,
48 kubectl_command: str = "/usr/bin/kubectl",
49 helm_command: str = "/usr/bin/helm",
50 log: object = None,
51 on_update_db=None,
52 ):
53 """
54
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
59 :param log: logger
60 :param on_update_db: callback called when k8s connector updates database
61 """
62
63 # parent class
64 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
65
66 self.log.info("Initializing K8S Helm connector")
67
68 # random numbers for release name generation
69 random.seed(time.time())
70
71 # the file system
72 self.fs = fs
73
74 # exception if kubectl is not installed
75 self.kubectl_command = kubectl_command
76 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
77
78 # exception if helm is not installed
79 self._helm_command = helm_command
80 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
81
82 # initialize helm client-only
83 self.log.debug("Initializing helm client-only...")
84 command = "{} init --client-only".format(self._helm_command)
85 try:
86 asyncio.ensure_future(
87 self._local_async_exec(command=command, raise_exception_on_error=False)
88 )
89 # loop = asyncio.get_event_loop()
90 # loop.run_until_complete(self._local_async_exec(command=command,
91 # raise_exception_on_error=False))
92 except Exception as e:
93 self.warning(
94 msg="helm init failed (it was already initialized): {}".format(e)
95 )
96
97 self.log.info("K8S Helm connector initialized")
98
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
        (on error, an exception will be raised)
        """

        # reuse the caller-provided uuid when given, otherwise mint a new one
        cluster_uuid = reuse_cluster_uuid
        if not cluster_uuid:
            cluster_uuid = str(uuid4())

        self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))

        # create config filename: persist the supplied kubeconfig under the
        # per-cluster directory so later kubectl/helm invocations can use it
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            # malformed table rows are treated as "tiller not initialized"
            pass

        # helm init (client + server side) only if tiller was not found
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_uuid)
            )
            command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
                self._helm_command, config_filename, namespace, helm_dir
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True
            )
            # remember that this connector deployed software in the cluster
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_uuid))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster initialized {}".format(cluster_uuid))

        return cluster_uuid, n2vc_installed_sw
183
184 async def repo_add(
185 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
186 ):
187
188 self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
189
190 # config filename
191 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
192 cluster_name=cluster_uuid, create_if_not_exist=True
193 )
194
195 # helm repo update
196 command = "{} --kubeconfig={} --home={} repo update".format(
197 self._helm_command, config_filename, helm_dir
198 )
199 self.log.debug("updating repo: {}".format(command))
200 await self._local_async_exec(command=command, raise_exception_on_error=False)
201
202 # helm repo add name url
203 command = "{} --kubeconfig={} --home={} repo add {} {}".format(
204 self._helm_command, config_filename, helm_dir, name, url
205 )
206 self.log.debug("adding repo: {}".format(command))
207 await self._local_async_exec(command=command, raise_exception_on_error=True)
208
209 async def repo_list(self, cluster_uuid: str) -> list:
210 """
211 Get the list of registered repositories
212
213 :return: list of registered repositories: [ (name, url) .... ]
214 """
215
216 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
217
218 # config filename
219 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
220 cluster_name=cluster_uuid, create_if_not_exist=True
221 )
222
223 command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
224 self._helm_command, config_filename, helm_dir
225 )
226
227 output, _rc = await self._local_async_exec(
228 command=command, raise_exception_on_error=True
229 )
230 if output and len(output) > 0:
231 return yaml.load(output, Loader=yaml.SafeLoader)
232 else:
233 return []
234
235 async def repo_remove(self, cluster_uuid: str, name: str):
236 """
237 Remove a repository from OSM
238
239 :param cluster_uuid: the cluster
240 :param name: repo name in OSM
241 :return: True if successful
242 """
243
244 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
245
246 # config filename
247 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
248 cluster_name=cluster_uuid, create_if_not_exist=True
249 )
250
251 command = "{} --kubeconfig={} --home={} repo remove {}".format(
252 self._helm_command, config_filename, helm_dir, name
253 )
254
255 await self._local_async_exec(command=command, raise_exception_on_error=True)
256
    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """Tear down the helm environment of a cluster and remove its local files.

        :param cluster_uuid: cluster to reset
        :param force: uninstall deployed releases first; without it, existing
            releases make the reset fail
        :param uninstall_sw: also remove the tiller deployment from the cluster
        :return: True on success
        :raises K8sException: if releases exist and force is False
        """

        self.log.debug(
            "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=False
        )

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get("Name")
                        chart = r.get("Chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # best effort: keep uninstalling the remaining releases
                        self.log.error(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster has releases and not force. Cannot reset K8s "
                    "environment. Cluster uuid: {}"
                ).format(cluster_uuid)
                self.log.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))

            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, config_filename
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
            output_table = K8sHelmConnector._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                # for/else: the loop ended without 'break' -> tiller not found
                msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

            force_str = "--force"

            if namespace:
                # delete tiller deployment
                self.log.debug(
                    "Deleting tiller deployment for cluster {}, namespace {}".format(
                        cluster_uuid, namespace
                    )
                )
                command = (
                    "{} --namespace {} --kubeconfig={} {} delete deployment "
                    "tiller-deploy"
                ).format(self.kubectl_command, namespace, config_filename, force_str)
                await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )

                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_uuid)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.debug("namespace not found")

        # delete cluster directory (kubeconfig, helm home, values files)
        direct = self.fs.path + "/" + cluster_uuid
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True
359
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Install a chart as a new release and return the generated release name.

        :param cluster_uuid: cluster where the chart will be installed
        :param kdu_model: chart reference, optionally suffixed ':<version>'
        :param atomic: if True, run helm with '--atomic' and report intermediate
            status to the database while waiting
        :param timeout: timeout (seconds) passed to helm
        :param params: install parameters, written to a temporary values file
        :param db_dict: database location where status is written
        :param kdu_name: not used by this connector (interface compatibility)
        :param namespace: optional target namespace
        :return: generated release name (kdu_instance)
        :raises K8sException: if the helm command fails
        """

        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_uuid=cluster_uuid, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version: a ':<version>' suffix selects the chart version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                # status lookup failed -> the name is free, keep it
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task (polls and updates the db while
            # the install command is running)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
494
495 async def instances_list(self, cluster_uuid: str) -> list:
496 """
497 returns a list of deployed releases in a cluster
498
499 :param cluster_uuid: the cluster
500 :return:
501 """
502
503 self.log.debug("list releases for cluster {}".format(cluster_uuid))
504
505 # config filename
506 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
507 cluster_name=cluster_uuid, create_if_not_exist=True
508 )
509
510 command = "{} --kubeconfig={} --home={} list --output yaml".format(
511 self._helm_command, config_filename, helm_dir
512 )
513
514 output, _rc = await self._local_async_exec(
515 command=command, raise_exception_on_error=True
516 )
517
518 if output and len(output) > 0:
519 return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
520 else:
521 return []
522
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        """Upgrade an existing release to a new chart version and/or parameters.

        :param cluster_uuid: cluster where the release is deployed
        :param kdu_instance: release name to upgrade
        :param kdu_model: chart reference, optionally suffixed ':<version>'
        :param atomic: if True, run helm with '--atomic' and report intermediate
            status to the database while waiting
        :param timeout: timeout (seconds) passed to helm
        :param params: upgrade parameters, written to a temporary values file
        :param db_dict: database location where status is written
        :return: new revision number, or 0 if it cannot be obtained
        :raises K8sException: if the helm command fails
        """

        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_uuid=cluster_uuid, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version: a ':<version>' suffix selects the chart version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task (polls and updates the db while
            # the upgrade command is running)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
641
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a release to a previous revision.

        :param cluster_uuid: cluster where the release is deployed
        :param kdu_instance: release name
        :param revision: target revision (0 means the previous revision)
        :param db_dict: database location where status is written
        :return: new revision number, or 0 if it cannot be obtained
        :raises K8sException: if the helm command fails
        """

        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # '--wait' blocks until resources are ready (or helm's own timeout)
        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task (polls and updates the db while
        # the rollback command is running)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
711
712 async def uninstall(self, cluster_uuid: str, kdu_instance: str):
713 """
714 Removes an existing KDU instance. It would implicitly use the `delete` call
715 (this call would happen after all _terminate-config-primitive_ of the VNF
716 are invoked).
717
718 :param cluster_uuid: UUID of a K8s cluster known by OSM
719 :param kdu_instance: unique name for the KDU instance to be deleted
720 :return: True if successful
721 """
722
723 self.log.debug(
724 "uninstall kdu_instance {} from cluster {}".format(
725 kdu_instance, cluster_uuid
726 )
727 )
728
729 # config filename
730 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
731 cluster_name=cluster_uuid, create_if_not_exist=True
732 )
733
734 command = "{} --kubeconfig={} --home={} delete --purge {}".format(
735 self._helm_command, config_filename, helm_dir, kdu_instance
736 )
737
738 output, _rc = await self._local_async_exec(
739 command=command, raise_exception_on_error=True
740 )
741
742 return self._output_to_table(output)
743
    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        Helm-based KDUs cannot run arbitrary actions, so this always raises.

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        :raises K8sException: always (unsupported for Helm KDUs)
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )
768
769 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
770
771 self.log.debug(
772 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
773 )
774
775 return await self._exec_inspect_comand(
776 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
777 )
778
779 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
780
781 self.log.debug(
782 "inspect kdu_model values {} from (optional) repo: {}".format(
783 kdu_model, repo_url
784 )
785 )
786
787 return await self._exec_inspect_comand(
788 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
789 )
790
791 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
792
793 self.log.debug(
794 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
795 )
796
797 return await self._exec_inspect_comand(
798 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
799 )
800
801 async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
802
803 # call internal function
804 return await self._status_kdu(
805 cluster_uuid=cluster_uuid,
806 kdu_instance=kdu_instance,
807 show_error_log=True,
808 return_text=True,
809 )
810
811 async def synchronize_repos(self, cluster_uuid: str):
812
813 self.log.debug("syncronize repos for cluster helm-id: {}",)
814 try:
815 update_repos_timeout = (
816 300 # max timeout to sync a single repos, more than this is too much
817 )
818 db_k8scluster = self.db.get_one(
819 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
820 )
821 if db_k8scluster:
822 nbi_repo_list = (
823 db_k8scluster.get("_admin").get("helm_chart_repos") or []
824 )
825 cluster_repo_dict = (
826 db_k8scluster.get("_admin").get("helm_charts_added") or {}
827 )
828 # elements that must be deleted
829 deleted_repo_list = []
830 added_repo_dict = {}
831 self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
832 self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
833
834 # obtain repos to add: registered by nbi but not added
835 repos_to_add = [
836 repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
837 ]
838
839 # obtain repos to delete: added by cluster but not in nbi list
840 repos_to_delete = [
841 repo
842 for repo in cluster_repo_dict.keys()
843 if repo not in nbi_repo_list
844 ]
845
846 # delete repos: must delete first then add because there may be
847 # different repos with same name but
848 # different id and url
849 self.log.debug("repos to delete: {}".format(repos_to_delete))
850 for repo_id in repos_to_delete:
851 # try to delete repos
852 try:
853 repo_delete_task = asyncio.ensure_future(
854 self.repo_remove(
855 cluster_uuid=cluster_uuid,
856 name=cluster_repo_dict[repo_id],
857 )
858 )
859 await asyncio.wait_for(repo_delete_task, update_repos_timeout)
860 except Exception as e:
861 self.warning(
862 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
863 repo_id, cluster_repo_dict[repo_id], str(e)
864 )
865 )
866 # always add to the list of to_delete if there is an error
867 # because if is not there
868 # deleting raises error
869 deleted_repo_list.append(repo_id)
870
871 # add repos
872 self.log.debug("repos to add: {}".format(repos_to_add))
873 for repo_id in repos_to_add:
874 # obtain the repo data from the db
875 # if there is an error getting the repo in the database we will
876 # ignore this repo and continue
877 # because there is a possible race condition where the repo has
878 # been deleted while processing
879 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
880 self.log.debug(
881 "obtained repo: id, {}, name: {}, url: {}".format(
882 repo_id, db_repo["name"], db_repo["url"]
883 )
884 )
885 try:
886 repo_add_task = asyncio.ensure_future(
887 self.repo_add(
888 cluster_uuid=cluster_uuid,
889 name=db_repo["name"],
890 url=db_repo["url"],
891 repo_type="chart",
892 )
893 )
894 await asyncio.wait_for(repo_add_task, update_repos_timeout)
895 added_repo_dict[repo_id] = db_repo["name"]
896 self.log.debug(
897 "added repo: id, {}, name: {}".format(
898 repo_id, db_repo["name"]
899 )
900 )
901 except Exception as e:
902 # deal with error adding repo, adding a repo that already
903 # exists does not raise any error
904 # will not raise error because a wrong repos added by
905 # anyone could prevent instantiating any ns
906 self.log.error(
907 "Error adding repo id: {}, err_msg: {} ".format(
908 repo_id, repr(e)
909 )
910 )
911
912 return deleted_repo_list, added_repo_dict
913
914 else: # else db_k8scluster does not exist
915 raise K8sException(
916 "k8cluster with helm-id : {} not found".format(cluster_uuid)
917 )
918
919 except Exception as e:
920 self.log.error("Error synchronizing repos: {}".format(str(e)))
921 raise K8sException("Error synchronizing repos")
922
923 """
924 ####################################################################################
925 ################################### P R I V A T E ##################################
926 ####################################################################################
927 """
928
    async def _exec_inspect_comand(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):
        """Run `helm inspect [values|readme|'']` for a chart and return the output.

        :param inspect_command: helm inspect subcommand ('', 'values' or 'readme')
        :param kdu_model: chart reference, possibly prefixed with '<repo>/'
        :param repo_url: optional repository URL; when given, the '<repo>/' prefix
            is stripped from kdu_model and '--repo' is used instead
        :return: raw command output
        """

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            # with an explicit --repo, helm expects the bare chart name, so
            # strip any '<repo>/' prefix from the model
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output
949
950 async def _status_kdu(
951 self,
952 cluster_uuid: str,
953 kdu_instance: str,
954 show_error_log: bool = False,
955 return_text: bool = False,
956 ):
957
958 self.log.debug("status of kdu_instance {}".format(kdu_instance))
959
960 # config filename
961 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
962 cluster_name=cluster_uuid, create_if_not_exist=True
963 )
964
965 command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
966 self._helm_command, config_filename, helm_dir, kdu_instance
967 )
968
969 output, rc = await self._local_async_exec(
970 command=command,
971 raise_exception_on_error=True,
972 show_error_log=show_error_log,
973 )
974
975 if return_text:
976 return str(output)
977
978 if rc != 0:
979 return None
980
981 data = yaml.load(output, Loader=yaml.SafeLoader)
982
983 # remove field 'notes'
984 try:
985 del data.get("info").get("status")["notes"]
986 except KeyError:
987 pass
988
989 # parse field 'resources'
990 try:
991 resources = str(data.get("info").get("status").get("resources"))
992 resource_table = self._output_to_table(resources)
993 data.get("info").get("status")["resources"] = resource_table
994 except Exception:
995 pass
996
997 return data
998
999 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
1000 instances = await self.instances_list(cluster_uuid=cluster_uuid)
1001 for instance in instances:
1002 if instance.get("Name") == kdu_instance:
1003 return instance
1004 self.log.debug("Instance {} not found".format(kdu_instance))
1005 return None
1006
1007 @staticmethod
1008 def _generate_release_name(chart_name: str):
1009 # check embeded chart (file or dir)
1010 if chart_name.startswith("/"):
1011 # extract file or directory name
1012 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1013 # check URL
1014 elif "://" in chart_name:
1015 # extract last portion of URL
1016 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1017
1018 name = ""
1019 for c in chart_name:
1020 if c.isalpha() or c.isnumeric():
1021 name += c
1022 else:
1023 name += "-"
1024 if len(name) > 35:
1025 name = name[0:35]
1026
1027 # if does not start with alpha character, prefix 'a'
1028 if not name[0].isalpha():
1029 name = "a" + name
1030
1031 name += "-"
1032
1033 def get_random_number():
1034 r = random.randrange(start=1, stop=99999999)
1035 s = str(r)
1036 s = s.rjust(10, "0")
1037 return s
1038
1039 name = name + get_random_number()
1040 return name.lower()
1041
    async def _store_status(
        self,
        cluster_uuid: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        """Poll the status of a release and write it to the database.

        Loops forever (sleeping *check_every* seconds before each poll) until
        cancelled, unless *run_once* is set, in which case exactly one iteration
        is performed — the 'finally' clause returns regardless of success.

        :param cluster_uuid: cluster where the release lives
        :param operation: operation name stored with the status (e.g. 'install')
        :param kdu_instance: release name to poll
        :param check_every: seconds to sleep before each poll
        :param db_dict: database location passed to write_app_status_to_db
        :param run_once: perform a single iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_uuid=cluster_uuid, kdu_instance=kdu_instance,
                    return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                # the caller cancels this task once the helm operation finishes
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # best effort: a failed poll must never abort the operation
                self.log.debug("_store_status exception: {}".format(str(e)))
                pass
            finally:
                # single-shot mode: leave after the first attempt, even on error
                if run_once:
                    return
1079
1080 async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
1081
1082 status = await self._status_kdu(
1083 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
1084 )
1085
1086 # extract info.status.resources-> str
1087 # format:
1088 # ==> v1/Deployment
1089 # NAME READY UP-TO-DATE AVAILABLE AGE
1090 # halting-horse-mongodb 0/1 1 0 0s
1091 # halting-petit-mongodb 1/1 1 0 0s
1092 # blank line
1093 resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
1094
1095 # convert to table
1096 resources = K8sHelmConnector._output_to_table(resources)
1097
1098 num_lines = len(resources)
1099 index = 0
1100 while index < num_lines:
1101 try:
1102 line1 = resources[index]
1103 index += 1
1104 # find '==>' in column 0
1105 if line1[0] == "==>":
1106 line2 = resources[index]
1107 index += 1
1108 # find READY in column 1
1109 if line2[1] == "READY":
1110 # read next lines
1111 line3 = resources[index]
1112 index += 1
1113 while len(line3) > 1 and index < num_lines:
1114 ready_value = line3[1]
1115 parts = ready_value.split(sep="/")
1116 current = int(parts[0])
1117 total = int(parts[1])
1118 if current < total:
1119 self.log.debug("NOT READY:\n {}".format(line3))
1120 ready = False
1121 line3 = resources[index]
1122 index += 1
1123
1124 except Exception:
1125 pass
1126
1127 return ready
1128
1129 @staticmethod
1130 def _get_deep(dictionary: dict, members: tuple):
1131 target = dictionary
1132 value = None
1133 try:
1134 for m in members:
1135 value = target.get(m)
1136 if not value:
1137 return None
1138 else:
1139 target = value
1140 except Exception:
1141 pass
1142 return value
1143
1144 # find key:value in several lines
1145 @staticmethod
1146 def _find_in_lines(p_lines: list, p_key: str) -> str:
1147 for line in p_lines:
1148 try:
1149 if line.startswith(p_key + ":"):
1150 parts = line.split(":")
1151 the_value = parts[1].strip()
1152 return the_value
1153 except Exception:
1154 # ignore it
1155 pass
1156 return None
1157
1158 # params for use in -f file
1159 # returns values file option and filename (in order to delete it at the end)
1160 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
1161
1162 if params and len(params) > 0:
1163 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
1164
1165 def get_random_number():
1166 r = random.randrange(start=1, stop=99999999)
1167 s = str(r)
1168 while len(s) < 10:
1169 s = "0" + s
1170 return s
1171
1172 params2 = dict()
1173 for key in params:
1174 value = params.get(key)
1175 if "!!yaml" in str(value):
1176 value = yaml.load(value[7:])
1177 params2[key] = value
1178
1179 values_file = get_random_number() + ".yaml"
1180 with open(values_file, "w") as stream:
1181 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1182
1183 return "-f {}".format(values_file), values_file
1184
1185 return "", None
1186
1187 # params for use in --set option
1188 @staticmethod
1189 def _params_to_set_option(params: dict) -> str:
1190 params_str = ""
1191 if params and len(params) > 0:
1192 start = True
1193 for key in params:
1194 value = params.get(key, None)
1195 if value is not None:
1196 if start:
1197 params_str += "--set "
1198 start = False
1199 else:
1200 params_str += ","
1201 params_str += "{}={}".format(key, value)
1202 return params_str
1203
1204 @staticmethod
1205 def _output_to_lines(output: str) -> list:
1206 output_lines = list()
1207 lines = output.splitlines(keepends=False)
1208 for line in lines:
1209 line = line.strip()
1210 if len(line) > 0:
1211 output_lines.append(line)
1212 return output_lines
1213
1214 @staticmethod
1215 def _output_to_table(output: str) -> list:
1216 output_table = list()
1217 lines = output.splitlines(keepends=False)
1218 for line in lines:
1219 line = line.replace("\t", " ")
1220 line_list = list()
1221 output_table.append(line_list)
1222 cells = line.split(sep=" ")
1223 for cell in cells:
1224 cell = cell.strip()
1225 if len(cell) > 0:
1226 line_list.append(cell)
1227 return output_table
1228
1229 def _get_paths(
1230 self, cluster_name: str, create_if_not_exist: bool = False
1231 ) -> (str, str, str, str):
1232 """
1233 Returns kube and helm directories
1234
1235 :param cluster_name:
1236 :param create_if_not_exist:
1237 :return: kube, helm directories, config filename and cluster dir.
1238 Raises exception if not exist and cannot create
1239 """
1240
1241 base = self.fs.path
1242 if base.endswith("/") or base.endswith("\\"):
1243 base = base[:-1]
1244
1245 # base dir for cluster
1246 cluster_dir = base + "/" + cluster_name
1247 if create_if_not_exist and not os.path.exists(cluster_dir):
1248 self.log.debug("Creating dir {}".format(cluster_dir))
1249 os.makedirs(cluster_dir)
1250 if not os.path.exists(cluster_dir):
1251 msg = "Base cluster dir {} does not exist".format(cluster_dir)
1252 self.log.error(msg)
1253 raise K8sException(msg)
1254
1255 # kube dir
1256 kube_dir = cluster_dir + "/" + ".kube"
1257 if create_if_not_exist and not os.path.exists(kube_dir):
1258 self.log.debug("Creating dir {}".format(kube_dir))
1259 os.makedirs(kube_dir)
1260 if not os.path.exists(kube_dir):
1261 msg = "Kube config dir {} does not exist".format(kube_dir)
1262 self.log.error(msg)
1263 raise K8sException(msg)
1264
1265 # helm home dir
1266 helm_dir = cluster_dir + "/" + ".helm"
1267 if create_if_not_exist and not os.path.exists(helm_dir):
1268 self.log.debug("Creating dir {}".format(helm_dir))
1269 os.makedirs(helm_dir)
1270 if not os.path.exists(helm_dir):
1271 msg = "Helm config dir {} does not exist".format(helm_dir)
1272 self.log.error(msg)
1273 raise K8sException(msg)
1274
1275 config_filename = kube_dir + "/config"
1276 return kube_dir, helm_dir, config_filename, cluster_dir
1277
1278 @staticmethod
1279 def _remove_multiple_spaces(strobj):
1280 strobj = strobj.strip()
1281 while " " in strobj:
1282 strobj = strobj.replace(" ", " ")
1283 return strobj
1284
1285 def _local_exec(self, command: str) -> (str, int):
1286 command = K8sHelmConnector._remove_multiple_spaces(command)
1287 self.log.debug("Executing sync local command: {}".format(command))
1288 # raise exception if fails
1289 output = ""
1290 try:
1291 output = subprocess.check_output(
1292 command, shell=True, universal_newlines=True
1293 )
1294 return_code = 0
1295 self.log.debug(output)
1296 except Exception:
1297 return_code = 1
1298
1299 return output, return_code
1300
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):
        """Run *command* asynchronously (no shell) and capture its output.

        :param command: command line; multiple spaces are collapsed and the
            string is split on single spaces (no quoting support)
        :param raise_exception_on_error: raise K8sException when the exit
            status is non-zero or the command cannot be executed
        :param show_error_log: log output at debug level on failure
        :param encode_utf8: post-process output through an utf-8
            encode/str round-trip (NOTE(review): this yields a "b'...'"
            repr-style string — looks deliberate for its callers, confirm
            before changing)
        :return: (output, return_code); ('', -1) on execution failure when
            raise_exception_on_error is False
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE(review): stderr REPLACES stdout here rather than
                # being appended — when both streams produce text only
                # stderr is returned; confirm callers rely on this
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # propagate cancellation untouched
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1360
1361 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1362 # self.log.debug('Checking if file {} exists...'.format(filename))
1363 if os.path.exists(filename):
1364 return True
1365 else:
1366 msg = "File {} does not exist".format(filename)
1367 if exception_if_not_exists:
1368 # self.log.error(msg)
1369 raise K8sException(msg)