Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses a cluster_uuid as stored in the database, which can be either
        'namespace:cluster_id' or, for backward compatibility, just cluster_id.
        """
        namespace, _, cluster_id = cluster_uuid.rpartition(':')
        return namespace, cluster_id

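    # Illustrative behaviour of _get_namespace_cluster_id (values are made up):
    #   "kube-system:1234-abcd" -> ("kube-system", "1234-abcd")
    #   "1234-abcd"             -> ("", "1234-abcd")  # rpartition yields an empty
    #                                                 # namespace for the legacy form
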
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        Prepares a given K8s cluster environment to run Charts on both sides:
        client (OSM) and server (Tiller).

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if the connector has installed
            some software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug(
            "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
        )

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system create serviceaccount {}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} "
                "--service-account {} init"
            ).format(
                self._helm_command,
                config_filename,
                namespace,
                helm_dir,
                self.service_account,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw

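    # Illustrative init_env result (uuid is made up): for the default namespace
    # "kube-system" the returned cluster_uuid is "kube-system:a1b2c3d4-...", and the
    # boolean tells whether this connector installed Tiller in the cluster.
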
    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_id, repo_type, name, url
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)

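    # Illustrative call (cluster id and URL are made up; 'connector' is an instance
    # of this class):
    #   await connector.repo_add(
    #       cluster_uuid="kube-system:a1b2c3d4", name="bitnami",
    #       url="https://charts.bitnami.com/bitnami")
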
    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )
        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: repo name in OSM
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove repository {} for cluster {}".format(name, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_id, uninstall_sw
            )
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        kdu_instance = r.get("Name")
                        chart = r.get("Chart")
                        try:
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            self.log.error(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not set. "
                        "Leaving K8s helm environment"
                    ).format(cluster_id)
                    self.log.warning(msg)
                    uninstall_sw = False  # allow to remove k8s cluster without removing Tiller

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # delete clusterrolebinding and serviceaccount;
                # ignore errors for backward compatibility
                command = (
                    "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization."
                    "k8s.io/osm-tiller-cluster-rule"
                ).format(self.kubectl_command, config_filename)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                command = (
                    "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}"
                ).format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release, then check that it does not already exist
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm install command
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

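    # Illustrative call (cluster id is made up; 'connector' is an instance of this
    # class): installs "stable/mysql" at version 1.6.2 and returns the generated
    # release name, e.g. "stable-mysql-0005431986":
    #   kdu = await connector.install(
    #       cluster_uuid="kube-system:a1b2c3d4", kdu_model="stable/mysql:1.6.2")
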
    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return:
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []

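    # Sketch of the helm 2 'list --output yaml' payload parsed above (field values
    # are made up); each release dict carries at least "Name", "Chart" and
    # "Revision", which other methods of this class rely on:
    #   Releases:
    #   - Name: stable-mysql-0005431986
    #     Chart: mysql-1.6.2
    #     Revision: 1
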
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm upgrade command
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It implicitly uses the helm `delete` call
        (this call happens after all _terminate-config-primitive_ actions of the VNF
        have been invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name of the KDU instance to be deleted
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        return self._output_to_table(output)

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of the action that will be executed
        :param timeout: Timeout for the action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "other than rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_command(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        return await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        status = await self._status_kdu(
            cluster_id, kdu_instance, return_text=False
        )

        service_names = self._parse_helm_status_service_info(status)
        service_list = []
        for service in service_names:
            service = await self.get_service(cluster_uuid, service, namespace)
            service_list.append(service)

        return service_list

    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str
    ) -> object:

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid
            )
        )

        # get paths
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, config_filename, namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            # the ingress list may be empty while the IP is still being provisioned
            ip_list = [elem["ip"] for elem in ip_map_list] if ip_map_list else []
            service["external_ip"] = ip_list

        return service

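    # Illustrative return value of get_service (all values are made up):
    #   {"name": "mysql", "type": "ClusterIP",
    #    "ports": [{"port": 3306, "protocol": "TCP"}],
    #    "cluster_ip": "10.152.183.74"}
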
    async def synchronize_repos(self, cluster_uuid: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_id))
        try:
            # max timeout to sync a single repo; more than this is too much
            update_repos_timeout = 300
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                # self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                # self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added to the cluster but not in the nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first, then add, because there may be
                # different repos with the same name but different id and url
                if repos_to_delete:
                    self.log.debug("repos to delete: {}".format(repos_to_delete))
                    for repo_id in repos_to_delete:
                        # try to delete repos
                        try:
                            repo_delete_task = asyncio.ensure_future(
                                self.repo_remove(
                                    cluster_uuid=cluster_uuid,
                                    name=cluster_repo_dict[repo_id],
                                )
                            )
                            await asyncio.wait_for(
                                repo_delete_task, update_repos_timeout
                            )
                        except Exception as e:
                            self.log.warning(
                                "Error deleting repo, id: {}, name: {}, "
                                "err_msg: {}".format(
                                    repo_id, cluster_repo_dict[repo_id], str(e)
                                )
                            )
                        # always mark the repo as deleted, even on error: if it is
                        # not actually there, deleting it raises an error anyway
                        deleted_repo_list.append(repo_id)

                # add repos
                if repos_to_add:
                    self.log.debug("repos to add: {}".format(repos_to_add))
                    for repo_id in repos_to_add:
                        # obtain the repo data from the db
                        # if there is an error getting the repo from the database we
                        # will ignore this repo and continue, because there is a
                        # possible race condition where the repo has been deleted
                        # while processing
                        db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                        self.log.debug(
                            "obtained repo: id, {}, name: {}, url: {}".format(
                                repo_id, db_repo["name"], db_repo["url"]
                            )
                        )
                        try:
                            repo_add_task = asyncio.ensure_future(
                                self.repo_add(
                                    cluster_uuid=cluster_uuid,
                                    name=db_repo["name"],
                                    url=db_repo["url"],
                                    repo_type="chart",
                                )
                            )
                            await asyncio.wait_for(repo_add_task, update_repos_timeout)
                            added_repo_dict[repo_id] = db_repo["name"]
                            self.log.debug(
                                "added repo: id, {}, name: {}".format(
                                    repo_id, db_repo["name"]
                                )
                            )
                        except Exception as e:
                            # deal with error adding repo; adding a repo that
                            # already exists does not raise any error.
                            # Do not raise, because a wrong repo added by anyone
                            # could prevent instantiating any ns
                            self.log.error(
                                "Error adding repo id: {}, err_msg: {} ".format(
                                    repo_id, repr(e)
                                )
                            )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8scluster with helm-id: {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

1013 """
1014 ####################################################################################
1015 ################################### P R I V A T E ##################################
1016 ####################################################################################
1017 """
1018
    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except (KeyError, TypeError, AttributeError):
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if it does not start with an alphabetic character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()

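    # Illustrative transformation (random suffixes are made up):
    #   "stable/mysql" -> "stable-mysql-0005431986"
    #   "https://example.org/charts/mysql-1.6.2.tgz" -> "mysql-1-6-2-tgz-0000731842"
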
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        previous_exception = None
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance,
                    return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # log only once in the while loop
                if str(previous_exception) != str(e):
                    self.log.debug("_store_status exception: {}".format(str(e)))
                    previous_exception = e
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        # assume ready until a not-ready resource is found
        ready = True
        num_lines = len(resources)
        index = 0
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _parse_helm_status_service_info(self, status):

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        service_list = []
        first_line_skipped = service_found = False
        for line in resources:
            if not service_found:
                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
                    service_found = True
                    continue
            else:
                if len(line) >= 2 and line[0] == "==>":
                    service_found = first_line_skipped = False
                    continue
                if not line:
                    continue
                if not first_line_skipped:
                    first_line_skipped = True
                    continue
                service_list.append(line[0])

        return service_list

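    # Illustrative parse (resource names are made up): given a resources table
    # containing
    #   ==> v1/Service
    #   NAME    TYPE        CLUSTER-IP      PORT(S)
    #   mysql   ClusterIP   10.152.183.74   3306/TCP
    # this returns ["mysql"]: the header row is skipped and the first column of
    # each following row is collected until the next '==>' section.
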
    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value

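    # Illustrative use (data is made up):
    #   _get_deep({"spec": {"type": "ClusterIP"}}, ("spec", "type"))  # "ClusterIP"
    #   _get_deep({"spec": {}}, ("spec", "type"))                     # None
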
    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

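    # Illustrative use (lines are made up):
    #   _find_in_lines(["STATUS: DEPLOYED", "REVISION: 1"], "REVISION")  # -> "1"
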
    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str

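    # Illustrative use (parameters are made up):
    #   _params_to_set_option({"replicas": 2, "service.type": "NodePort"})
    #   # -> "--set replicas=2,service.type=NodePort"
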
    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table

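    # Illustrative use (output is made up): whitespace-separated CLI output such as
    #   "NAME  READY\ntiller-deploy  1/1"
    # becomes [["NAME", "READY"], ["tiller-deploy", "1/1"]].
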
    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube, helm directories, config filename and cluster dir.
            Raises an exception if a directory does not exist and cannot be created
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

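    # Illustrative use:
    #   _remove_multiple_spaces("helm   install  x")  # -> "helm install x"
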
    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # errors are swallowed; a non-zero return code signals failure
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)
            return False