1156 fix path obtained for k8s_helm_conn cluster
n2vc/k8s_helm_conn.py (osm/N2VC.git)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses the cluster_uuid stored in the database, which can be either
        'namespace:cluster_id' or, for backward compatibility, just 'cluster_id'
        (in which case the returned namespace is empty).
        """
        namespace, _, cluster_id = cluster_uuid.rpartition(":")
        return namespace, cluster_id

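    # Illustrative doctest-style sketch (hypothetical values) of how both storage
    # formats are parsed; rpartition() yields an empty namespace for the legacy form:
    #
    #   >>> K8sHelmConnector._get_namespace_cluster_id("kube-system:d7b2-4e8a")
    #   ('kube-system', 'd7b2-4e8a')
    #   >>> K8sHelmConnector._get_namespace_cluster_id("d7b2-4e8a")
    #   ('', 'd7b2-4e8a')
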
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if the connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                       "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
                       ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                       "init").format(self._helm_command, config_filename, namespace, helm_dir,
                                      self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw

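    # Hypothetical usage sketch (inside a coroutine; names are illustrative,
    # not from the source):
    #
    #   with open(os.path.expanduser("~/.kube/config")) as f:
    #       creds = f.read()
    #   cluster_uuid, installed = await connector.init_env(k8s_creds=creds)
    #   # cluster_uuid looks like "kube-system:<uuid4>"; 'installed' is True only
    #   # when this call deployed Tiller into the cluster.
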
    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)

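    # With hypothetical arguments name="bitnami", url="https://charts.bitnami.com/bitnami",
    # the 'repo add' command above expands to something like (paths sketched):
    #
    #   /usr/bin/helm --kubeconfig=<fs>/<cluster_id>/.kube/config \
    #       --home=<fs>/<cluster_id>/.helm repo add bitnami https://charts.bitnami.com/bitnami
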
    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )
        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: repo name in OSM
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove repository {} for cluster {}".format(name, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                       .format(cluster_id, uninstall_sw))

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("Name")
                            chart = r.get("Chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            self.log.error(
                                "Error uninstalling release {}: {}".format(kdu_instance, e)
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not set. "
                        "Leaving K8s helm environment"
                    ).format(cluster_id)
                    self.log.warn(msg)
                    uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    # for-else: the loop finished without break, so no tiller
                    # deployment was found
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore errors for backward compatibility
                command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                           "io/osm-tiller-cluster-rule").format(self.kubectl_command,
                                                                config_filename)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)
                command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
                    format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

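    # Hypothetical usage sketch (inside a coroutine; values are illustrative):
    #
    #   kdu_instance = await connector.install(
    #       cluster_uuid="kube-system:d7b2-4e8a",
    #       kdu_model="stable/openldap:1.2.3",  # "chart:version" is split into
    #       atomic=True,                        # "stable/openldap" and "--version 1.2.3"
    #       namespace="myns",
    #   )
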
    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return:
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: output of the helm delete command, parsed into a table
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        return self._output_to_table(output)

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_command(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        return await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def get_services(self,
                           cluster_uuid: str,
                           kdu_instance: str,
                           namespace: str) -> list:

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        status = await self._status_kdu(
            cluster_id, kdu_instance, return_text=False
        )

        service_names = self._parse_helm_status_service_info(status)
        service_list = []
        for service_name in service_names:
            service = await self.get_service(cluster_uuid, service_name, namespace)
            service_list.append(service)

        return service_list

    async def get_service(self,
                          cluster_uuid: str,
                          service_name: str,
                          namespace: str) -> object:

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid)
        )

        # get paths
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, config_filename, namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service

    async def synchronize_repos(self, cluster_uuid: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_id))
        try:
            update_repos_timeout = (
                300  # max timeout to sync a single repo; more than this is too much
            )
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added by cluster but not in nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first, then add, because there may be
                # different repos with the same name but different id and url
                self.log.debug("repos to delete: {}".format(repos_to_delete))
                for repo_id in repos_to_delete:
                    # try to delete repos
                    try:
                        repo_delete_task = asyncio.ensure_future(
                            self.repo_remove(
                                cluster_uuid=cluster_uuid,
                                name=cluster_repo_dict[repo_id],
                            )
                        )
                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                                repo_id, cluster_repo_dict[repo_id], str(e)
                            )
                        )
                    # always mark the repo as deleted, even on error: if it is not
                    # actually there, deleting it again would raise an error
                    deleted_repo_list.append(repo_id)

                # add repos
                self.log.debug("repos to add: {}".format(repos_to_add))
                for repo_id in repos_to_add:
                    # obtain the repo data from the db
                    # if there is an error getting the repo in the database we will
                    # ignore this repo and continue
                    # because there is a possible race condition where the repo has
                    # been deleted while processing
                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                    self.log.debug(
                        "obtained repo: id, {}, name: {}, url: {}".format(
                            repo_id, db_repo["name"], db_repo["url"]
                        )
                    )
                    try:
                        repo_add_task = asyncio.ensure_future(
                            self.repo_add(
                                cluster_uuid=cluster_uuid,
                                name=db_repo["name"],
                                url=db_repo["url"],
                                repo_type="chart",
                            )
                        )
                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
                        added_repo_dict[repo_id] = db_repo["name"]
                        self.log.debug(
                            "added repo: id, {}, name: {}".format(
                                repo_id, db_repo["name"]
                            )
                        )
                    except Exception as e:
                        # deal with error adding repo; adding a repo that already
                        # exists does not raise any error.
                        # do not raise here, because a wrong repo added by
                        # anyone could prevent instantiating any ns
                        self.log.error(
                            "Error adding repo id: {}, err_msg: {} ".format(
                                repo_id, repr(e)
                            )
                        )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8cluster with helm-id : {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if empty or not starting with an alpha character, prefix 'a'
        if not name or not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()

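    # Illustrative example (the random suffix shown is hypothetical):
    #
    #   _generate_release_name("stable/mongodb-exporter")
    #   -> "stable-mongodb-exporter-0000123456"
    #
    # Non-alphanumeric characters become '-', the name is truncated to 35 chars,
    # an 'a' is prefixed if it does not start with a letter, and a zero-padded
    # 10-digit random suffix is appended.
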
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance,
                    return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        # assume ready unless some resource reports fewer ready replicas than total
        ready = True
        num_lines = len(resources)
        index = 0
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _parse_helm_status_service_info(self, status):

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        service_list = []
        first_line_skipped = service_found = False
        for line in resources:
            if not service_found:
                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
                    service_found = True
                    continue
            else:
                if len(line) >= 2 and line[0] == "==>":
                    service_found = first_line_skipped = False
                    continue
                if not line:
                    continue
                if not first_line_skipped:
                    first_line_skipped = True
                    continue
                service_list.append(line[0])

        return service_list

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value

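    # Doctest-style sketch (hypothetical dict). Note that falsy values
    # (0, "", [], {}) are reported as None by this helper:
    #
    #   >>> K8sHelmConnector._get_deep({"info": {"status": {"code": 1}}},
    #   ...                            ("info", "status", "code"))
    #   1
    #   >>> K8sHelmConnector._get_deep({"info": {}}, ("info", "status")) is None
    #   True
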
    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    # split only on the first ':' so values containing ':' survive
                    parts = line.split(":", 1)
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

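    # Illustrative example (hypothetical lines):
    #
    #   _find_in_lines(["NAME: myrelease", "STATUS: DEPLOYED"], "STATUS")
    #   -> "DEPLOYED"
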
    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    # strip the leading "!!yaml " marker before parsing
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str

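    # Illustrative example (hypothetical params):
    #
    #   _params_to_set_option({"replicas": 2, "service.type": "NodePort"})
    #   -> "--set replicas=2,service.type=NodePort"
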
    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

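    # Illustrative example; blank lines are dropped and whitespace stripped:
    #
    #   _output_to_lines("NAME\n\n  tiller-deploy  \n")
    #   -> ["NAME", "tiller-deploy"]
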
    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table

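    # Illustrative example (note that blank lines yield empty rows):
    #
    #   _output_to_table("NAME\tREADY\ntiller-deploy 1/1")
    #   -> [["NAME", "READY"], ["tiller-deploy", "1/1"]]
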
    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube, helm directories, config filename and cluster dir.
                Raises exception if not exist and cannot create
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

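    # Resulting on-disk layout for a cluster (sketch; <fs> is self.fs.path):
    #
    #   <fs>/<cluster_name>/              cluster dir
    #   <fs>/<cluster_name>/.kube/        kube dir
    #   <fs>/<cluster_name>/.kube/config  config filename (kubeconfig)
    #   <fs>/<cluster_name>/.helm/        helm home dir
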
    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

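    # Doctest-style sketch:
    #
    #   >>> K8sHelmConnector._remove_multiple_spaces("  helm   repo  update ")
    #   'helm repo update'
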
    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # returns code 1 on failure; exceptions are swallowed
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # when present, stderr takes precedence over stdout
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)
            return False