Fix flake8 minor issues
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
26 from .exceptions import MethodNotImplemented
27 from n2vc.utils import base64_to_cacert
28 from n2vc.libjuju import Libjuju
29
30 from kubernetes.client.models import (
31 V1ClusterRole,
32 V1ObjectMeta,
33 V1PolicyRule,
34 V1ServiceAccount,
35 V1ClusterRoleBinding,
36 V1RoleRef,
37 V1Subject,
38 )
39
40 from typing import Dict
41
# Keys read from the ``data`` mapping of the service account secret
# (see K8sJujuConnector._get_secret_data).
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Attribute name under which the RBAC id is looked up in the Juju cloud
# credential when a cluster is reset.
RBAC_LABEL_KEY_NAME = "rbac-id"

# Namespace in which the service account is created and its secret is read.
ADMIN_NAMESPACE = "kube-system"
# Prefix of the names given to the RBAC resources created by init_env; it is
# also used there as the label key carrying the RBAC id.
RBAC_STACK_PREFIX = "juju-credential"
48
49
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal digits.

    The identifier is derived from 4 bytes of OS-provided randomness.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
52
53
class K8sJujuConnector(K8sConnector):
    """K8s connector that manages KDUs as Juju bundles through Libjuju."""

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        juju_command: str = "/usr/bin/juju",
        log: object = None,
        loop: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        :param fs: file system for kubernetes and juju configuration
        :param db: Database object
        :param kubectl_command: path to kubectl executable (accepted for
            interface compatibility; not used by this constructor)
        :param juju_command: path to juju executable (accepted for interface
            compatibility; not used by this constructor)
        :param log: logger
        :param loop: Asyncio loop; defaults to asyncio.get_event_loop()
        :param on_update_db: callback forwarded to the parent connector
        :param vca_config: VCA connection data. "host", "user", "secret" and
            "ca_cert" are mandatory; "port" (default 17070),
            "enable_os_upgrade" (default True) and "apt_mirror" are optional
        :raises N2VCBadArgumentsException: if vca_config is missing or lacks
            any of the mandatory keys
        """

        # parent class
        K8sConnector.__init__(
            self,
            db,
            log=log,
            on_update_db=on_update_db,
        )

        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.log.debug("Initializing K8S Juju connector")

        required_vca_config = [
            "host",
            "user",
            "secret",
            "ca_cert",
        ]
        if not vca_config or not all(k in vca_config for k in required_vca_config):
            raise N2VCBadArgumentsException(
                message="Missing arguments in vca_config: {}".format(vca_config),
                bad_args=required_vca_config,
            )
        port = vca_config.get("port", 17070)
        url = "{}:{}".format(vca_config["host"], port)
        enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
        apt_mirror = vca_config.get("apt_mirror", None)
        username = vca_config["user"]
        secret = vca_config["secret"]
        ca_cert = base64_to_cacert(vca_config["ca_cert"])

        self.libjuju = Libjuju(
            endpoint=url,
            api_proxy=None,  # Not needed for k8s charms
            enable_os_upgrade=enable_os_upgrade,
            apt_mirror=apt_mirror,
            username=username,
            password=secret,
            cacert=ca_cert,
            loop=self.loop,
            log=self.log,
            db=self.db,
        )
        self.log.debug("K8S Juju connector initialized")

    @staticmethod
    def _write_kubeconfig(kubeconfig: str):
        """Write a kubeconfig to a temporary file and return the file object.

        The caller must keep a reference to the returned object for as long
        as the file is needed: the file is removed when the object is closed
        or garbage-collected.

        :param kubeconfig: kubeconfig content
        :return: the NamedTemporaryFile object; its path is in ``.name``
        """
        kubecfg = tempfile.NamedTemporaryFile()
        with open(kubecfg.name, "w") as kubecfg_file:
            kubecfg_file.write(kubeconfig)
        return kubecfg

    # ----- Initialization -----

    async def init_env(
        self,
        k8s_creds: str,
        namespace: str = "kube-system",
        reuse_cluster_uuid: str = None,
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Juju bundles.

        A cluster role, a service account and a cluster role binding are
        created in the cluster, and the token and CA certificate of the
        service account are used to register the cluster as a Juju k8s cloud.
        If any step fails, the resources created so far are removed again.

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for juju. By default,
            'kube-system' will be used (currently not read by this method)
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

        # Keep the reference to kubecfg: the file disappears with the object
        kubecfg = self._write_kubeconfig(k8s_creds)
        kubectl = Kubectl(config_file=kubecfg.name)

        # CREATING RESOURCES IN K8S
        rbac_id = generate_rbac_id()
        metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
        labels = {RBAC_STACK_PREFIX: rbac_id}

        # Delete functions of the resources created so far, used to clean up
        # if it fails in the middle of the process
        cleanup_functions = []
        try:
            self._create_cluster_role(
                kubectl,
                name=metadata_name,
                labels=labels,
            )
            cleanup_functions.append(self._delete_cluster_role)

            self._create_service_account(
                kubectl,
                name=metadata_name,
                labels=labels,
            )
            cleanup_functions.append(self._delete_service_account)

            self._create_cluster_role_binding(
                kubectl,
                name=metadata_name,
                labels=labels,
            )
            cleanup_functions.append(self._delete_cluster_role_binding)

            token, client_cert_data = await self._get_secret_data(
                kubectl,
                metadata_name,
            )

            default_storage_class = kubectl.get_default_storage_class()
            await self.libjuju.add_k8s(
                name=cluster_uuid,
                rbac_id=rbac_id,
                token=token,
                client_cert_data=client_cert_data,
                configuration=kubectl.configuration,
                storage_class=default_storage_class,
                credential_name=self._get_credential_name(cluster_uuid),
            )
            return cluster_uuid, True
        except Exception as e:
            self.log.error("Error initializing k8scluster: {}".format(e))
            if cleanup_functions:
                self.log.debug("Cleaning up created resources in k8s cluster...")
                # Undo in reverse creation order; a failing cleanup step must
                # neither stop the remaining ones nor hide the original error
                for delete_function in reversed(cleanup_functions):
                    try:
                        delete_function(kubectl, metadata_name)
                    except Exception as cleanup_error:
                        self.log.warning(
                            "Cannot remove resource in K8s {}".format(cleanup_error)
                        )
                self.log.debug("Cleanup finished")
            raise

    # ----- Repo Management -----

    async def repo_add(
        self,
        name: str,
        url: str,
        _type: str = "charm",
    ):
        """Not implemented: always raises MethodNotImplemented."""
        raise MethodNotImplemented()

    async def repo_list(self):
        """Not implemented: always raises MethodNotImplemented."""
        raise MethodNotImplemented()

    async def repo_remove(
        self,
        name: str,
    ):
        """Not implemented: always raises MethodNotImplemented."""
        raise MethodNotImplemented()

    async def synchronize_repos(self, cluster_uuid: str, name: str):
        """
        Returns None as currently add_repo is not implemented
        """
        return None

    # ----- Reset -----

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the Juju cloud that
        represents it and, when the cloud credential carries an RBAC id, the
        RBAC resources named after that id.

        :param cluster_uuid str: The UUID of the cluster to reset
        :param force bool: Currently not read by this method
        :param uninstall_sw bool: Currently not read by this method
        :return: Returns True if successful or raises an exception.
        """

        try:
            self.log.debug("[reset] Removing k8s cloud")

            # The credential must be read before the cloud is removed
            cloud_creds = await self.libjuju.get_cloud_credentials(
                cluster_uuid,
                self._get_credential_name(cluster_uuid),
            )

            await self.libjuju.remove_cloud(cluster_uuid)

            kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)

            # Keep the reference to kubecfg_file: the file disappears with it
            kubecfg_file = self._write_kubeconfig(kubecfg)
            kubectl = Kubectl(config_file=kubecfg_file.name)

            delete_functions = [
                self._delete_cluster_role_binding,
                self._delete_service_account,
                self._delete_cluster_role,
            ]

            credential_attrs = cloud_creds[0].result["attrs"]
            if RBAC_LABEL_KEY_NAME in credential_attrs:
                rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
                metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
                for delete_func in delete_functions:
                    # Best effort: a resource that cannot be removed is only
                    # reported, the remaining ones are still attempted
                    try:
                        delete_func(kubectl, metadata_name)
                    except Exception as e:
                        self.log.warning("Cannot remove resource in K8s {}".format(e))

        except Exception as e:
            self.log.debug("Caught exception during reset: {}".format(e))
            raise
        return True

    # ----- Deployment -----

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 1800,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ) -> str:
        """Install a bundle

        Juju bundle that models the KDU, in any of the following ways:
            - <juju-repo>/<juju-bundle>
            - <juju-bundle folder under k8s_models folder in the package>
            - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
                in the package>
            - <URL_where_to_fetch_juju_bundle>

        :param cluster_uuid str: The UUID of the cluster to install to
        :param kdu_model str: The name or path of a bundle to install
        :param atomic bool: Passed to Libjuju.deploy as ``wait``
        :param timeout int: The time, in seconds, to wait for the install
                            to finish
        :param params dict: Key-value pairs of instantiation parameters
                            (currently not used)
        :param db_dict dict: Mandatory; db_dict["filter"]["_id"] is used to
                             build the name of the KDU instance
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
                          (currently not used)

        :return: The name of the KDU instance, which is also the name of the
                 Juju model created for it
        :raises K8sException: if db_dict or kdu_model are not set
        """
        bundle = kdu_model

        if not db_dict:
            raise K8sException("db_dict must be set")
        if not bundle:
            raise K8sException("bundle must be set")

        if kdu_name:
            kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
        else:
            kdu_instance = db_dict["filter"]["_id"]

        # Remember the working directory BEFORE changing it, so that it can
        # really be restored once the deployment has finished
        try:
            previous_workdir = os.getcwd()
        except FileNotFoundError:
            previous_workdir = "/app/storage"

        # "cs:" bundles are used as they are; downloading "http" bundles is
        # not implemented, so they are passed through unchanged as well
        if not bundle.startswith(("cs:", "http")):
            # Local bundle: deploy from the directory that contains it.
            # os.path.dirname is used because str.strip() removes a set of
            # characters rather than a suffix.
            new_workdir = os.path.dirname(kdu_model) or "."
            os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        try:
            self.log.debug("Checking for model named {}".format(kdu_instance))

            # Create the new model
            self.log.debug("Adding model: {}".format(kdu_instance))
            await self.libjuju.add_model(
                model_name=kdu_instance,
                cloud_name=cluster_uuid,
                credential_name=self._get_credential_name(cluster_uuid),
            )

            # TODO: Instantiation parameters

            self.log.debug("[install] deploying {}".format(bundle))
            await self.libjuju.deploy(
                bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
            )
        finally:
            os.chdir(previous_workdir)
        return kdu_instance

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        Currently an empty list is always returned.

        :param cluster_uuid: the cluster
        :return: an empty list
        """
        return []

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        params: dict = None,
    ) -> str:
        """Upgrade a model

        Not implemented: always raises MethodNotImplemented.

        The API doesn't have a concept of bundle upgrades, because there are
        many possible changes: charm revision, disk, number of units, etc.

        As such, we are only supporting a limited subset of upgrades. We'll
        upgrade the charm revision but leave storage and scale untouched.

        Scale changes should happen through OSM constructs, and changes to
        storage would require a redeployment of the service, at least in this
        initial release.

        :param cluster_uuid str: The UUID of the cluster to upgrade
        :param kdu_instance str: The unique name of the KDU instance
        :param kdu_model str: The name or path of the bundle to upgrade to
        :param params dict: Key-value pairs of instantiation parameters

        :return: If successful, reference to the new revision number of the
                 KDU instance.
        """

        # TODO: Loop through the bundle and upgrade each charm individually
        raise MethodNotImplemented()

    # ----- Rollback -----

    async def rollback(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        revision: int = 0,
    ) -> str:
        """Rollback a model

        Not implemented: always raises MethodNotImplemented.

        :param cluster_uuid str: The UUID of the cluster to rollback
        :param kdu_instance str: The unique name of the KDU instance
        :param revision int: The revision to revert to. If omitted, rolls back
                             the previous upgrade.

        :return: If successful, returns the revision of active KDU instance,
                 or raises an exception
        """
        raise MethodNotImplemented()

    # ----- Deletion -----

    async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
        """Uninstall a KDU instance

        Destroys the Juju model named after the KDU instance.

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance

        :return: Returns True if successful, or raises an exception
        """

        self.log.debug("[uninstall] Destroying model")

        await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)

        return True

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution (currently not used)
        :param params: Dictionary of all the parameters needed for the action;
                       it must contain the key "application-name"
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        :raises K8sException: if "application-name" is missing from params,
                              the action does not exist, or its execution
                              fails or does not end as "completed"
        """

        if not params or "application-name" not in params:
            raise K8sException(
                "Missing application-name argument, "
                "argument needed for K8s actions"
            )
        try:
            self.log.debug(
                "[exec_primitive] Getting model "
                "kdu_instance: {}".format(kdu_instance)
            )
            application_name = params["application-name"]
            actions = await self.libjuju.get_actions(application_name, kdu_instance)
            if primitive_name not in actions:
                raise K8sException("Primitive {} not found".format(primitive_name))
            output, status = await self.libjuju.execute_action(
                application_name, kdu_instance, primitive_name, **params
            )

            if status != "completed":
                raise K8sException(
                    "status is not completed: {} output: {}".format(status, output)
                )

            return output

        except Exception as e:
            # Every failure is reported to the caller as a K8sException
            error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
            self.log.error(error_msg)
            raise K8sException(message=error_msg) from e

    # ----- Introspection -----

    async def inspect_kdu(
        self,
        kdu_model: str,
    ) -> dict:
        """Inspect a KDU

        Loads the bundle file and returns the content of its "applications"
        section, e.g. for this bundle::

            {
                'description': 'Test bundle',
                'bundle': 'kubernetes',
                'applications': {
                    'mariadb-k8s': {
                        'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                        'scale': 1,
                        'options': {
                            'password': 'manopw',
                            'root_password': 'osm4u',
                            'user': 'mano'
                        },
                        'series': 'kubernetes'
                    }
                }
            }

        the value stored under 'applications' is returned.

        :param kdu_model str: The path of the bundle file to inspect.

        :return: The "applications" mapping of the bundle.
        :raises K8sException: if the bundle file does not exist
        """

        if not os.path.exists(kdu_model):
            raise K8sException("file {} not found".format(kdu_model))

        with open(kdu_model, "r") as f:
            bundle = yaml.safe_load(f)

        # TODO: This should be returned in an agreed-upon format
        return bundle["applications"]

    async def help_kdu(
        self,
        kdu_model: str,
    ) -> str:
        """View the README

        If available, returns the README of the bundle. The README is looked
        for in the directory that contains the bundle, under the names
        "README", "README.txt" and "README.md".

        :param kdu_model str: The name or path of a bundle

        :return: If found, returns the contents of the README, else None.
        """
        readme = None

        candidates = ["README", "README.txt", "README.md"]
        # A bare file name has an empty dirname: use the current directory
        path = os.path.dirname(kdu_model) or "."
        for entry in os.listdir(path):
            if entry in candidates:
                # Open the file inside the bundle directory, not relative to
                # the current working directory
                with open(os.path.join(path, entry), "r") as f:
                    readme = f.read()
                break

        return readme

    async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str,
    ) -> dict:
        """Get the status of the KDU

        Get the current status of the KDU instance.

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique id of the KDU instance

        :return: Returns a dictionary that maps the name of each application
                 of the model to {"status": <status of the application>}.
        """
        status = {}
        model_status = await self.libjuju.get_model_status(kdu_instance)
        for name in model_status.applications:
            application = model_status.applications[name]
            status[name] = {"status": application["status"]["status"]}

        return status

    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:
        """Return a list of services of a kdu_instance

        The services are selected with the field selector
        "metadata.namespace=<kdu_instance>"; the namespace argument is not
        read by this method.
        """

        credentials = self.get_credentials(cluster_uuid=cluster_uuid)

        # Keep the reference to kubecfg: the file disappears with the object
        kubecfg = self._write_kubeconfig(credentials)
        kubectl = Kubectl(config_file=kubecfg.name)

        return kubectl.get_services(
            field_selector="metadata.namespace={}".format(kdu_instance)
        )

    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str
    ) -> object:
        """Return data for a specific service inside a namespace

        The first service matching both the name and the namespace is
        returned.
        """

        credentials = self.get_credentials(cluster_uuid=cluster_uuid)

        # Keep the reference to kubecfg: the file disappears with the object
        kubecfg = self._write_kubeconfig(credentials)
        kubectl = Kubectl(config_file=kubecfg.name)

        return kubectl.get_services(
            field_selector="metadata.name={},metadata.namespace={}".format(
                service_name, namespace
            )
        )[0]

    def get_credentials(self, cluster_uuid: str) -> str:
        """
        Get Cluster Kubeconfig

        Reads the cluster from the "k8sclusters" collection, decrypts the
        "password" and "secret" fields of its credentials and returns the
        credentials dumped as YAML.

        :param cluster_uuid: the cluster
        :return: the credentials of the cluster as a YAML document
        :raises K8sException: if the cluster is not found in the database
        """
        k8scluster = self.db.get_one(
            "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
        )
        # With fail_on_empty=False a missing cluster is not reported by the
        # database layer, so it has to be detected here
        if not k8scluster:
            raise K8sException(
                "k8s cluster {} not found in database".format(cluster_uuid)
            )

        self.db.encrypt_decrypt_fields(
            k8scluster.get("credentials"),
            "decrypt",
            ["password", "secret"],
            schema_version=k8scluster["schema_version"],
            salt=k8scluster["_id"],
        )

        return yaml.safe_dump(k8scluster.get("credentials"))

    def _get_credential_name(self, cluster_uuid: str) -> str:
        """
        Get credential name for a k8s cloud

        We cannot use the cluster_uuid for the credential name directly,
        because it cannot start with a number, it must start with a letter.
        Therefore, the k8s cloud credential name will be "cred-" followed
        by the cluster uuid.

        :param: cluster_uuid:   Cluster UUID of the kubernetes cloud (=cloud_name)

        :return:                Name to use for the credential name.
        """
        return "cred-{}".format(cluster_uuid)

    def get_namespace(
        self,
        cluster_uuid: str,
    ) -> str:
        """Get the namespace UUID
        Gets the namespace's unique name

        Currently a stub: nothing is looked up and None is returned.

        :param cluster_uuid str: The UUID of the cluster
        :returns: None
        """
        pass

    def _create_cluster_role(
        self,
        kubectl: Kubectl,
        name: str,
        labels: Dict[str, str],
    ):
        """Create a cluster role that allows every verb on every resource
        and on every non-resource URL.

        :param kubectl: Kubectl object of the cluster
        :param name: name of the cluster role
        :param labels: labels to set in the cluster role
        :raises Exception: if a cluster role with that name already exists
        """
        cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )

        if len(cluster_roles.items) > 0:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
        # Cluster role
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )

        kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def _delete_cluster_role(self, kubectl: Kubectl, name: str):
        """Delete the cluster role with the given name."""
        kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _create_service_account(
        self,
        kubectl: Kubectl,
        name: str,
        labels: Dict[str, str],
    ):
        """Create a service account in the admin namespace.

        :param kubectl: Kubectl object of the cluster
        :param name: name of the service account
        :param labels: labels to set in the service account
        :raises Exception: if a service account with that name already exists
        """
        service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
        )
        if len(service_accounts.items) > 0:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
        service_account = V1ServiceAccount(metadata=metadata)

        kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
            ADMIN_NAMESPACE, service_account
        )

    def _delete_service_account(self, kubectl: Kubectl, name: str):
        """Delete the service account with the given name from the admin
        namespace."""
        kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
            name, ADMIN_NAMESPACE
        )

    def _create_cluster_role_binding(
        self,
        kubectl: Kubectl,
        name: str,
        labels: Dict[str, str],
    ):
        """Bind the cluster role called ``name`` to the service account
        called ``name`` of the admin namespace.

        :param kubectl: Kubectl object of the cluster
        :param name: name of the binding, of the role and of the account
        :param labels: labels to set in the cluster role binding
        :raises Exception: if a binding with that name already exists
        """
        role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if len(role_bindings.items) > 0:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[
                V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
            ],
        )
        kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
        """Delete the cluster role binding with the given name."""
        kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
        """Get the token and the CA certificate of a service account.

        The service account is polled until it references a secret, up to 10
        times with a pause between attempts; the token and the certificate
        are then read from that secret and base64-decoded.

        :param kubectl: Kubectl object of the cluster
        :param name: name of the service account in the admin namespace
        :return: tuple with the token and the CA certificate, as text
        :raises Exception: if the service account does not exist, if it
            references no secret after the last attempt, or if the referenced
            secret cannot be found
        """
        v1_core = kubectl.clients[CORE_CLIENT]

        retries_limit = 10
        retry_delay = 1  # seconds to wait between two attempts
        secret_name = None
        for attempt in range(retries_limit):
            service_accounts = v1_core.list_namespaced_service_account(
                ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
            )
            if len(service_accounts.items) == 0:
                raise Exception(
                    "Service account not found with metadata.name={}".format(name)
                )
            service_account = service_accounts.items[0]
            if service_account.secrets:
                secret_name = service_account.secrets[0].name
            if secret_name is not None:
                break
            # Give the cluster some time before asking again, instead of
            # spending all the attempts back-to-back
            if attempt < retries_limit - 1:
                await asyncio.sleep(retry_delay)
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )
        secrets = v1_core.list_namespaced_secret(
            ADMIN_NAMESPACE,
            field_selector="metadata.name={}".format(secret_name),
        ).items
        if not secrets:
            raise Exception(
                "Secret {} of service account {} not found".format(secret_name, name)
            )
        secret = secrets[0]

        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(client_certificate_data).decode("utf-8"),
        )