Implement get_service and get_services methods for K8sJujuConnector
n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                "helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses a cluster_uuid as stored in the database; it can be either
        'namespace:cluster_id' or, for backward compatibility, just 'cluster_id'
        """
        namespace, _, cluster_id = cluster_uuid.rpartition(':')
        return namespace, cluster_id
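
    # A quick sketch of the parsing behaviour (hypothetical values, relying
    # only on str.rpartition as used above):
    #   _get_namespace_cluster_id("osm:1de2a758")  ->  ("osm", "1de2a758")
    #   _get_namespace_cluster_id("1de2a758")      ->  ("", "1de2a758")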

    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                       "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
                       ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

            command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                       "init").format(self._helm_command, config_filename, namespace, helm_dir,
                                      self.service_account)
            _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(self._helm_command, config_filename, namespace, helm_dir)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw
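
    # Minimal usage sketch (hypothetical caller code): register a cluster from a
    # kubeconfig read elsewhere; 'creds' and the surrounding coroutine are assumed.
    #
    #   cluster_uuid, installed = await connector.init_env(
    #       k8s_creds=creds, namespace="kube-system"
    #   )
    #   # cluster_uuid has the form "kube-system:<uuid4>", suitable for the
    #   # other methods of this connector.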

    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)
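
    # For reference, with hypothetical values (name="bitnami",
    # url="https://charts.example.com/bitnami"), the command built above expands to:
    #   /usr/bin/helm --kubeconfig=<cfg> --home=<helm_dir> \
    #       repo add bitnami https://charts.example.com/bitnami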

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )
        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: repo name in OSM
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove repository {} from cluster {}".format(name, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                       .format(cluster_id, uninstall_sw))

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("Name")
                            chart = r.get("Chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            self.log.error(
                                "Error uninstalling release {}: {}".format(kdu_instance, e)
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not enabled. "
                        "Leaving K8s helm environment"
                    ).format(cluster_id)
                    self.log.warning(msg)
                    uninstall_sw = False  # remove the k8s cluster entry without uninstalling Tiller

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore errors for backward compatibility
                command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                           "io/osm-tiller-cluster-rule").format(self.kubectl_command,
                                                                config_filename)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)
                command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
                    format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(command=command,
                                                           raise_exception_on_error=False)

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
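
    # Usage sketch (hypothetical values): install a chart from a repo previously
    # added with repo_add(); "stable/mongodb:7.8.10" selects chart and version.
    # The db_dict structure shown is an assumption about the caller's bookkeeping.
    #
    #   kdu_instance = await connector.install(
    #       cluster_uuid=cluster_uuid,
    #       kdu_model="stable/mongodb:7.8.10",
    #       atomic=True,
    #       db_dict={"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "..."},
    #   )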

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return:
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        return self._output_to_table(output)

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_command(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        return await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def get_services(self,
                           cluster_uuid: str,
                           kdu_instance: str,
                           namespace: str) -> list:

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        status = await self._status_kdu(
            cluster_id, kdu_instance, return_text=False
        )

        service_names = self._parse_helm_status_service_info(status)
        service_list = []
        for service in service_names:
            service = await self.get_service(cluster_uuid, service, namespace)
            service_list.append(service)

        return service_list

    async def get_service(self,
                          cluster_uuid: str,
                          service_name: str,
                          namespace: str) -> object:

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid)
        )

        # get paths
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, config_filename, namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list] if ip_map_list else []
            service["external_ip"] = ip_list

        return service
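
    # The returned structure, with hypothetical values, looks like:
    #   {
    #       "name": "mongodb-svc",
    #       "type": "LoadBalancer",
    #       "ports": [{"port": 27017, "protocol": "TCP", ...}],
    #       "cluster_ip": "10.152.183.79",
    #       "external_ip": ["172.21.248.10"],   # only for LoadBalancer services
    #   }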

    async def synchronize_repos(self, cluster_uuid: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_id))
        try:
            update_repos_timeout = (
                300  # max timeout to sync a single repo; more than this is too much
            )
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added by cluster but not in nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first, then add, because there may be
                # different repos with the same name but different id and url
                self.log.debug("repos to delete: {}".format(repos_to_delete))
                for repo_id in repos_to_delete:
                    # try to delete repos
                    try:
                        repo_delete_task = asyncio.ensure_future(
                            self.repo_remove(
                                cluster_uuid=cluster_uuid,
                                name=cluster_repo_dict[repo_id],
                            )
                        )
                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                                repo_id, cluster_repo_dict[repo_id], str(e)
                            )
                        )
                    # always mark the repo as deleted, even on error: if it is
                    # not there, trying to delete it again would raise an error
                    deleted_repo_list.append(repo_id)

                # add repos
                self.log.debug("repos to add: {}".format(repos_to_add))
                for repo_id in repos_to_add:
                    # obtain the repo data from the db
                    # if there is an error getting the repo in the database we will
                    # ignore this repo and continue
                    # because there is a possible race condition where the repo has
                    # been deleted while processing
                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                    self.log.debug(
                        "obtained repo: id, {}, name: {}, url: {}".format(
                            repo_id, db_repo["name"], db_repo["url"]
                        )
                    )
                    try:
                        repo_add_task = asyncio.ensure_future(
                            self.repo_add(
                                cluster_uuid=cluster_uuid,
                                name=db_repo["name"],
                                url=db_repo["url"],
                                repo_type="chart",
                            )
                        )
                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
                        added_repo_dict[repo_id] = db_repo["name"]
                        self.log.debug(
                            "added repo: id, {}, name: {}".format(
                                repo_id, db_repo["name"]
                            )
                        )
                    except Exception as e:
                        # deal with error adding repo; adding a repo that already
                        # exists does not raise any error.
                        # Do not raise here, because a broken repo added by anyone
                        # could otherwise prevent instantiating any ns
                        self.log.error(
                            "Error adding repo id: {}, err_msg: {} ".format(
                                repo_id, repr(e)
                            )
                        )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8scluster with helm-id: {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1:]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1:]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if does not start with alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()
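
    # Illustrative (non-deterministic) outputs, assuming the logic above:
    #   _generate_release_name("stable/mongodb")      -> "stable-mongodb-0012345678"
    #   _generate_release_name("https://x/chart.tgz") -> "chart-tgz-0012345678"
    # The numeric suffix is random and zero-padded to 10 digits.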

    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance,
                    return_text=False
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                pass
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY UP-TO-DATE AVAILABLE AGE
        #       halting-horse-mongodb   0/1   1          0         0s
        #       halting-petit-mongodb   1/1   1          0         0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        # assume ready unless some resource reports fewer ready replicas than desired
        ready = True
        num_lines = len(resources)
        index = 0
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _parse_helm_status_service_info(self, status):

        # extract info.status.resources -> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY UP-TO-DATE AVAILABLE AGE
        #       halting-horse-mongodb   0/1   1          0         0s
        #       halting-petit-mongodb   1/1   1          0         0s
        # blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        service_list = []
        first_line_skipped = service_found = False
        for line in resources:
            if not service_found:
                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
                    service_found = True
                continue
            else:
                if len(line) >= 2 and line[0] == "==>":
                    service_found = first_line_skipped = False
                    continue
                if not line:
                    continue
                if not first_line_skipped:
                    first_line_skipped = True
                    continue
                service_list.append(line[0])

        return service_list
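
    # Sketch of the parsing above on a hypothetical 'resources' table: from
    #   ==> v1/Service
    #   NAME            TYPE        ...
    #   mongodb-svc     ClusterIP   ...
    # only the names under the '==> v1/Service' header are kept (the header row
    # itself is skipped), so the result here would be ["mongodb-svc"].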

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value
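
    # Example, assuming a helm status dict like the one returned by _status_kdu:
    #   _get_deep({"info": {"status": {"resources": "..."}}},
    #             ("info", "status", "resources"))  ->  "..."
    # Missing keys (or falsy intermediate values) yield None instead of raising.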

    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

    # params for use in -f file
    # returns values file option and filename (so the caller can delete it afterwards)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None
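
    # Example (hypothetical params): {"replicas": 2, "mongodb.usePassword": False}
    # would be dumped to a random '<10-digit-number>.yaml' file and the method
    # would return ("-f 0012345678.yaml", "0012345678.yaml"); the caller removes
    # the file once the helm command has finished.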

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str
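
    # Example (hypothetical params): _params_to_set_option({"a": 1, "b": "x"})
    # returns "--set a=1,b=x" (key order follows dict iteration order).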

    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table
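
    # Example, assuming typical kubectl/helm tabular output:
    #   _output_to_table("NAME  READY\ntiller-deploy  1/1")
    #   ->  [["NAME", "READY"], ["tiller-deploy", "1/1"]]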

    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube and helm directories, config filename and cluster dir.
                Raises an exception if they do not exist and cannot be created
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # raise exception if fails
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)
            return False