Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.config import ModelConfig
24 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 from .exceptions import MethodNotImplemented
28 from n2vc.utils import base64_to_cacert
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31
32 from kubernetes.client.models import (
33 V1ClusterRole,
34 V1ObjectMeta,
35 V1PolicyRule,
36 V1ServiceAccount,
37 V1ClusterRoleBinding,
38 V1RoleRef,
39 V1Subject,
40 )
41
42 from typing import Dict
43
44 SERVICE_ACCOUNT_TOKEN_KEY = "token"
45 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
46 RBAC_LABEL_KEY_NAME = "rbac-id"
47
48 ADMIN_NAMESPACE = "kube-system"
49 RBAC_STACK_PREFIX = "juju-credential"
50
51
52 def generate_rbac_id():
53 return binascii.hexlify(os.urandom(4)).decode()
54
55
56 class K8sJujuConnector(K8sConnector):
57 def __init__(
58 self,
59 fs: object,
60 db: object,
61 kubectl_command: str = "/usr/bin/kubectl",
62 juju_command: str = "/usr/bin/juju",
63 log: object = None,
64 loop: object = None,
65 on_update_db=None,
66 vca_config: dict = None,
67 ):
68 """
69 :param fs: file system for kubernetes and helm configuration
70 :param db: Database object
71 :param kubectl_command: path to kubectl executable
72 :param helm_command: path to helm executable
73 :param log: logger
74 :param: loop: Asyncio loop
75 """
76
77 # parent class
78 K8sConnector.__init__(
79 self,
80 db,
81 log=log,
82 on_update_db=on_update_db,
83 )
84
85 self.fs = fs
86 self.loop = loop or asyncio.get_event_loop()
87 self.log.debug("Initializing K8S Juju connector")
88
89 required_vca_config = [
90 "host",
91 "user",
92 "secret",
93 "ca_cert",
94 ]
95 if not vca_config or not all(k in vca_config for k in required_vca_config):
96 raise N2VCBadArgumentsException(
97 message="Missing arguments in vca_config: {}".format(vca_config),
98 bad_args=required_vca_config,
99 )
100 port = vca_config["port"] if "port" in vca_config else 17070
101 url = "{}:{}".format(vca_config["host"], port)
102 model_config = ModelConfig(vca_config)
103 username = vca_config["user"]
104 secret = vca_config["secret"]
105 ca_cert = base64_to_cacert(vca_config["ca_cert"])
106
107 self.libjuju = Libjuju(
108 endpoint=url,
109 api_proxy=None, # Not needed for k8s charms
110 model_config=model_config,
111 username=username,
112 password=secret,
113 cacert=ca_cert,
114 loop=self.loop,
115 log=self.log,
116 db=self.db,
117 )
118 self.log.debug("K8S Juju connector initialized")
119 # TODO: Remove these commented lines:
120 # self.authenticated = False
121 # self.models = {}
122 # self.juju_secret = ""
123
124 """Initialization"""
125
126 async def init_env(
127 self,
128 k8s_creds: str,
129 namespace: str = "kube-system",
130 reuse_cluster_uuid: str = None,
131 ) -> (str, bool):
132 """
133 It prepares a given K8s cluster environment to run Juju bundles.
134
135 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
136 '.kube/config'
137 :param namespace: optional namespace to be used for juju. By default,
138 'kube-system' will be used
139 :param reuse_cluster_uuid: existing cluster uuid for reuse
140 :return: uuid of the K8s cluster and True if connector has installed some
141 software in the cluster
142 (on error, an exception will be raised)
143 """
144
145 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
146
147 kubecfg = tempfile.NamedTemporaryFile()
148 with open(kubecfg.name, "w") as kubecfg_file:
149 kubecfg_file.write(k8s_creds)
150 kubectl = Kubectl(config_file=kubecfg.name)
151
152 # CREATING RESOURCES IN K8S
153 rbac_id = generate_rbac_id()
154 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
155 labels = {RBAC_STACK_PREFIX: rbac_id}
156
157 # Create cleanup dictionary to clean up created resources
158 # if it fails in the middle of the process
159 cleanup_data = []
160 try:
161 self._create_cluster_role(
162 kubectl,
163 name=metadata_name,
164 labels=labels,
165 )
166 cleanup_data.append(
167 {
168 "delete": self._delete_cluster_role,
169 "args": (kubectl, metadata_name),
170 }
171 )
172
173 self._create_service_account(
174 kubectl,
175 name=metadata_name,
176 labels=labels,
177 )
178 cleanup_data.append(
179 {
180 "delete": self._delete_service_account,
181 "args": (kubectl, metadata_name),
182 }
183 )
184
185 self._create_cluster_role_binding(
186 kubectl,
187 name=metadata_name,
188 labels=labels,
189 )
190 cleanup_data.append(
191 {
192 "delete": self._delete_service_account,
193 "args": (kubectl, metadata_name),
194 }
195 )
196 token, client_cert_data = await self._get_secret_data(
197 kubectl,
198 metadata_name,
199 )
200
201 default_storage_class = kubectl.get_default_storage_class()
202 await self.libjuju.add_k8s(
203 name=cluster_uuid,
204 rbac_id=rbac_id,
205 token=token,
206 client_cert_data=client_cert_data,
207 configuration=kubectl.configuration,
208 storage_class=default_storage_class,
209 credential_name=self._get_credential_name(cluster_uuid),
210 )
211 return cluster_uuid, True
212 except Exception as e:
213 self.log.error("Error initializing k8scluster: {}".format(e))
214 if len(cleanup_data) > 0:
215 self.log.debug("Cleaning up created resources in k8s cluster...")
216 for item in cleanup_data:
217 delete_function = item["delete"]
218 delete_args = item["args"]
219 delete_function(*delete_args)
220 self.log.debug("Cleanup finished")
221 raise e
222
223 """Repo Management"""
224
225 async def repo_add(
226 self,
227 name: str,
228 url: str,
229 _type: str = "charm",
230 ):
231 raise MethodNotImplemented()
232
233 async def repo_list(self):
234 raise MethodNotImplemented()
235
236 async def repo_remove(
237 self,
238 name: str,
239 ):
240 raise MethodNotImplemented()
241
242 async def synchronize_repos(self, cluster_uuid: str, name: str):
243 """
244 Returns None as currently add_repo is not implemented
245 """
246 return None
247
248 """Reset"""
249
250 async def reset(
251 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
252 ) -> bool:
253 """Reset a cluster
254
255 Resets the Kubernetes cluster by removing the model that represents it.
256
257 :param cluster_uuid str: The UUID of the cluster to reset
258 :return: Returns True if successful or raises an exception.
259 """
260
261 try:
262 self.log.debug("[reset] Removing k8s cloud")
263
264 cloud_creds = await self.libjuju.get_cloud_credentials(
265 cluster_uuid,
266 self._get_credential_name(cluster_uuid),
267 )
268
269 await self.libjuju.remove_cloud(cluster_uuid)
270
271 kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
272
273 kubecfg_file = tempfile.NamedTemporaryFile()
274 with open(kubecfg_file.name, "w") as f:
275 f.write(kubecfg)
276 kubectl = Kubectl(config_file=kubecfg_file.name)
277
278 delete_functions = [
279 self._delete_cluster_role_binding,
280 self._delete_service_account,
281 self._delete_cluster_role,
282 ]
283
284 credential_attrs = cloud_creds[0].result["attrs"]
285 if RBAC_LABEL_KEY_NAME in credential_attrs:
286 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
287 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
288 delete_args = (kubectl, metadata_name)
289 for delete_func in delete_functions:
290 try:
291 delete_func(*delete_args)
292 except Exception as e:
293 self.log.warning("Cannot remove resource in K8s {}".format(e))
294
295 except Exception as e:
296 self.log.debug("Caught exception during reset: {}".format(e))
297 raise e
298 return True
299
300 """Deployment"""
301
302 async def install(
303 self,
304 cluster_uuid: str,
305 kdu_model: str,
306 kdu_instance: str,
307 atomic: bool = True,
308 timeout: float = 1800,
309 params: dict = None,
310 db_dict: dict = None,
311 kdu_name: str = None,
312 namespace: str = None,
313 ) -> bool:
314 """Install a bundle
315
316 :param cluster_uuid str: The UUID of the cluster to install to
317 :param kdu_model str: The name or path of a bundle to install
318 :param kdu_instance: Kdu instance name
319 :param atomic bool: If set, waits until the model is active and resets
320 the cluster on failure.
321 :param timeout int: The time, in seconds, to wait for the install
322 to finish
323 :param params dict: Key-value pairs of instantiation parameters
324 :param kdu_name: Name of the KDU instance to be installed
325 :param namespace: K8s namespace to use for the KDU instance
326
327 :return: If successful, returns ?
328 """
329 bundle = kdu_model
330
331 if not db_dict:
332 raise K8sException("db_dict must be set")
333 if not bundle:
334 raise K8sException("bundle must be set")
335
336 if bundle.startswith("cs:"):
337 pass
338 elif bundle.startswith("http"):
339 # Download the file
340 pass
341 else:
342 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
343 os.chdir(new_workdir)
344 bundle = "local:{}".format(kdu_model)
345
346 self.log.debug("Checking for model named {}".format(kdu_instance))
347
348 # Create the new model
349 self.log.debug("Adding model: {}".format(kdu_instance))
350 await self.libjuju.add_model(
351 model_name=kdu_instance,
352 cloud_name=cluster_uuid,
353 credential_name=self._get_credential_name(cluster_uuid),
354 )
355
356 # if model:
357 # TODO: Instantiation parameters
358
359 """
360 "Juju bundle that models the KDU, in any of the following ways:
361 - <juju-repo>/<juju-bundle>
362 - <juju-bundle folder under k8s_models folder in the package>
363 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
364 in the package>
365 - <URL_where_to_fetch_juju_bundle>
366 """
367 try:
368 previous_workdir = os.getcwd()
369 except FileNotFoundError:
370 previous_workdir = "/app/storage"
371
372 self.log.debug("[install] deploying {}".format(bundle))
373 await self.libjuju.deploy(
374 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
375 )
376 os.chdir(previous_workdir)
377 if self.on_update_db:
378 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
379 return True
380
381 async def instances_list(self, cluster_uuid: str) -> list:
382 """
383 returns a list of deployed releases in a cluster
384
385 :param cluster_uuid: the cluster
386 :return:
387 """
388 return []
389
390 async def upgrade(
391 self,
392 cluster_uuid: str,
393 kdu_instance: str,
394 kdu_model: str = None,
395 params: dict = None,
396 ) -> str:
397 """Upgrade a model
398
399 :param cluster_uuid str: The UUID of the cluster to upgrade
400 :param kdu_instance str: The unique name of the KDU instance
401 :param kdu_model str: The name or path of the bundle to upgrade to
402 :param params dict: Key-value pairs of instantiation parameters
403
404 :return: If successful, reference to the new revision number of the
405 KDU instance.
406 """
407
408 # TODO: Loop through the bundle and upgrade each charm individually
409
410 """
411 The API doesn't have a concept of bundle upgrades, because there are
412 many possible changes: charm revision, disk, number of units, etc.
413
414 As such, we are only supporting a limited subset of upgrades. We'll
415 upgrade the charm revision but leave storage and scale untouched.
416
417 Scale changes should happen through OSM constructs, and changes to
418 storage would require a redeployment of the service, at least in this
419 initial release.
420 """
421 raise MethodNotImplemented()
422
423 """Rollback"""
424
425 async def rollback(
426 self,
427 cluster_uuid: str,
428 kdu_instance: str,
429 revision: int = 0,
430 ) -> str:
431 """Rollback a model
432
433 :param cluster_uuid str: The UUID of the cluster to rollback
434 :param kdu_instance str: The unique name of the KDU instance
435 :param revision int: The revision to revert to. If omitted, rolls back
436 the previous upgrade.
437
438 :return: If successful, returns the revision of active KDU instance,
439 or raises an exception
440 """
441 raise MethodNotImplemented()
442
443 """Deletion"""
444
445 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
446 """Uninstall a KDU instance
447
448 :param cluster_uuid str: The UUID of the cluster
449 :param kdu_instance str: The unique name of the KDU instance
450
451 :return: Returns True if successful, or raises an exception
452 """
453
454 self.log.debug("[uninstall] Destroying model")
455
456 await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
457
458 # self.log.debug("[uninstall] Model destroyed and disconnecting")
459 # await controller.disconnect()
460
461 return True
462 # TODO: Remove these commented lines
463 # if not self.authenticated:
464 # self.log.debug("[uninstall] Connecting to controller")
465 # await self.login(cluster_uuid)
466
467 async def exec_primitive(
468 self,
469 cluster_uuid: str = None,
470 kdu_instance: str = None,
471 primitive_name: str = None,
472 timeout: float = 300,
473 params: dict = None,
474 db_dict: dict = None,
475 ) -> str:
476 """Exec primitive (Juju action)
477
478 :param cluster_uuid str: The UUID of the cluster
479 :param kdu_instance str: The unique name of the KDU instance
480 :param primitive_name: Name of action that will be executed
481 :param timeout: Timeout for action execution
482 :param params: Dictionary of all the parameters needed for the action
483 :db_dict: Dictionary for any additional data
484
485 :return: Returns the output of the action
486 """
487
488 if not params or "application-name" not in params:
489 raise K8sException(
490 "Missing application-name argument, \
491 argument needed for K8s actions"
492 )
493 try:
494 self.log.debug(
495 "[exec_primitive] Getting model "
496 "kdu_instance: {}".format(kdu_instance)
497 )
498 application_name = params["application-name"]
499 actions = await self.libjuju.get_actions(application_name, kdu_instance)
500 if primitive_name not in actions:
501 raise K8sException("Primitive {} not found".format(primitive_name))
502 output, status = await self.libjuju.execute_action(
503 application_name, kdu_instance, primitive_name, **params
504 )
505
506 if status != "completed":
507 raise K8sException(
508 "status is not completed: {} output: {}".format(status, output)
509 )
510 if self.on_update_db:
511 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
512
513 return output
514
515 except Exception as e:
516 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
517 self.log.error(error_msg)
518 raise K8sException(message=error_msg)
519
520 """Introspection"""
521
522 async def inspect_kdu(
523 self,
524 kdu_model: str,
525 ) -> dict:
526 """Inspect a KDU
527
528 Inspects a bundle and returns a dictionary of config parameters and
529 their default values.
530
531 :param kdu_model str: The name or path of the bundle to inspect.
532
533 :return: If successful, returns a dictionary of available parameters
534 and their default values.
535 """
536
537 kdu = {}
538 if not os.path.exists(kdu_model):
539 raise K8sException("file {} not found".format(kdu_model))
540
541 with open(kdu_model, "r") as f:
542 bundle = yaml.safe_load(f.read())
543
544 """
545 {
546 'description': 'Test bundle',
547 'bundle': 'kubernetes',
548 'applications': {
549 'mariadb-k8s': {
550 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
551 'scale': 1,
552 'options': {
553 'password': 'manopw',
554 'root_password': 'osm4u',
555 'user': 'mano'
556 },
557 'series': 'kubernetes'
558 }
559 }
560 }
561 """
562 # TODO: This should be returned in an agreed-upon format
563 kdu = bundle["applications"]
564
565 return kdu
566
567 async def help_kdu(
568 self,
569 kdu_model: str,
570 ) -> str:
571 """View the README
572
573 If available, returns the README of the bundle.
574
575 :param kdu_model str: The name or path of a bundle
576
577 :return: If found, returns the contents of the README.
578 """
579 readme = None
580
581 files = ["README", "README.txt", "README.md"]
582 path = os.path.dirname(kdu_model)
583 for file in os.listdir(path):
584 if file in files:
585 with open(file, "r") as f:
586 readme = f.read()
587 break
588
589 return readme
590
591 async def status_kdu(
592 self,
593 cluster_uuid: str,
594 kdu_instance: str,
595 complete_status: bool = False,
596 yaml_format: bool = False
597 ) -> dict:
598 """Get the status of the KDU
599
600 Get the current status of the KDU instance.
601
602 :param cluster_uuid str: The UUID of the cluster
603 :param kdu_instance str: The unique id of the KDU instance
604 :param complete_status: To get the complete_status of the KDU
605 :param yaml_format: To get the status in proper format for NSR record
606
607 :return: Returns a dictionary containing namespace, state, resources,
608 and deployment_time and returns complete_status if complete_status is True
609 """
610 status = {}
611
612 model_status = await self.libjuju.get_model_status(kdu_instance)
613
614 if not complete_status:
615 for name in model_status.applications:
616 application = model_status.applications[name]
617 status[name] = {"status": application["status"]["status"]}
618 else:
619 if yaml_format:
620 return obj_to_yaml(model_status)
621 else:
622 return obj_to_dict(model_status)
623
624 return status
625
626 async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
627 """
628 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
629
630 :param vcastatus dict: dict containing vcastatus
631 :param kdu_instance str: The unique id of the KDU instance
632
633 :return: None
634 """
635 try:
636 for model_name in vcastatus:
637 # Adding executed actions
638 vcastatus[model_name]["executedActions"] = \
639 await self.libjuju.get_executed_actions(kdu_instance)
640
641 for application in vcastatus[model_name]["applications"]:
642 # Adding application actions
643 vcastatus[model_name]["applications"][application]["actions"] = \
644 await self.libjuju.get_actions(application, kdu_instance)
645 # Adding application configs
646 vcastatus[model_name]["applications"][application]["configs"] = \
647 await self.libjuju.get_application_configs(kdu_instance, application)
648
649 except Exception as e:
650 self.log.debug("Error in updating vca status: {}".format(str(e)))
651
652 async def get_services(
653 self, cluster_uuid: str, kdu_instance: str, namespace: str
654 ) -> list:
655 """Return a list of services of a kdu_instance"""
656
657 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
658
659 kubecfg = tempfile.NamedTemporaryFile()
660 with open(kubecfg.name, "w") as kubecfg_file:
661 kubecfg_file.write(credentials)
662 kubectl = Kubectl(config_file=kubecfg.name)
663
664 return kubectl.get_services(
665 field_selector="metadata.namespace={}".format(kdu_instance)
666 )
667
668 async def get_service(
669 self, cluster_uuid: str, service_name: str, namespace: str
670 ) -> object:
671 """Return data for a specific service inside a namespace"""
672
673 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
674
675 kubecfg = tempfile.NamedTemporaryFile()
676 with open(kubecfg.name, "w") as kubecfg_file:
677 kubecfg_file.write(credentials)
678 kubectl = Kubectl(config_file=kubecfg.name)
679
680 return kubectl.get_services(
681 field_selector="metadata.name={},metadata.namespace={}".format(
682 service_name, namespace
683 )
684 )[0]
685
686 def get_credentials(self, cluster_uuid: str) -> str:
687 """
688 Get Cluster Kubeconfig
689 """
690 k8scluster = self.db.get_one(
691 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
692 )
693
694 self.db.encrypt_decrypt_fields(
695 k8scluster.get("credentials"),
696 "decrypt",
697 ["password", "secret"],
698 schema_version=k8scluster["schema_version"],
699 salt=k8scluster["_id"],
700 )
701
702 return yaml.safe_dump(k8scluster.get("credentials"))
703
704 def _get_credential_name(self, cluster_uuid: str) -> str:
705 """
706 Get credential name for a k8s cloud
707
708 We cannot use the cluster_uuid for the credential name directly,
709 because it cannot start with a number, it must start with a letter.
710 Therefore, the k8s cloud credential name will be "cred-" followed
711 by the cluster uuid.
712
713 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
714
715 :return: Name to use for the credential name.
716 """
717 return "cred-{}".format(cluster_uuid)
718
719 def get_namespace(
720 self,
721 cluster_uuid: str,
722 ) -> str:
723 """Get the namespace UUID
724 Gets the namespace's unique name
725
726 :param cluster_uuid str: The UUID of the cluster
727 :returns: The namespace UUID, or raises an exception
728 """
729 pass
730
731 def _create_cluster_role(
732 self,
733 kubectl: Kubectl,
734 name: str,
735 labels: Dict[str, str],
736 ):
737 cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
738 field_selector="metadata.name={}".format(name)
739 )
740
741 if len(cluster_roles.items) > 0:
742 raise Exception(
743 "Cluster role with metadata.name={} already exists".format(name)
744 )
745
746 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
747 # Cluster role
748 cluster_role = V1ClusterRole(
749 metadata=metadata,
750 rules=[
751 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
752 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
753 ],
754 )
755
756 kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
757
758 def _delete_cluster_role(self, kubectl: Kubectl, name: str):
759 kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
760
761 def _create_service_account(
762 self,
763 kubectl: Kubectl,
764 name: str,
765 labels: Dict[str, str],
766 ):
767 service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
768 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
769 )
770 if len(service_accounts.items) > 0:
771 raise Exception(
772 "Service account with metadata.name={} already exists".format(name)
773 )
774
775 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
776 service_account = V1ServiceAccount(metadata=metadata)
777
778 kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
779 ADMIN_NAMESPACE, service_account
780 )
781
782 def _delete_service_account(self, kubectl: Kubectl, name: str):
783 kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
784 name, ADMIN_NAMESPACE
785 )
786
787 def _create_cluster_role_binding(
788 self,
789 kubectl: Kubectl,
790 name: str,
791 labels: Dict[str, str],
792 ):
793 role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
794 field_selector="metadata.name={}".format(name)
795 )
796 if len(role_bindings.items) > 0:
797 raise Exception("Generated rbac id already exists")
798
799 role_binding = V1ClusterRoleBinding(
800 metadata=V1ObjectMeta(name=name, labels=labels),
801 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
802 subjects=[
803 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
804 ],
805 )
806 kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
807
808 def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
809 kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
810
811 async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
812 v1_core = kubectl.clients[CORE_CLIENT]
813
814 retries_limit = 10
815 secret_name = None
816 while True:
817 retries_limit -= 1
818 service_accounts = v1_core.list_namespaced_service_account(
819 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
820 )
821 if len(service_accounts.items) == 0:
822 raise Exception(
823 "Service account not found with metadata.name={}".format(name)
824 )
825 service_account = service_accounts.items[0]
826 if service_account.secrets and len(service_account.secrets) > 0:
827 secret_name = service_account.secrets[0].name
828 if secret_name is not None or not retries_limit:
829 break
830 if not secret_name:
831 raise Exception(
832 "Failed getting the secret from service account {}".format(name)
833 )
834 secret = v1_core.list_namespaced_secret(
835 ADMIN_NAMESPACE,
836 field_selector="metadata.name={}".format(secret_name),
837 ).items[0]
838
839 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
840 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
841
842 return (
843 base64.b64decode(token).decode("utf-8"),
844 base64.b64decode(client_certificate_data).decode("utf-8"),
845 )
846
847 @staticmethod
848 def generate_kdu_instance_name(**kwargs):
849 db_dict = kwargs.get("db_dict")
850 kdu_name = kwargs.get("kdu_name", None)
851 if kdu_name:
852 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
853 else:
854 kdu_instance = db_dict["filter"]["_id"]
855 return kdu_instance