# 2e7442372aeb62c1d723c8940d7b5fddbbffce02
# [osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import os
25 import random
26 import shutil
27 import subprocess
28 import time
29 from uuid import uuid4
30
31 from n2vc.exceptions import K8sException
32 from n2vc.k8s_conn import K8sConnector
33 import yaml
34
35
class K8sHelmConnector(K8sConnector):

    """
    K8s connector implemented on top of the Helm v2 command-line client
    (tiller-based clusters).

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    # kubernetes service account used to deploy/run tiller
    service_account = "osm"
44
45 def __init__(
46 self,
47 fs: object,
48 db: object,
49 kubectl_command: str = "/usr/bin/kubectl",
50 helm_command: str = "/usr/bin/helm",
51 log: object = None,
52 on_update_db=None,
53 ):
54 """
55
56 :param fs: file system for kubernetes and helm configuration
57 :param db: database object to write current operation status
58 :param kubectl_command: path to kubectl executable
59 :param helm_command: path to helm executable
60 :param log: logger
61 :param on_update_db: callback called when k8s connector updates database
62 """
63
64 # parent class
65 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
66
67 self.log.info("Initializing K8S Helm connector")
68
69 # random numbers for release name generation
70 random.seed(time.time())
71
72 # the file system
73 self.fs = fs
74
75 # exception if kubectl is not installed
76 self.kubectl_command = kubectl_command
77 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
78
79 # exception if helm is not installed
80 self._helm_command = helm_command
81 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
82
83 # initialize helm client-only
84 self.log.debug("Initializing helm client-only...")
85 command = "{} init --client-only".format(self._helm_command)
86 try:
87 asyncio.ensure_future(
88 self._local_async_exec(command=command, raise_exception_on_error=False)
89 )
90 # loop = asyncio.get_event_loop()
91 # loop.run_until_complete(self._local_async_exec(command=command,
92 # raise_exception_on_error=False))
93 except Exception as e:
94 self.warning(
95 msg="helm init failed (it was already initialized): {}".format(e)
96 )
97
98 self.log.info("K8S Helm connector initialized")
99
100 @staticmethod
101 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
102 """
103 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
104 cluster_id for backward compatibility
105 """
106 namespace, _, cluster_id = cluster_uuid.rpartition(':')
107 return namespace, cluster_id
108
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
        (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            # reuse the stored id; keep its namespace if it carries one
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

        # create config filename and store the provided kubeconfig credentials
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init (client and server) only when tiller is not deployed yet
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            # create the service account tiller will run under
            # (errors ignored: it may already exist)
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            # bind cluster-admin to that service account (errors ignored: may exist)
            command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                       "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
                       ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            # deploy tiller using the service account; this one must succeed
            command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                       "init").format(self._helm_command, config_filename, namespace, helm_dir,
                                      self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw
201
202 async def repo_add(
203 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
204 ):
205 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
206 self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
207 cluster_id, repo_type, name, url))
208
209 # config filename
210 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
211 cluster_name=cluster_id, create_if_not_exist=True
212 )
213
214 # helm repo update
215 command = "{} --kubeconfig={} --home={} repo update".format(
216 self._helm_command, config_filename, helm_dir
217 )
218 self.log.debug("updating repo: {}".format(command))
219 await self._local_async_exec(command=command, raise_exception_on_error=False)
220
221 # helm repo add name url
222 command = "{} --kubeconfig={} --home={} repo add {} {}".format(
223 self._helm_command, config_filename, helm_dir, name, url
224 )
225 self.log.debug("adding repo: {}".format(command))
226 await self._local_async_exec(command=command, raise_exception_on_error=True)
227
228 async def repo_list(self, cluster_uuid: str) -> list:
229 """
230 Get the list of registered repositories
231
232 :return: list of registered repositories: [ (name, url) .... ]
233 """
234
235 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
236 self.log.debug("list repositories for cluster {}".format(cluster_id))
237
238 # config filename
239 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
240 cluster_name=cluster_id, create_if_not_exist=True
241 )
242
243 command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
244 self._helm_command, config_filename, helm_dir
245 )
246
247 output, _rc = await self._local_async_exec(
248 command=command, raise_exception_on_error=True
249 )
250 if output and len(output) > 0:
251 return yaml.load(output, Loader=yaml.SafeLoader)
252 else:
253 return []
254
255 async def repo_remove(self, cluster_uuid: str, name: str):
256 """
257 Remove a repository from OSM
258
259 :param cluster_uuid: the cluster or 'namespace:cluster'
260 :param name: repo name in OSM
261 :return: True if successful
262 """
263
264 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
265 self.log.debug("list repositories for cluster {}".format(cluster_id))
266
267 # config filename
268 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
269 cluster_name=cluster_id, create_if_not_exist=True
270 )
271
272 command = "{} --kubeconfig={} --home={} repo remove {}".format(
273 self._helm_command, config_filename, helm_dir, name
274 )
275
276 await self._local_async_exec(command=command, raise_exception_on_error=True)
277
    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """
        Reset a K8s cluster environment: optionally uninstall all deployed
        releases and the tiller software, then remove the local cluster folder.

        :param cluster_uuid: 'namespace:cluster_id' or bare cluster_id
        :param force: uninstall existing releases instead of refusing to reset
        :param uninstall_sw: also remove tiller and its RBAC objects
        :return: True
        :raises K8sException: if releases exist and force is False
        """

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get("Name")
                        chart = r.get("Chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # best-effort: keep uninstalling the remaining releases
                        self.log.error(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster has releases and not force. Cannot reset K8s "
                    "environment. Cluster uuid: {}"
                ).format(cluster_id)
                self.log.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod (legacy ids without namespace)
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    # for-else: the loop finished without finding tiller
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore if errors for backward compatibility
                command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                           "io/osm-tiller-cluster-rule").format(self.kubectl_command,
                                                                config_filename)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)
                command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
                    format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True
378
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Deploy a chart as a new release in the cluster.

        :param cluster_uuid: 'namespace:cluster_id' or bare cluster_id
        :param kdu_model: chart reference, optionally suffixed ':version'
        :param atomic: pass --atomic to helm and report status while installing
        :param timeout: helm --timeout value
        :param params: values passed to helm through a temporary values file
        :param db_dict: database location to write operation status
        :param kdu_name: accepted for interface compatibility (not used here)
        :param namespace: optional kubernetes namespace for the release
        :return: generated kdu_instance (release name)
        :raises K8sException: if the helm command fails
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str (written to a temporary values file)
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version: split 'chart:version' into chart + --version flag
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                # status failed -> the name is free, keep it
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task (polls while helm runs)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single, immediate iteration)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
514
515 async def instances_list(self, cluster_uuid: str) -> list:
516 """
517 returns a list of deployed releases in a cluster
518
519 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
520 :return:
521 """
522
523 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
524 self.log.debug("list releases for cluster {}".format(cluster_id))
525
526 # config filename
527 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
528 cluster_name=cluster_id, create_if_not_exist=True
529 )
530
531 command = "{} --kubeconfig={} --home={} list --output yaml".format(
532 self._helm_command, config_filename, helm_dir
533 )
534
535 output, _rc = await self._local_async_exec(
536 command=command, raise_exception_on_error=True
537 )
538
539 if output and len(output) > 0:
540 return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
541 else:
542 return []
543
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        """
        Upgrade an existing release to a (possibly new) chart/version.

        :param cluster_uuid: 'namespace:cluster_id' or bare cluster_id
        :param kdu_instance: existing release name
        :param kdu_model: chart reference, optionally suffixed ':version'
        :param atomic: pass --atomic to helm and report status while upgrading
        :param timeout: helm --timeout value
        :param params: values passed to helm through a temporary values file
        :param db_dict: database location to write operation status
        :return: new revision number (int), or 0 if it cannot be obtained
        :raises K8sException: if the helm command fails
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str (written to a temporary values file)
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version: split 'chart:version' into chart + --version flag
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task (polls while helm runs)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single, immediate iteration)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
663
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Roll back a release to a given revision.

        :param cluster_uuid: 'namespace:cluster_id' or bare cluster_id
        :param kdu_instance: release name to roll back
        :param revision: target revision (helm treats 0 as the previous one)
        :param db_dict: database location to write operation status
        :return: new revision number (int), or 0 if it cannot be obtained
        :raises K8sException: if the helm command fails
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task (polls while helm runs)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status (single, immediate iteration)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
734
735 async def uninstall(self, cluster_uuid: str, kdu_instance: str):
736 """
737 Removes an existing KDU instance. It would implicitly use the `delete` call
738 (this call would happen after all _terminate-config-primitive_ of the VNF
739 are invoked).
740
741 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
742 :param kdu_instance: unique name for the KDU instance to be deleted
743 :return: True if successful
744 """
745
746 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
747 self.log.debug(
748 "uninstall kdu_instance {} from cluster {}".format(
749 kdu_instance, cluster_id
750 )
751 )
752
753 # config filename
754 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
755 cluster_name=cluster_id, create_if_not_exist=True
756 )
757
758 command = "{} --kubeconfig={} --home={} delete --purge {}".format(
759 self._helm_command, config_filename, helm_dir, kdu_instance
760 )
761
762 output, _rc = await self._local_async_exec(
763 command=command, raise_exception_on_error=True
764 )
765
766 return self._output_to_table(output)
767
768 async def exec_primitive(
769 self,
770 cluster_uuid: str = None,
771 kdu_instance: str = None,
772 primitive_name: str = None,
773 timeout: float = 300,
774 params: dict = None,
775 db_dict: dict = None,
776 ) -> str:
777 """Exec primitive (Juju action)
778
779 :param cluster_uuid str: The UUID of the cluster or namespace:cluster
780 :param kdu_instance str: The unique name of the KDU instance
781 :param primitive_name: Name of action that will be executed
782 :param timeout: Timeout for action execution
783 :param params: Dictionary of all the parameters needed for the action
784 :db_dict: Dictionary for any additional data
785
786 :return: Returns the output of the action
787 """
788 raise K8sException(
789 "KDUs deployed with Helm don't support actions "
790 "different from rollback, upgrade and status"
791 )
792
793 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
794
795 self.log.debug(
796 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
797 )
798
799 return await self._exec_inspect_comand(
800 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
801 )
802
803 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
804
805 self.log.debug(
806 "inspect kdu_model values {} from (optional) repo: {}".format(
807 kdu_model, repo_url
808 )
809 )
810
811 return await self._exec_inspect_comand(
812 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
813 )
814
815 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
816
817 self.log.debug(
818 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
819 )
820
821 return await self._exec_inspect_comand(
822 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
823 )
824
825 async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
826
827 # call internal function
828 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
829 return await self._status_kdu(
830 cluster_id=cluster_id,
831 kdu_instance=kdu_instance,
832 show_error_log=True,
833 return_text=True,
834 )
835
836 async def synchronize_repos(self, cluster_uuid: str):
837
838 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
839 self.log.debug("syncronize repos for cluster helm-id: {}",)
840 try:
841 update_repos_timeout = (
842 300 # max timeout to sync a single repos, more than this is too much
843 )
844 db_k8scluster = self.db.get_one(
845 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
846 )
847 if db_k8scluster:
848 nbi_repo_list = (
849 db_k8scluster.get("_admin").get("helm_chart_repos") or []
850 )
851 cluster_repo_dict = (
852 db_k8scluster.get("_admin").get("helm_charts_added") or {}
853 )
854 # elements that must be deleted
855 deleted_repo_list = []
856 added_repo_dict = {}
857 self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
858 self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
859
860 # obtain repos to add: registered by nbi but not added
861 repos_to_add = [
862 repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
863 ]
864
865 # obtain repos to delete: added by cluster but not in nbi list
866 repos_to_delete = [
867 repo
868 for repo in cluster_repo_dict.keys()
869 if repo not in nbi_repo_list
870 ]
871
872 # delete repos: must delete first then add because there may be
873 # different repos with same name but
874 # different id and url
875 self.log.debug("repos to delete: {}".format(repos_to_delete))
876 for repo_id in repos_to_delete:
877 # try to delete repos
878 try:
879 repo_delete_task = asyncio.ensure_future(
880 self.repo_remove(
881 cluster_uuid=cluster_uuid,
882 name=cluster_repo_dict[repo_id],
883 )
884 )
885 await asyncio.wait_for(repo_delete_task, update_repos_timeout)
886 except Exception as e:
887 self.warning(
888 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
889 repo_id, cluster_repo_dict[repo_id], str(e)
890 )
891 )
892 # always add to the list of to_delete if there is an error
893 # because if is not there
894 # deleting raises error
895 deleted_repo_list.append(repo_id)
896
897 # add repos
898 self.log.debug("repos to add: {}".format(repos_to_add))
899 for repo_id in repos_to_add:
900 # obtain the repo data from the db
901 # if there is an error getting the repo in the database we will
902 # ignore this repo and continue
903 # because there is a possible race condition where the repo has
904 # been deleted while processing
905 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
906 self.log.debug(
907 "obtained repo: id, {}, name: {}, url: {}".format(
908 repo_id, db_repo["name"], db_repo["url"]
909 )
910 )
911 try:
912 repo_add_task = asyncio.ensure_future(
913 self.repo_add(
914 cluster_uuid=cluster_uuid,
915 name=db_repo["name"],
916 url=db_repo["url"],
917 repo_type="chart",
918 )
919 )
920 await asyncio.wait_for(repo_add_task, update_repos_timeout)
921 added_repo_dict[repo_id] = db_repo["name"]
922 self.log.debug(
923 "added repo: id, {}, name: {}".format(
924 repo_id, db_repo["name"]
925 )
926 )
927 except Exception as e:
928 # deal with error adding repo, adding a repo that already
929 # exists does not raise any error
930 # will not raise error because a wrong repos added by
931 # anyone could prevent instantiating any ns
932 self.log.error(
933 "Error adding repo id: {}, err_msg: {} ".format(
934 repo_id, repr(e)
935 )
936 )
937
938 return deleted_repo_list, added_repo_dict
939
940 else: # else db_k8scluster does not exist
941 raise K8sException(
942 "k8cluster with helm-id : {} not found".format(cluster_uuid)
943 )
944
945 except Exception as e:
946 self.log.error("Error synchronizing repos: {}".format(str(e)))
947 raise K8sException("Error synchronizing repos")
948
949 """
950 ####################################################################################
951 ################################### P R I V A T E ##################################
952 ####################################################################################
953 """
954
955 async def _exec_inspect_comand(
956 self, inspect_command: str, kdu_model: str, repo_url: str = None
957 ):
958
959 repo_str = ""
960 if repo_url:
961 repo_str = " --repo {}".format(repo_url)
962 idx = kdu_model.find("/")
963 if idx >= 0:
964 idx += 1
965 kdu_model = kdu_model[idx:]
966
967 inspect_command = "{} inspect {} {}{}".format(
968 self._helm_command, inspect_command, kdu_model, repo_str
969 )
970 output, _rc = await self._local_async_exec(
971 command=inspect_command, encode_utf8=True
972 )
973
974 return output
975
976 async def _status_kdu(
977 self,
978 cluster_id: str,
979 kdu_instance: str,
980 show_error_log: bool = False,
981 return_text: bool = False,
982 ):
983
984 self.log.debug("status of kdu_instance {}".format(kdu_instance))
985
986 # config filename
987 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
988 cluster_name=cluster_id, create_if_not_exist=True
989 )
990
991 command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
992 self._helm_command, config_filename, helm_dir, kdu_instance
993 )
994
995 output, rc = await self._local_async_exec(
996 command=command,
997 raise_exception_on_error=True,
998 show_error_log=show_error_log,
999 )
1000
1001 if return_text:
1002 return str(output)
1003
1004 if rc != 0:
1005 return None
1006
1007 data = yaml.load(output, Loader=yaml.SafeLoader)
1008
1009 # remove field 'notes'
1010 try:
1011 del data.get("info").get("status")["notes"]
1012 except KeyError:
1013 pass
1014
1015 # parse field 'resources'
1016 try:
1017 resources = str(data.get("info").get("status").get("resources"))
1018 resource_table = self._output_to_table(resources)
1019 data.get("info").get("status")["resources"] = resource_table
1020 except Exception:
1021 pass
1022
1023 return data
1024
1025 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
1026 instances = await self.instances_list(cluster_uuid=cluster_uuid)
1027 for instance in instances:
1028 if instance.get("Name") == kdu_instance:
1029 return instance
1030 self.log.debug("Instance {} not found".format(kdu_instance))
1031 return None
1032
1033 @staticmethod
1034 def _generate_release_name(chart_name: str):
1035 # check embeded chart (file or dir)
1036 if chart_name.startswith("/"):
1037 # extract file or directory name
1038 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1039 # check URL
1040 elif "://" in chart_name:
1041 # extract last portion of URL
1042 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1043
1044 name = ""
1045 for c in chart_name:
1046 if c.isalpha() or c.isnumeric():
1047 name += c
1048 else:
1049 name += "-"
1050 if len(name) > 35:
1051 name = name[0:35]
1052
1053 # if does not start with alpha character, prefix 'a'
1054 if not name[0].isalpha():
1055 name = "a" + name
1056
1057 name += "-"
1058
1059 def get_random_number():
1060 r = random.randrange(start=1, stop=99999999)
1061 s = str(r)
1062 s = s.rjust(10, "0")
1063 return s
1064
1065 name = name + get_random_number()
1066 return name.lower()
1067
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        """
        Periodically poll the status of a kdu instance and write it to the db.

        Intended to run as a background task: loops forever, sleeping
        ``check_every`` seconds before each poll, until the task is
        cancelled, until a database write fails, or after exactly one
        iteration when ``run_once`` is True.

        :param cluster_id: cluster identifier
        :param operation: operation name stored with the status
        :param kdu_instance: helm release name to poll
        :param check_every: seconds to sleep before each poll
        :param db_dict: db location descriptor forwarded to write_app_status_to_db
        :param run_once: if True, perform a single iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance,
                    return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # best effort: a transient status or db failure must not kill
                # the polling task; log at debug and keep looping
                self.log.debug("_store_status exception: {}".format(str(e)))
                pass
            finally:
                # finally runs on every path above, so run_once also stops the
                # loop after a cancellation or a swallowed exception
                if run_once:
                    return
1105
1106 async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
1107
1108 status = await self._status_kdu(
1109 cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
1110 )
1111
1112 # extract info.status.resources-> str
1113 # format:
1114 # ==> v1/Deployment
1115 # NAME READY UP-TO-DATE AVAILABLE AGE
1116 # halting-horse-mongodb 0/1 1 0 0s
1117 # halting-petit-mongodb 1/1 1 0 0s
1118 # blank line
1119 resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
1120
1121 # convert to table
1122 resources = K8sHelmConnector._output_to_table(resources)
1123
1124 num_lines = len(resources)
1125 index = 0
1126 while index < num_lines:
1127 try:
1128 line1 = resources[index]
1129 index += 1
1130 # find '==>' in column 0
1131 if line1[0] == "==>":
1132 line2 = resources[index]
1133 index += 1
1134 # find READY in column 1
1135 if line2[1] == "READY":
1136 # read next lines
1137 line3 = resources[index]
1138 index += 1
1139 while len(line3) > 1 and index < num_lines:
1140 ready_value = line3[1]
1141 parts = ready_value.split(sep="/")
1142 current = int(parts[0])
1143 total = int(parts[1])
1144 if current < total:
1145 self.log.debug("NOT READY:\n {}".format(line3))
1146 ready = False
1147 line3 = resources[index]
1148 index += 1
1149
1150 except Exception:
1151 pass
1152
1153 return ready
1154
1155 @staticmethod
1156 def _get_deep(dictionary: dict, members: tuple):
1157 target = dictionary
1158 value = None
1159 try:
1160 for m in members:
1161 value = target.get(m)
1162 if not value:
1163 return None
1164 else:
1165 target = value
1166 except Exception:
1167 pass
1168 return value
1169
1170 # find key:value in several lines
1171 @staticmethod
1172 def _find_in_lines(p_lines: list, p_key: str) -> str:
1173 for line in p_lines:
1174 try:
1175 if line.startswith(p_key + ":"):
1176 parts = line.split(":")
1177 the_value = parts[1].strip()
1178 return the_value
1179 except Exception:
1180 # ignore it
1181 pass
1182 return None
1183
1184 # params for use in -f file
1185 # returns values file option and filename (in order to delete it at the end)
1186 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1187
1188 if params and len(params) > 0:
1189 self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
1190
1191 def get_random_number():
1192 r = random.randrange(start=1, stop=99999999)
1193 s = str(r)
1194 while len(s) < 10:
1195 s = "0" + s
1196 return s
1197
1198 params2 = dict()
1199 for key in params:
1200 value = params.get(key)
1201 if "!!yaml" in str(value):
1202 value = yaml.load(value[7:])
1203 params2[key] = value
1204
1205 values_file = get_random_number() + ".yaml"
1206 with open(values_file, "w") as stream:
1207 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1208
1209 return "-f {}".format(values_file), values_file
1210
1211 return "", None
1212
1213 # params for use in --set option
1214 @staticmethod
1215 def _params_to_set_option(params: dict) -> str:
1216 params_str = ""
1217 if params and len(params) > 0:
1218 start = True
1219 for key in params:
1220 value = params.get(key, None)
1221 if value is not None:
1222 if start:
1223 params_str += "--set "
1224 start = False
1225 else:
1226 params_str += ","
1227 params_str += "{}={}".format(key, value)
1228 return params_str
1229
1230 @staticmethod
1231 def _output_to_lines(output: str) -> list:
1232 output_lines = list()
1233 lines = output.splitlines(keepends=False)
1234 for line in lines:
1235 line = line.strip()
1236 if len(line) > 0:
1237 output_lines.append(line)
1238 return output_lines
1239
1240 @staticmethod
1241 def _output_to_table(output: str) -> list:
1242 output_table = list()
1243 lines = output.splitlines(keepends=False)
1244 for line in lines:
1245 line = line.replace("\t", " ")
1246 line_list = list()
1247 output_table.append(line_list)
1248 cells = line.split(sep=" ")
1249 for cell in cells:
1250 cell = cell.strip()
1251 if len(cell) > 0:
1252 line_list.append(cell)
1253 return output_table
1254
1255 def _get_paths(
1256 self, cluster_name: str, create_if_not_exist: bool = False
1257 ) -> (str, str, str, str):
1258 """
1259 Returns kube and helm directories
1260
1261 :param cluster_name:
1262 :param create_if_not_exist:
1263 :return: kube, helm directories, config filename and cluster dir.
1264 Raises exception if not exist and cannot create
1265 """
1266
1267 base = self.fs.path
1268 if base.endswith("/") or base.endswith("\\"):
1269 base = base[:-1]
1270
1271 # base dir for cluster
1272 cluster_dir = base + "/" + cluster_name
1273 if create_if_not_exist and not os.path.exists(cluster_dir):
1274 self.log.debug("Creating dir {}".format(cluster_dir))
1275 os.makedirs(cluster_dir)
1276 if not os.path.exists(cluster_dir):
1277 msg = "Base cluster dir {} does not exist".format(cluster_dir)
1278 self.log.error(msg)
1279 raise K8sException(msg)
1280
1281 # kube dir
1282 kube_dir = cluster_dir + "/" + ".kube"
1283 if create_if_not_exist and not os.path.exists(kube_dir):
1284 self.log.debug("Creating dir {}".format(kube_dir))
1285 os.makedirs(kube_dir)
1286 if not os.path.exists(kube_dir):
1287 msg = "Kube config dir {} does not exist".format(kube_dir)
1288 self.log.error(msg)
1289 raise K8sException(msg)
1290
1291 # helm home dir
1292 helm_dir = cluster_dir + "/" + ".helm"
1293 if create_if_not_exist and not os.path.exists(helm_dir):
1294 self.log.debug("Creating dir {}".format(helm_dir))
1295 os.makedirs(helm_dir)
1296 if not os.path.exists(helm_dir):
1297 msg = "Helm config dir {} does not exist".format(helm_dir)
1298 self.log.error(msg)
1299 raise K8sException(msg)
1300
1301 config_filename = kube_dir + "/config"
1302 return kube_dir, helm_dir, config_filename, cluster_dir
1303
1304 @staticmethod
1305 def _remove_multiple_spaces(strobj):
1306 strobj = strobj.strip()
1307 while " " in strobj:
1308 strobj = strobj.replace(" ", " ")
1309 return strobj
1310
    def _local_exec(self, command: str) -> (str, int):
        """
        Execute a local command synchronously.

        :param command: command line to execute (multiple spaces collapsed)
        :return: (output, return_code); return_code is 0 on success, 1 on any
            failure. On failure the exception is swallowed and output is ''.
        """
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # best effort: never raises; failures are reported via the return code
        output = ""
        try:
            # NOTE(review): shell=True runs the line through the shell.
            # Commands appear to be built internally from helm/kubectl paths,
            # but confirm no user-controlled text can reach here unescaped.
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code
1326
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):
        """
        Execute a local command asynchronously (without a shell).

        :param command: command line; it is split on single spaces, so
            arguments themselves must not contain spaces
        :param raise_exception_on_error: raise K8sException instead of
            returning a non-zero return code
        :param show_error_log: log output at debug level when the command fails
        :param encode_utf8: round-trip the output through utf-8 bytes and
            normalize escaped newlines before returning
        :return: (output, return_code); ('', -1) when execution itself failed
            and raise_exception_on_error is False
        :raises K8sException: on non-zero exit or execution failure when
            raise_exception_on_error is True
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE(review): stderr REPLACES stdout here, so when a command
                # writes to both streams the stdout content is lost — looks
                # intentional for error reporting, but confirm callers never
                # need both
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                # str(bytes) yields a "b'...'" literal; the replace restores
                # real newlines from the escaped form
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1386
    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        """
        Check whether *filename* exists on the local filesystem.

        :param filename: path to check
        :param exception_if_not_exists: raise K8sException when the file is
            missing instead of falling through
        :return: True when the file exists; when it is missing and no
            exception is requested, no explicit True value is returned here —
            callers appear to rely on a falsy result (confirm against any
            trailing code beyond this view)
        :raises K8sException: when missing and exception_if_not_exists is True
        """
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)