Added new functionality to obtain services data
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import os
25 import random
26 import shutil
27 import subprocess
28 import time
29 from uuid import uuid4
30
31 from n2vc.exceptions import K8sException
32 from n2vc.k8s_conn import K8sConnector
33 import yaml
34
35
36 class K8sHelmConnector(K8sConnector):
37
38 """
39 ####################################################################################
40 ################################### P U B L I C ####################################
41 ####################################################################################
42 """
43 service_account = "osm"
44
45 def __init__(
46 self,
47 fs: object,
48 db: object,
49 kubectl_command: str = "/usr/bin/kubectl",
50 helm_command: str = "/usr/bin/helm",
51 log: object = None,
52 on_update_db=None,
53 ):
54 """
55
56 :param fs: file system for kubernetes and helm configuration
57 :param db: database object to write current operation status
58 :param kubectl_command: path to kubectl executable
59 :param helm_command: path to helm executable
60 :param log: logger
61 :param on_update_db: callback called when k8s connector updates database
62 """
63
64 # parent class
65 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
66
67 self.log.info("Initializing K8S Helm connector")
68
69 # random numbers for release name generation
70 random.seed(time.time())
71
72 # the file system
73 self.fs = fs
74
75 # exception if kubectl is not installed
76 self.kubectl_command = kubectl_command
77 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
78
79 # exception if helm is not installed
80 self._helm_command = helm_command
81 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
82
83 # initialize helm client-only
84 self.log.debug("Initializing helm client-only...")
85 command = "{} init --client-only".format(self._helm_command)
86 try:
87 asyncio.ensure_future(
88 self._local_async_exec(command=command, raise_exception_on_error=False)
89 )
90 # loop = asyncio.get_event_loop()
91 # loop.run_until_complete(self._local_async_exec(command=command,
92 # raise_exception_on_error=False))
93 except Exception as e:
94 self.warning(
95 msg="helm init failed (it was already initialized): {}".format(e)
96 )
97
98 self.log.info("K8S Helm connector initialized")
99
100 @staticmethod
101 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
102 """
103 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
104 cluster_id for backward compatibility
105 """
106 namespace, _, cluster_id = cluster_uuid.rpartition(':')
107 return namespace, cluster_id
108
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
        client (OSM)
        server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        # reuse the namespace/cluster_id embedded in an existing uuid, or mint
        # a fresh cluster id
        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        # uuid stored at the database has the form '<namespace>:<cluster_id>'
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

        # write the provided kubeconfig under this cluster's local directory
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            # malformed table rows are treated as "tiller not found"
            pass

        # helm init: install tiller (server side) only when not already present
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            # create the service account used by tiller
            # (errors ignored: the account may already exist)
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            # bind the service account to cluster-admin (errors ignored too)
            command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                       "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
                       ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            # install tiller using that service account; failure here is fatal
            command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                       "init").format(self._helm_command, config_filename, namespace, helm_dir,
                                      self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw
201
202 async def repo_add(
203 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
204 ):
205 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
206 self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
207 cluster_id, repo_type, name, url))
208
209 # config filename
210 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
211 cluster_name=cluster_id, create_if_not_exist=True
212 )
213
214 # helm repo update
215 command = "{} --kubeconfig={} --home={} repo update".format(
216 self._helm_command, config_filename, helm_dir
217 )
218 self.log.debug("updating repo: {}".format(command))
219 await self._local_async_exec(command=command, raise_exception_on_error=False)
220
221 # helm repo add name url
222 command = "{} --kubeconfig={} --home={} repo add {} {}".format(
223 self._helm_command, config_filename, helm_dir, name, url
224 )
225 self.log.debug("adding repo: {}".format(command))
226 await self._local_async_exec(command=command, raise_exception_on_error=True)
227
228 async def repo_list(self, cluster_uuid: str) -> list:
229 """
230 Get the list of registered repositories
231
232 :return: list of registered repositories: [ (name, url) .... ]
233 """
234
235 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
236 self.log.debug("list repositories for cluster {}".format(cluster_id))
237
238 # config filename
239 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
240 cluster_name=cluster_id, create_if_not_exist=True
241 )
242
243 command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
244 self._helm_command, config_filename, helm_dir
245 )
246
247 output, _rc = await self._local_async_exec(
248 command=command, raise_exception_on_error=True
249 )
250 if output and len(output) > 0:
251 return yaml.load(output, Loader=yaml.SafeLoader)
252 else:
253 return []
254
255 async def repo_remove(self, cluster_uuid: str, name: str):
256 """
257 Remove a repository from OSM
258
259 :param cluster_uuid: the cluster or 'namespace:cluster'
260 :param name: repo name in OSM
261 :return: True if successful
262 """
263
264 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
265 self.log.debug("list repositories for cluster {}".format(cluster_id))
266
267 # config filename
268 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
269 cluster_name=cluster_id, create_if_not_exist=True
270 )
271
272 command = "{} --kubeconfig={} --home={} repo remove {}".format(
273 self._helm_command, config_filename, helm_dir, name
274 )
275
276 await self._local_async_exec(command=command, raise_exception_on_error=True)
277
    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """
        Reset the K8s environment of a cluster: optionally uninstall all deployed
        releases and the tiller software, then remove the local cluster directory
        (kubeconfig + helm home).

        :param cluster_uuid: cluster id stored as 'namespace:cluster_id' (or bare
            cluster_id for backward compatibility)
        :param force: uninstall existing releases instead of refusing to reset
        :param uninstall_sw: also remove tiller and its RBAC objects
        :return: True on success (errors raise K8sException)
        """

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    try:
                        kdu_instance = r.get("Name")
                        chart = r.get("Chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # best effort: log and continue with remaining releases.
                        # NOTE(review): kdu_instance could be unbound here if the
                        # very first statement of the try raised — confirm
                        self.log.error(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster has releases and not force. Cannot reset K8s "
                    "environment. Cluster uuid: {}"
                ).format(cluster_id)
                self.log.error(msg)
                raise K8sException(msg)

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod (old-style uuids carry no namespace)
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    # for/else: loop completed without break -> tiller not found
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore if errors for backward compatibility
                command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                           "io/osm-tiller-cluster-rule").format(self.kubectl_command,
                                                                config_filename)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)
                command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
                    format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True
378
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Deploy a chart on a cluster ('helm install').

        :param cluster_uuid: cluster id stored as 'namespace:cluster_id' (or bare id)
        :param kdu_model: chart reference, optionally suffixed with ':<version>'
        :param atomic: if True, pass --atomic and report status to the database
            while the command runs
        :param timeout: timeout in seconds passed to helm
        :param params: chart values, written to a temporary values file
        :param db_dict: database location where operation status is written
        :param kdu_name: not used by this connector (kept for interface compatibility)
        :param namespace: optional target namespace for the release
        :return: generated kdu_instance (helm release name)
        :raises K8sException: if the helm command exits with non-zero rc
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params written to a temporary values file; returned as a --values option
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version: a trailing ':<version>' in the model selects a chart version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                # status failed -> the name is free, keep it
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task while the install runs
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
514
515 async def instances_list(self, cluster_uuid: str) -> list:
516 """
517 returns a list of deployed releases in a cluster
518
519 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
520 :return:
521 """
522
523 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
524 self.log.debug("list releases for cluster {}".format(cluster_id))
525
526 # config filename
527 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
528 cluster_name=cluster_id, create_if_not_exist=True
529 )
530
531 command = "{} --kubeconfig={} --home={} list --output yaml".format(
532 self._helm_command, config_filename, helm_dir
533 )
534
535 output, _rc = await self._local_async_exec(
536 command=command, raise_exception_on_error=True
537 )
538
539 if output and len(output) > 0:
540 return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
541 else:
542 return []
543
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        """
        Upgrade a deployed release ('helm upgrade').

        :param cluster_uuid: cluster id stored as 'namespace:cluster_id' (or bare id)
        :param kdu_instance: release name to upgrade
        :param kdu_model: new chart reference, optionally suffixed ':<version>'
        :param atomic: if True, pass --atomic and report status to the database
            while the command runs
        :param timeout: timeout in seconds passed to helm
        :param params: chart values, written to a temporary values file
        :param db_dict: database location where operation status is written
        :return: new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the helm command exits with non-zero rc
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params written to a temporary values file; returned as a --values option
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version: a trailing ':<version>' in the model selects a chart version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task while the upgrade runs
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
663
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Roll back a release to a given revision ('helm rollback --wait').

        :param cluster_uuid: cluster id stored as 'namespace:cluster_id' (or bare id)
        :param kdu_instance: release name to roll back
        :param revision: target revision (0 selects the previous one in helm)
        :param db_dict: database location where operation status is written
        :return: new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the helm command exits with non-zero rc
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task while the rollback runs
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
734
735 async def uninstall(self, cluster_uuid: str, kdu_instance: str):
736 """
737 Removes an existing KDU instance. It would implicitly use the `delete` call
738 (this call would happen after all _terminate-config-primitive_ of the VNF
739 are invoked).
740
741 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
742 :param kdu_instance: unique name for the KDU instance to be deleted
743 :return: True if successful
744 """
745
746 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
747 self.log.debug(
748 "uninstall kdu_instance {} from cluster {}".format(
749 kdu_instance, cluster_id
750 )
751 )
752
753 # config filename
754 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
755 cluster_name=cluster_id, create_if_not_exist=True
756 )
757
758 command = "{} --kubeconfig={} --home={} delete --purge {}".format(
759 self._helm_command, config_filename, helm_dir, kdu_instance
760 )
761
762 output, _rc = await self._local_async_exec(
763 command=command, raise_exception_on_error=True
764 )
765
766 return self._output_to_table(output)
767
768 async def exec_primitive(
769 self,
770 cluster_uuid: str = None,
771 kdu_instance: str = None,
772 primitive_name: str = None,
773 timeout: float = 300,
774 params: dict = None,
775 db_dict: dict = None,
776 ) -> str:
777 """Exec primitive (Juju action)
778
779 :param cluster_uuid str: The UUID of the cluster or namespace:cluster
780 :param kdu_instance str: The unique name of the KDU instance
781 :param primitive_name: Name of action that will be executed
782 :param timeout: Timeout for action execution
783 :param params: Dictionary of all the parameters needed for the action
784 :db_dict: Dictionary for any additional data
785
786 :return: Returns the output of the action
787 """
788 raise K8sException(
789 "KDUs deployed with Helm don't support actions "
790 "different from rollback, upgrade and status"
791 )
792
793 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
794
795 self.log.debug(
796 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
797 )
798
799 return await self._exec_inspect_comand(
800 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
801 )
802
803 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
804
805 self.log.debug(
806 "inspect kdu_model values {} from (optional) repo: {}".format(
807 kdu_model, repo_url
808 )
809 )
810
811 return await self._exec_inspect_comand(
812 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
813 )
814
815 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
816
817 self.log.debug(
818 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
819 )
820
821 return await self._exec_inspect_comand(
822 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
823 )
824
825 async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
826
827 # call internal function
828 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
829 return await self._status_kdu(
830 cluster_id=cluster_id,
831 kdu_instance=kdu_instance,
832 show_error_log=True,
833 return_text=True,
834 )
835
836 async def get_services(self,
837 cluster_uuid: str,
838 kdu_instance: str,
839 namespace: str) -> list:
840
841 self.log.debug(
842 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
843 cluster_uuid, kdu_instance
844 )
845 )
846
847 status = await self._status_kdu(
848 cluster_uuid, kdu_instance, return_text=False
849 )
850
851 service_names = self._parse_helm_status_service_info(status)
852 service_list = []
853 for service in service_names:
854 service = await self.get_service(cluster_uuid, service, namespace)
855 service_list.append(service)
856
857 return service_list
858
859 async def get_service(self,
860 cluster_uuid: str,
861 service_name: str,
862 namespace: str) -> object:
863
864 self.log.debug(
865 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
866 service_name, namespace, cluster_uuid)
867 )
868
869 # get paths
870 _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
871 cluster_name=cluster_uuid, create_if_not_exist=True
872 )
873
874 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
875 self.kubectl_command, config_filename, namespace, service_name
876 )
877
878 output, _rc = await self._local_async_exec(
879 command=command, raise_exception_on_error=True
880 )
881
882 data = yaml.load(output, Loader=yaml.SafeLoader)
883
884 service = {
885 "name": service_name,
886 "type": self._get_deep(data, ("spec", "type")),
887 "ports": self._get_deep(data, ("spec", "ports")),
888 "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
889 }
890 if service["type"] == "LoadBalancer":
891 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
892 ip_list = [elem["ip"] for elem in ip_map_list]
893 service["external_ip"] = ip_list
894
895 return service
896
897 async def synchronize_repos(self, cluster_uuid: str):
898
899 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
900 self.log.debug("syncronize repos for cluster helm-id: {}",)
901 try:
902 update_repos_timeout = (
903 300 # max timeout to sync a single repos, more than this is too much
904 )
905 db_k8scluster = self.db.get_one(
906 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
907 )
908 if db_k8scluster:
909 nbi_repo_list = (
910 db_k8scluster.get("_admin").get("helm_chart_repos") or []
911 )
912 cluster_repo_dict = (
913 db_k8scluster.get("_admin").get("helm_charts_added") or {}
914 )
915 # elements that must be deleted
916 deleted_repo_list = []
917 added_repo_dict = {}
918 self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
919 self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
920
921 # obtain repos to add: registered by nbi but not added
922 repos_to_add = [
923 repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
924 ]
925
926 # obtain repos to delete: added by cluster but not in nbi list
927 repos_to_delete = [
928 repo
929 for repo in cluster_repo_dict.keys()
930 if repo not in nbi_repo_list
931 ]
932
933 # delete repos: must delete first then add because there may be
934 # different repos with same name but
935 # different id and url
936 self.log.debug("repos to delete: {}".format(repos_to_delete))
937 for repo_id in repos_to_delete:
938 # try to delete repos
939 try:
940 repo_delete_task = asyncio.ensure_future(
941 self.repo_remove(
942 cluster_uuid=cluster_uuid,
943 name=cluster_repo_dict[repo_id],
944 )
945 )
946 await asyncio.wait_for(repo_delete_task, update_repos_timeout)
947 except Exception as e:
948 self.warning(
949 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
950 repo_id, cluster_repo_dict[repo_id], str(e)
951 )
952 )
953 # always add to the list of to_delete if there is an error
954 # because if is not there
955 # deleting raises error
956 deleted_repo_list.append(repo_id)
957
958 # add repos
959 self.log.debug("repos to add: {}".format(repos_to_add))
960 for repo_id in repos_to_add:
961 # obtain the repo data from the db
962 # if there is an error getting the repo in the database we will
963 # ignore this repo and continue
964 # because there is a possible race condition where the repo has
965 # been deleted while processing
966 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
967 self.log.debug(
968 "obtained repo: id, {}, name: {}, url: {}".format(
969 repo_id, db_repo["name"], db_repo["url"]
970 )
971 )
972 try:
973 repo_add_task = asyncio.ensure_future(
974 self.repo_add(
975 cluster_uuid=cluster_uuid,
976 name=db_repo["name"],
977 url=db_repo["url"],
978 repo_type="chart",
979 )
980 )
981 await asyncio.wait_for(repo_add_task, update_repos_timeout)
982 added_repo_dict[repo_id] = db_repo["name"]
983 self.log.debug(
984 "added repo: id, {}, name: {}".format(
985 repo_id, db_repo["name"]
986 )
987 )
988 except Exception as e:
989 # deal with error adding repo, adding a repo that already
990 # exists does not raise any error
991 # will not raise error because a wrong repos added by
992 # anyone could prevent instantiating any ns
993 self.log.error(
994 "Error adding repo id: {}, err_msg: {} ".format(
995 repo_id, repr(e)
996 )
997 )
998
999 return deleted_repo_list, added_repo_dict
1000
1001 else: # else db_k8scluster does not exist
1002 raise K8sException(
1003 "k8cluster with helm-id : {} not found".format(cluster_uuid)
1004 )
1005
1006 except Exception as e:
1007 self.log.error("Error synchronizing repos: {}".format(str(e)))
1008 raise K8sException("Error synchronizing repos")
1009
1010 """
1011 ####################################################################################
1012 ################################### P R I V A T E ##################################
1013 ####################################################################################
1014 """
1015
1016 async def _exec_inspect_comand(
1017 self, inspect_command: str, kdu_model: str, repo_url: str = None
1018 ):
1019
1020 repo_str = ""
1021 if repo_url:
1022 repo_str = " --repo {}".format(repo_url)
1023 idx = kdu_model.find("/")
1024 if idx >= 0:
1025 idx += 1
1026 kdu_model = kdu_model[idx:]
1027
1028 inspect_command = "{} inspect {} {}{}".format(
1029 self._helm_command, inspect_command, kdu_model, repo_str
1030 )
1031 output, _rc = await self._local_async_exec(
1032 command=inspect_command, encode_utf8=True
1033 )
1034
1035 return output
1036
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):
        """
        Run 'helm status' for a release.

        :param cluster_id: bare cluster id (without the 'namespace:' prefix)
        :param kdu_instance: helm release name
        :param show_error_log: whether command errors are logged
        :param return_text: if True return the raw command output as str;
            otherwise return the parsed yaml dict
        """

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        # NOTE(review): with raise_exception_on_error=True a non-zero rc should
        # already have raised in _local_async_exec — confirm this guard is reachable
        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes' (free text, not useful for status tracking)
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            # NOTE(review): a missing 'info' or 'status' key would raise
            # AttributeError, which is not caught here — confirm intended
            pass

        # parse field 'resources' from free text into a table
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data
1085
1086 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
1087 instances = await self.instances_list(cluster_uuid=cluster_uuid)
1088 for instance in instances:
1089 if instance.get("Name") == kdu_instance:
1090 return instance
1091 self.log.debug("Instance {} not found".format(kdu_instance))
1092 return None
1093
1094 @staticmethod
1095 def _generate_release_name(chart_name: str):
1096 # check embeded chart (file or dir)
1097 if chart_name.startswith("/"):
1098 # extract file or directory name
1099 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1100 # check URL
1101 elif "://" in chart_name:
1102 # extract last portion of URL
1103 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1104
1105 name = ""
1106 for c in chart_name:
1107 if c.isalpha() or c.isnumeric():
1108 name += c
1109 else:
1110 name += "-"
1111 if len(name) > 35:
1112 name = name[0:35]
1113
1114 # if does not start with alpha character, prefix 'a'
1115 if not name[0].isalpha():
1116 name = "a" + name
1117
1118 name += "-"
1119
1120 def get_random_number():
1121 r = random.randrange(start=1, stop=99999999)
1122 s = str(r)
1123 s = s.rjust(10, "0")
1124 return s
1125
1126 name = name + get_random_number()
1127 return name.lower()
1128
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        """Poll helm status of a release and persist it to the database.

        Loops forever, sleeping `check_every` seconds per iteration, until the
        task is cancelled or a database write fails. When `run_once` is True,
        the `finally` clause returns after exactly one iteration.

        :param cluster_id: internal cluster id used to locate kube/helm config
        :param operation: operation name recorded alongside the status
        :param kdu_instance: helm release name to poll
        :param check_every: seconds to sleep before each status query
        :param db_dict: database location descriptor forwarded to
            write_app_status_to_db
        :param run_once: perform a single iteration and return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance,
                    return_text=False
                )
                # short human-readable state, e.g. "Install complete"
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    # a failed db write ends the polling task
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # best-effort: transient errors are logged and polling continues
                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                pass
            finally:
                if run_once:
                    return
1166
1167 async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
1168
1169 status = await self._status_kdu(
1170 cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
1171 )
1172
1173 # extract info.status.resources-> str
1174 # format:
1175 # ==> v1/Deployment
1176 # NAME READY UP-TO-DATE AVAILABLE AGE
1177 # halting-horse-mongodb 0/1 1 0 0s
1178 # halting-petit-mongodb 1/1 1 0 0s
1179 # blank line
1180 resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
1181
1182 # convert to table
1183 resources = K8sHelmConnector._output_to_table(resources)
1184
1185 num_lines = len(resources)
1186 index = 0
1187 while index < num_lines:
1188 try:
1189 line1 = resources[index]
1190 index += 1
1191 # find '==>' in column 0
1192 if line1[0] == "==>":
1193 line2 = resources[index]
1194 index += 1
1195 # find READY in column 1
1196 if line2[1] == "READY":
1197 # read next lines
1198 line3 = resources[index]
1199 index += 1
1200 while len(line3) > 1 and index < num_lines:
1201 ready_value = line3[1]
1202 parts = ready_value.split(sep="/")
1203 current = int(parts[0])
1204 total = int(parts[1])
1205 if current < total:
1206 self.log.debug("NOT READY:\n {}".format(line3))
1207 ready = False
1208 line3 = resources[index]
1209 index += 1
1210
1211 except Exception:
1212 pass
1213
1214 return ready
1215
1216 def _parse_helm_status_service_info(self, status):
1217
1218 # extract info.status.resources-> str
1219 # format:
1220 # ==> v1/Deployment
1221 # NAME READY UP-TO-DATE AVAILABLE AGE
1222 # halting-horse-mongodb 0/1 1 0 0s
1223 # halting-petit-mongodb 1/1 1 0 0s
1224 # blank line
1225 resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
1226
1227 service_list = []
1228 first_line_skipped = service_found = False
1229 for line in resources:
1230 if not service_found:
1231 if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
1232 service_found = True
1233 continue
1234 else:
1235 if len(line) >= 2 and line[0] == "==>":
1236 service_found = first_line_skipped = False
1237 continue
1238 if not line:
1239 continue
1240 if not first_line_skipped:
1241 first_line_skipped = True
1242 continue
1243 service_list.append(line[0])
1244
1245 return service_list
1246
1247 @staticmethod
1248 def _get_deep(dictionary: dict, members: tuple):
1249 target = dictionary
1250 value = None
1251 try:
1252 for m in members:
1253 value = target.get(m)
1254 if not value:
1255 return None
1256 else:
1257 target = value
1258 except Exception:
1259 pass
1260 return value
1261
1262 # find key:value in several lines
1263 @staticmethod
1264 def _find_in_lines(p_lines: list, p_key: str) -> str:
1265 for line in p_lines:
1266 try:
1267 if line.startswith(p_key + ":"):
1268 parts = line.split(":")
1269 the_value = parts[1].strip()
1270 return the_value
1271 except Exception:
1272 # ignore it
1273 pass
1274 return None
1275
1276 # params for use in -f file
1277 # returns values file option and filename (in order to delete it at the end)
1278 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1279
1280 if params and len(params) > 0:
1281 self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
1282
1283 def get_random_number():
1284 r = random.randrange(start=1, stop=99999999)
1285 s = str(r)
1286 while len(s) < 10:
1287 s = "0" + s
1288 return s
1289
1290 params2 = dict()
1291 for key in params:
1292 value = params.get(key)
1293 if "!!yaml" in str(value):
1294 value = yaml.load(value[7:])
1295 params2[key] = value
1296
1297 values_file = get_random_number() + ".yaml"
1298 with open(values_file, "w") as stream:
1299 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1300
1301 return "-f {}".format(values_file), values_file
1302
1303 return "", None
1304
1305 # params for use in --set option
1306 @staticmethod
1307 def _params_to_set_option(params: dict) -> str:
1308 params_str = ""
1309 if params and len(params) > 0:
1310 start = True
1311 for key in params:
1312 value = params.get(key, None)
1313 if value is not None:
1314 if start:
1315 params_str += "--set "
1316 start = False
1317 else:
1318 params_str += ","
1319 params_str += "{}={}".format(key, value)
1320 return params_str
1321
1322 @staticmethod
1323 def _output_to_lines(output: str) -> list:
1324 output_lines = list()
1325 lines = output.splitlines(keepends=False)
1326 for line in lines:
1327 line = line.strip()
1328 if len(line) > 0:
1329 output_lines.append(line)
1330 return output_lines
1331
1332 @staticmethod
1333 def _output_to_table(output: str) -> list:
1334 output_table = list()
1335 lines = output.splitlines(keepends=False)
1336 for line in lines:
1337 line = line.replace("\t", " ")
1338 line_list = list()
1339 output_table.append(line_list)
1340 cells = line.split(sep=" ")
1341 for cell in cells:
1342 cell = cell.strip()
1343 if len(cell) > 0:
1344 line_list.append(cell)
1345 return output_table
1346
1347 def _get_paths(
1348 self, cluster_name: str, create_if_not_exist: bool = False
1349 ) -> (str, str, str, str):
1350 """
1351 Returns kube and helm directories
1352
1353 :param cluster_name:
1354 :param create_if_not_exist:
1355 :return: kube, helm directories, config filename and cluster dir.
1356 Raises exception if not exist and cannot create
1357 """
1358
1359 base = self.fs.path
1360 if base.endswith("/") or base.endswith("\\"):
1361 base = base[:-1]
1362
1363 # base dir for cluster
1364 cluster_dir = base + "/" + cluster_name
1365 if create_if_not_exist and not os.path.exists(cluster_dir):
1366 self.log.debug("Creating dir {}".format(cluster_dir))
1367 os.makedirs(cluster_dir)
1368 if not os.path.exists(cluster_dir):
1369 msg = "Base cluster dir {} does not exist".format(cluster_dir)
1370 self.log.error(msg)
1371 raise K8sException(msg)
1372
1373 # kube dir
1374 kube_dir = cluster_dir + "/" + ".kube"
1375 if create_if_not_exist and not os.path.exists(kube_dir):
1376 self.log.debug("Creating dir {}".format(kube_dir))
1377 os.makedirs(kube_dir)
1378 if not os.path.exists(kube_dir):
1379 msg = "Kube config dir {} does not exist".format(kube_dir)
1380 self.log.error(msg)
1381 raise K8sException(msg)
1382
1383 # helm home dir
1384 helm_dir = cluster_dir + "/" + ".helm"
1385 if create_if_not_exist and not os.path.exists(helm_dir):
1386 self.log.debug("Creating dir {}".format(helm_dir))
1387 os.makedirs(helm_dir)
1388 if not os.path.exists(helm_dir):
1389 msg = "Helm config dir {} does not exist".format(helm_dir)
1390 self.log.error(msg)
1391 raise K8sException(msg)
1392
1393 config_filename = kube_dir + "/config"
1394 return kube_dir, helm_dir, config_filename, cluster_dir
1395
1396 @staticmethod
1397 def _remove_multiple_spaces(strobj):
1398 strobj = strobj.strip()
1399 while " " in strobj:
1400 strobj = strobj.replace(" ", " ")
1401 return strobj
1402
1403 def _local_exec(self, command: str) -> (str, int):
1404 command = K8sHelmConnector._remove_multiple_spaces(command)
1405 self.log.debug("Executing sync local command: {}".format(command))
1406 # raise exception if fails
1407 output = ""
1408 try:
1409 output = subprocess.check_output(
1410 command, shell=True, universal_newlines=True
1411 )
1412 return_code = 0
1413 self.log.debug(output)
1414 except Exception:
1415 return_code = 1
1416
1417 return output, return_code
1418
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):
        """Run a command asynchronously (no shell) and capture its output.

        :param command: command line; split on spaces, executed without a shell
        :param raise_exception_on_error: raise K8sException on non-zero exit
            or on execution failure (otherwise ("", -1) is returned on failure)
        :param show_error_log: log output when the command fails
        :param encode_utf8: post-process output through an encode/str cycle
        :return: (output, return_code)
        :raise K8sException: when requested and the command fails
        """

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            # NOTE(review): when both streams have data, stderr REPLACES
            # stdout in the returned output -- confirm this is intended
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            # NOTE(review): this yields the str() of a bytes object (i.e.
            # "b'...'") with literal "\n" turned back into newlines -- looks
            # deliberate for downstream parsing, but verify with callers
            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # propagate task cancellation untouched
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1478
    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        """Return True when `filename` exists.

        When it does not exist, raise K8sException if
        `exception_if_not_exists` is True.

        :param filename: path to check
        :param exception_if_not_exists: raise instead of returning on absence
        :raise K8sException: when the file is missing and raising is requested
        """
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)