Fix bug 1448: minor fix in JujuModelWatcher.wait_for_model
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
26 from .exceptions import MethodNotImplemented
27 from n2vc.utils import base64_to_cacert
28 from n2vc.libjuju import Libjuju
29
30 from kubernetes.client.models import (
31 V1ClusterRole,
32 V1ObjectMeta,
33 V1PolicyRule,
34 V1ServiceAccount,
35 V1ClusterRoleBinding,
36 V1RoleRef,
37 V1Subject,
38 )
39
40 from typing import Dict
41
# Keys read from the service-account secret's ``data`` mapping
# (see _get_secret_data).
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Credential attribute that reset() looks up to recover the RBAC id.
RBAC_LABEL_KEY_NAME = "rbac-id"

# Namespace in which the service account is created and its secret is read.
ADMIN_NAMESPACE = "kube-system"
# Prefix of the RBAC resource names ("<prefix>-<rbac_id>"); init_env also
# uses it as the label key attached to those resources.
RBAC_STACK_PREFIX = "juju-credential"
48
49
def generate_rbac_id():
    """Return a random identifier of 8 lowercase hexadecimal characters."""
    random_bytes = os.urandom(4)
    return binascii.hexlify(random_bytes).decode()
52
53
54 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Initialize the connector and the Libjuju client it delegates to.

    :param fs: file system object, stored as ``self.fs``
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not used by this
        constructor)
    :param juju_command: path to juju executable (not used by this
        constructor)
    :param log: logger
    :param loop: Asyncio loop; defaults to ``asyncio.get_event_loop()``
    :param on_update_db: callback forwarded to ``K8sConnector.__init__``
    :param vca_config: settings of the Juju controller. The keys "host",
        "user", "secret" and "ca_cert" are required; "port" (default
        17070), "enable_os_upgrade" (default True) and "apt_mirror"
        (default None) are optional.
    :raises N2VCBadArgumentsException: if vca_config is empty or lacks any
        of the required keys
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    required_vca_config = [
        "host",
        "user",
        "secret",
        "ca_cert",
    ]
    if not vca_config or not all(k in vca_config for k in required_vca_config):
        raise N2VCBadArgumentsException(
            message="Missing arguments in vca_config: {}".format(vca_config),
            bad_args=required_vca_config,
        )

    url = "{}:{}".format(vca_config["host"], vca_config.get("port", 17070))
    # The certificate is provided base64-encoded in the configuration.
    ca_cert = base64_to_cacert(vca_config["ca_cert"])

    self.libjuju = Libjuju(
        endpoint=url,
        api_proxy=None,  # Not needed for k8s charms
        enable_os_upgrade=vca_config.get("enable_os_upgrade", True),
        apt_mirror=vca_config.get("apt_mirror", None),
        username=vca_config["user"],
        password=vca_config["secret"],
        cacert=ca_cert,
        loop=self.loop,
        log=self.log,
        db=self.db,
    )
    self.log.debug("K8S Juju connector initialized")
123
124 """Initialization"""
125
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding, all named
    "<RBAC_STACK_PREFIX>-<rbac_id>", are created in the cluster. The token
    and CA certificate of the service account are then used to register
    the cluster in Juju through ``libjuju.add_k8s``. If any step fails, the
    resources created so far are removed again before the error is
    re-raised.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
                      '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
                      'kube-system' will be used (currently not used by
                      this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True if connector has installed some
             software in the cluster
             (on error, an exception will be raised)
    """

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # The kubeconfig is only needed while this call runs; the context
    # manager removes the temporary file on every exit path.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(k8s_creds)
        kubecfg.flush()
        kubectl = Kubectl(config_file=kubecfg.name)

        # CREATING RESOURCES IN K8S
        rbac_id = generate_rbac_id()
        metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
        labels = {RBAC_STACK_PREFIX: rbac_id}

        # Each creation step is paired with the function that undoes it.
        rbac_steps = (
            (self._create_cluster_role, self._delete_cluster_role),
            (self._create_service_account, self._delete_service_account),
            (
                self._create_cluster_role_binding,
                self._delete_cluster_role_binding,
            ),
        )
        # Undo functions of the steps that succeeded, in creation order.
        cleanup_functions = []
        try:
            for create_function, delete_function in rbac_steps:
                create_function(kubectl, name=metadata_name, labels=labels)
                cleanup_functions.append(delete_function)

            token, client_cert_data = await self._get_secret_data(
                kubectl,
                metadata_name,
            )

            default_storage_class = kubectl.get_default_storage_class()
            await self.libjuju.add_k8s(
                name=cluster_uuid,
                rbac_id=rbac_id,
                token=token,
                client_cert_data=client_cert_data,
                configuration=kubectl.configuration,
                storage_class=default_storage_class,
                credential_name=self._get_credential_name(cluster_uuid),
            )
            return cluster_uuid, True
        except Exception as e:
            self.log.error("Error initializing k8scluster: {}".format(e))
            if cleanup_functions:
                self.log.debug("Cleaning up created resources in k8s cluster...")
                # Remove in reverse creation order. A failing removal is
                # only logged so it cannot hide the original error.
                for delete_function in reversed(cleanup_functions):
                    try:
                        delete_function(kubectl, metadata_name)
                    except Exception as cleanup_error:
                        self.log.warning(
                            "Cannot remove resource in K8s {}".format(
                                cleanup_error
                            )
                        )
                self.log.debug("Cleanup finished")
            raise
222
223 """Repo Management"""
224
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
):
    """Not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
232
async def repo_list(self):
    """Not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
235
async def repo_remove(
    self,
    name: str,
):
    """Not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
241
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    No-op: repositories are not implemented for this connector.

    :param cluster_uuid: ignored
    :param name: ignored
    :return: always None
    """
    return None
247
248 """Reset"""
249
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered in Juju for the cluster and then
    deletes the RBAC resources (cluster role binding, service account and
    cluster role) that were created for its credential, when the credential
    carries an RBAC id.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: not used by this method
    :param uninstall_sw bool: not used by this method
    :return: Returns True if successful or raises an exception.
    """

    try:
        self.log.debug("[reset] Removing k8s cloud")

        # Read the credential before the cloud is removed: its attributes
        # are needed below to find the RBAC resources.
        cloud_creds = await self.libjuju.get_cloud_credentials(
            cluster_uuid,
            self._get_credential_name(cluster_uuid),
        )

        await self.libjuju.remove_cloud(cluster_uuid)

        kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)

        # The context manager removes the temporary kubeconfig on every
        # exit path.
        with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
            kubecfg_file.write(kubecfg)
            kubecfg_file.flush()
            kubectl = Kubectl(config_file=kubecfg_file.name)

            delete_functions = (
                self._delete_cluster_role_binding,
                self._delete_service_account,
                self._delete_cluster_role,
            )

            credential_attrs = cloud_creds[0].result["attrs"]
            if RBAC_LABEL_KEY_NAME in credential_attrs:
                rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
                metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
                for delete_func in delete_functions:
                    # Best effort: one failing deletion must not prevent
                    # the remaining ones.
                    try:
                        delete_func(kubectl, metadata_name)
                    except Exception as e:
                        self.log.warning(
                            "Cannot remove resource in K8s {}".format(e)
                        )

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise
    return True
299
300 """Deployment"""
301
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    Creates a Juju model named after the KDU instance and deploys the
    bundle into it. The bundle is one of:
        - a reference starting with "cs:"
        - a URL starting with "http"
        - a local path (anything else), deployed as "local:<path>"

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: Passed to ``libjuju.deploy`` as ``wait``
    :param timeout int: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
                        (not used yet)
    :param db_dict: must be set; otherwise not used by this method
    :param kdu_name: Name of the KDU instance to be installed (not used)
    :param namespace: K8s namespace to use for the KDU instance (not used)

    :return: True if successful
    :raises K8sException: if db_dict or kdu_model is not set
    """
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember the working directory BEFORE it is changed, so that it can
    # really be restored afterwards.
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        # The current directory no longer exists; fall back to a fixed one.
        previous_workdir = "/app/storage"

    try:
        if not bundle.startswith(("cs:", "http")):
            # Local bundle: deploy from the directory containing it.
            # NOTE(review): presumably so that relative paths inside the
            # bundle resolve — confirm. The working directory is
            # process-wide, so concurrent installs of local bundles can
            # interfere with each other.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        await self.libjuju.add_model(
            model_name=kdu_instance,
            cloud_name=cluster_uuid,
            credential_name=self._get_credential_name(cluster_uuid),
        )

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await self.libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory on success and on failure alike.
        os.chdir(previous_workdir)
    return True
378
async def instances_list(self, cluster_uuid: str) -> list:
    """
    List the releases deployed in a cluster.

    The current implementation does not query anything.

    :param cluster_uuid: the cluster (ignored)
    :return: always an empty list
    """
    deployed_releases = []
    return deployed_releases
387
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: Intended to return a reference to the new revision number of
             the KDU instance; currently nothing is returned.
    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually
    #
    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
420
421 """Rollback"""
422
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, the
                         previous upgrade is meant to be rolled back.

    :return: Intended to return the revision of the active KDU instance;
             currently nothing is returned.
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
440
441 """Deletion"""
442
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """

    self.log.debug("[uninstall] Destroying model")

    # Upper bound, in seconds, passed to libjuju for the model destruction.
    destroy_timeout = 3600
    await self.libjuju.destroy_model(kdu_instance, total_timeout=destroy_timeout)

    return True
464
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used by this method)
    :param params: Dictionary of all the parameters needed for the action.
                   It must contain "application-name"; the whole dictionary
                   is forwarded to the action as keyword arguments.
    :param db_dict: Dictionary for any additional data (not used)

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, the action
                          does not exist, its status is not "completed",
                          or any other error occurs while executing it
    """

    if not params or "application-name" not in params:
        # Adjacent literals keep the message free of source indentation.
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await self.libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await self.libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        # Every failure, including the K8sExceptions raised above, is
        # reported through one uniform message; the cause stays chained.
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
515
516 """Introspection"""
517
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its "applications" section.

    Example of a bundle:
        {
            'description': 'Test bundle',
            'bundle': 'kubernetes',
            'applications': {
                'mariadb-k8s': {
                    'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                    'scale': 1,
                    'options': {
                        'password': 'manopw',
                        'root_password': 'osm4u',
                        'user': 'mano'
                    },
                    'series': 'kubernetes'
                }
            }
        }

    :param kdu_model str: The path of the bundle to inspect.

    :return: The content of the "applications" key of the bundle.
    :raises K8sException: if the file does not exist
    """
    if not os.path.exists(kdu_model):
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
562
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks for a file named README, README.txt or README.md (in that order
    of preference) in the directory that contains the bundle and returns
    its content.

    :param kdu_model str: The name or path of a bundle

    :return: The contents of the README, or None if the directory holds
             none of the candidate files.
    :raises OSError: if the bundle directory cannot be listed or the
                     README cannot be read
    """
    candidates = ("README", "README.txt", "README.md")
    # A bare file name has an empty dirname: look in the current directory.
    bundle_dir = os.path.dirname(kdu_model) or "."

    entries = set(os.listdir(bundle_dir))
    for candidate in candidates:
        if candidate in entries:
            # Open relative to the bundle directory, not to the process
            # working directory.
            with open(os.path.join(bundle_dir, candidate), "r") as f:
                return f.read()

    return None
586
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Reads the status of the Juju model named after the KDU instance and
    reports the status of each of its applications.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique id of the KDU instance

    :return: A dictionary mapping every application name to
             ``{"status": <application status>}``.
    """
    model_status = await self.libjuju.get_model_status(kdu_instance)
    applications = model_status.applications
    return {
        app_name: {"status": applications[app_name]["status"]["status"]}
        for app_name in applications
    }
609
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    :param cluster_uuid: UUID of the cluster whose stored kubeconfig is used
    :param kdu_instance: KDU instance name; it is used as the K8s namespace
                         in which the services are looked up
    :param namespace: not used by this method
                      NOTE(review): the lookup uses kdu_instance as the
                      namespace, presumably because the Juju model is
                      named after the KDU instance — confirm.
    :return: the result of ``Kubectl.get_services`` for that namespace
    """

    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    # The context manager removes the temporary kubeconfig on every exit
    # path, including errors raised by the query.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        kubecfg.flush()
        kubectl = Kubectl(config_file=kubecfg.name)

        return kubectl.get_services(
            field_selector="metadata.namespace={}".format(kdu_instance)
        )
625
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    :param cluster_uuid: UUID of the cluster whose stored kubeconfig is used
    :param service_name: value matched against ``metadata.name``
    :param namespace: value matched against ``metadata.namespace``
    :return: the first service returned by ``Kubectl.get_services`` for
             that selector
    :raises IndexError: if no service matches
    """

    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    # The context manager removes the temporary kubeconfig on every exit
    # path, including errors raised by the query.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        kubecfg.flush()
        kubectl = Kubectl(config_file=kubecfg.name)

        matching_services = kubectl.get_services(
            field_selector="metadata.name={},metadata.namespace={}".format(
                service_name, namespace
            )
        )
        return matching_services[0]
643
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts
    the "password" and "secret" fields of its credentials and returns the
    credentials serialized as YAML.

    :param cluster_uuid: value of the ``_id`` field of the cluster record
    :return: YAML dump of the cluster credentials
    :raises K8sException: if no record is found for cluster_uuid
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # With fail_on_empty=False a missing record is presumably reported as
    # an empty result instead of an exception; fail with a clear message
    # rather than with an AttributeError further down.
    if not k8scluster:
        raise K8sException(
            "K8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    # The return value is not used: the call is relied upon to update
    # ``credentials`` in place.
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
661
def _get_credential_name(self, cluster_uuid: str) -> str:
    """
    Build the credential name of a k8s cloud.

    The cluster UUID cannot be used as the credential name as-is: the name
    must start with a letter and a UUID may start with a digit. The name
    is therefore the cluster UUID prefixed with "cred-".

    :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)

    :return: Name to use for the credential name.
    """
    return "cred-{uuid}".format(uuid=cluster_uuid)
676
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID

    Placeholder: the method has no implementation yet and returns None.

    :param cluster_uuid str: The UUID of the cluster
    :returns: None (the namespace's unique name is the intended result)
    """
    return None
688
def _create_cluster_role(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Create a cluster role that allows every verb on every resource and on
    every non-resource URL.

    :param kubectl: Kubectl object providing the RBAC API client
    :param name: name of the cluster role
    :param labels: labels attached to the cluster role
    :raises Exception: if a cluster role with that name already exists
    """
    rbac_api = kubectl.clients[RBAC_CLIENT]

    existing_roles = rbac_api.list_cluster_role(
        field_selector="metadata.name={}".format(name)
    )
    if len(existing_roles.items) > 0:
        raise Exception(
            "Cluster role with metadata.name={} already exists".format(name)
        )

    all_resources_rule = V1PolicyRule(
        api_groups=["*"], resources=["*"], verbs=["*"]
    )
    all_non_resource_urls_rule = V1PolicyRule(
        non_resource_ur_ls=["*"], verbs=["*"]
    )
    cluster_role = V1ClusterRole(
        metadata=V1ObjectMeta(
            name=name, labels=labels, namespace=ADMIN_NAMESPACE
        ),
        rules=[all_resources_rule, all_non_resource_urls_rule],
    )

    rbac_api.create_cluster_role(cluster_role)
715
def _delete_cluster_role(self, kubectl: Kubectl, name: str):
    """Delete the cluster role called ``name`` through the RBAC client."""
    rbac_api = kubectl.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
718
def _create_service_account(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Create a service account in the ADMIN_NAMESPACE namespace.

    :param kubectl: Kubectl object providing the core API client
    :param name: name of the service account
    :param labels: labels attached to the service account
    :raises Exception: if a service account with that name already exists
                       in the namespace
    """
    core_api = kubectl.clients[CORE_CLIENT]

    existing_accounts = core_api.list_namespaced_service_account(
        ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
    )
    if len(existing_accounts.items) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    service_account = V1ServiceAccount(
        metadata=V1ObjectMeta(
            name=name, labels=labels, namespace=ADMIN_NAMESPACE
        )
    )
    core_api.create_namespaced_service_account(ADMIN_NAMESPACE, service_account)
739
def _delete_service_account(self, kubectl: Kubectl, name: str):
    """Delete the service account called ``name`` from ADMIN_NAMESPACE."""
    core_api = kubectl.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, ADMIN_NAMESPACE)
744
def _create_cluster_role_binding(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Bind the cluster role ``name`` to the service account ``name`` of the
    ADMIN_NAMESPACE namespace.

    :param kubectl: Kubectl object providing the RBAC API client
    :param name: name shared by the binding, the role and the account
    :param labels: labels attached to the binding
    :raises Exception: if a cluster role binding with that name already
                       exists
    """
    rbac_api = kubectl.clients[RBAC_CLIENT]

    existing_bindings = rbac_api.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    )
    if len(existing_bindings.items) > 0:
        raise Exception("Generated rbac id already exists")

    bound_account = V1Subject(
        kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE
    )
    role_binding = V1ClusterRoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
        subjects=[bound_account],
    )
    rbac_api.create_cluster_role_binding(role_binding)
765
def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
    """Delete the cluster role binding called ``name`` through the RBAC client."""
    rbac_api = kubectl.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
768
async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
    """
    Get the token and the CA certificate of a service account.

    The service account in ADMIN_NAMESPACE is polled until it references a
    secret, waiting between attempts. The "token" and "ca.crt" entries of
    that secret are then base64-decoded.

    :param kubectl: Kubectl object providing the core API client
    :param name: name of the service account
    :return: tuple (token, CA certificate), both as text
    :raises Exception: if the service account does not exist, or if it
                       still references no secret after all attempts
    """
    v1_core = kubectl.clients[CORE_CLIENT]

    retries_limit = 10
    # Seconds to wait between attempts. Without a pause all the attempts
    # complete almost at once, before the cluster has had a chance to
    # attach a secret to the new service account.
    retry_delay = 1
    secret_name = None
    for attempt in range(retries_limit):
        service_accounts = v1_core.list_namespaced_service_account(
            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
        )
        if len(service_accounts.items) == 0:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )
        service_account = service_accounts.items[0]
        if service_account.secrets and len(service_account.secrets) > 0:
            secret_name = service_account.secrets[0].name
        if secret_name is not None:
            break
        if attempt < retries_limit - 1:
            await asyncio.sleep(retry_delay)
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )
    secret = v1_core.list_namespaced_secret(
        ADMIN_NAMESPACE,
        field_selector="metadata.name={}".format(secret_name),
    ).items[0]

    token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
    client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    return (
        base64.b64decode(token).decode("utf-8"),
        base64.b64decode(client_certificate_data).decode("utf-8"),
    )
804
805 @staticmethod
806 def generate_kdu_instance_name(**kwargs):
807 db_dict = kwargs.get("db_dict")
808 kdu_name = kwargs.get("kdu_name", None)
809 if kdu_name:
810 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
811 else:
812 kdu_instance = db_dict["filter"]["_id"]
813 return kdu_instance