Code Coverage

Cobertura Coverage Report > n2vc >

k8s_juju_conn.py

Trend

Classes100%
 
Lines81%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
k8s_juju_conn.py
100%
1/1
81%
221/273
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_juju_conn.py
81%
221/273
N/A

Source

n2vc/k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 import os
17 1 import uuid
18 1 import yaml
19 1 import tempfile
20 1 import binascii
21 1 import base64
22
23 1 from n2vc.config import ModelConfig
24 1 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
25 1 from n2vc.k8s_conn import K8sConnector
26 1 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 1 from .exceptions import MethodNotImplemented
28 1 from n2vc.utils import base64_to_cacert
29 1 from n2vc.libjuju import Libjuju
30 1 from n2vc.utils import obj_to_dict, obj_to_yaml
31
32 1 from kubernetes.client.models import (
33     V1ClusterRole,
34     V1ObjectMeta,
35     V1PolicyRule,
36     V1ServiceAccount,
37     V1ClusterRoleBinding,
38     V1RoleRef,
39     V1Subject,
40 )
41
42 1 from typing import Dict
43
44 1 SERVICE_ACCOUNT_TOKEN_KEY = "token"
45 1 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
46 1 RBAC_LABEL_KEY_NAME = "rbac-id"
47
48 1 ADMIN_NAMESPACE = "kube-system"
49 1 RBAC_STACK_PREFIX = "juju-credential"
50
51
52 1 def generate_rbac_id():
53 1     return binascii.hexlify(os.urandom(4)).decode()
54
55
56 1 class K8sJujuConnector(K8sConnector):
57 1     def __init__(
58         self,
59         fs: object,
60         db: object,
61         kubectl_command: str = "/usr/bin/kubectl",
62         juju_command: str = "/usr/bin/juju",
63         log: object = None,
64         loop: object = None,
65         on_update_db=None,
66         vca_config: dict = None,
67     ):
68         """
69         :param fs: file system for kubernetes and helm configuration
70         :param db: Database object
71         :param kubectl_command: path to kubectl executable
72         :param helm_command: path to helm executable
73         :param log: logger
74         :param: loop: Asyncio loop
75         """
76
77         # parent class
78 1         K8sConnector.__init__(
79             self,
80             db,
81             log=log,
82             on_update_db=on_update_db,
83         )
84
85 1         self.fs = fs
86 1         self.loop = loop or asyncio.get_event_loop()
87 1         self.log.debug("Initializing K8S Juju connector")
88
89 1         required_vca_config = [
90             "host",
91             "user",
92             "secret",
93             "ca_cert",
94         ]
95 1         if not vca_config or not all(k in vca_config for k in required_vca_config):
96 1             raise N2VCBadArgumentsException(
97                 message="Missing arguments in vca_config: {}".format(vca_config),
98                 bad_args=required_vca_config,
99             )
100 1         port = vca_config["port"] if "port" in vca_config else 17070
101 1         url = "{}:{}".format(vca_config["host"], port)
102 1         model_config = ModelConfig(vca_config)
103 1         username = vca_config["user"]
104 1         secret = vca_config["secret"]
105 1         ca_cert = base64_to_cacert(vca_config["ca_cert"])
106
107 1         self.libjuju = Libjuju(
108             endpoint=url,
109             api_proxy=None,  # Not needed for k8s charms
110             model_config=model_config,
111             username=username,
112             password=secret,
113             cacert=ca_cert,
114             loop=self.loop,
115             log=self.log,
116             db=self.db,
117         )
118 1         self.log.debug("K8S Juju connector initialized")
119         # TODO: Remove these commented lines:
120         # self.authenticated = False
121         # self.models = {}
122         # self.juju_secret = ""
123
124     """Initialization"""
125
126 1     async def init_env(
127         self,
128         k8s_creds: str,
129         namespace: str = "kube-system",
130         reuse_cluster_uuid: str = None,
131     ) -> (str, bool):
132         """
133         It prepares a given K8s cluster environment to run Juju bundles.
134
135         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
136             '.kube/config'
137         :param namespace: optional namespace to be used for juju. By default,
138             'kube-system' will be used
139         :param reuse_cluster_uuid: existing cluster uuid for reuse
140         :return: uuid of the K8s cluster and True if connector has installed some
141             software in the cluster
142             (on error, an exception will be raised)
143         """
144
145 1         cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
146
147 1         kubecfg = tempfile.NamedTemporaryFile()
148 1         with open(kubecfg.name, "w") as kubecfg_file:
149 1             kubecfg_file.write(k8s_creds)
150 1         kubectl = Kubectl(config_file=kubecfg.name)
151
152         # CREATING RESOURCES IN K8S
153 1         rbac_id = generate_rbac_id()
154 1         metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
155 1         labels = {RBAC_STACK_PREFIX: rbac_id}
156
157         # Create cleanup dictionary to clean up created resources
158         # if it fails in the middle of the process
159 1         cleanup_data = []
160 1         try:
161 1             self._create_cluster_role(
162                 kubectl,
163                 name=metadata_name,
164                 labels=labels,
165             )
166 1             cleanup_data.append(
167                 {
168                     "delete": self._delete_cluster_role,
169                     "args": (kubectl, metadata_name),
170                 }
171             )
172
173 1             self._create_service_account(
174                 kubectl,
175                 name=metadata_name,
176                 labels=labels,
177             )
178 1             cleanup_data.append(
179                 {
180                     "delete": self._delete_service_account,
181                     "args": (kubectl, metadata_name),
182                 }
183             )
184
185 1             self._create_cluster_role_binding(
186                 kubectl,
187                 name=metadata_name,
188                 labels=labels,
189             )
190 1             cleanup_data.append(
191                 {
192                     "delete": self._delete_service_account,
193                     "args": (kubectl, metadata_name),
194                 }
195             )
196 1             token, client_cert_data = await self._get_secret_data(
197                 kubectl,
198                 metadata_name,
199             )
200
201 1             default_storage_class = kubectl.get_default_storage_class()
202 1             await self.libjuju.add_k8s(
203                 name=cluster_uuid,
204                 rbac_id=rbac_id,
205                 token=token,
206                 client_cert_data=client_cert_data,
207                 configuration=kubectl.configuration,
208                 storage_class=default_storage_class,
209                 credential_name=self._get_credential_name(cluster_uuid),
210             )
211 1             return cluster_uuid, True
212 1         except Exception as e:
213 1             self.log.error("Error initializing k8scluster: {}".format(e))
214 1             if len(cleanup_data) > 0:
215 1                 self.log.debug("Cleaning up created resources in k8s cluster...")
216 1                 for item in cleanup_data:
217 1                     delete_function = item["delete"]
218 1                     delete_args = item["args"]
219 1                     delete_function(*delete_args)
220 1                 self.log.debug("Cleanup finished")
221 1             raise e
222
223     """Repo Management"""
224
225 1     async def repo_add(
226         self,
227         name: str,
228         url: str,
229         _type: str = "charm",
230     ):
231 1         raise MethodNotImplemented()
232
233 1     async def repo_list(self):
234 1         raise MethodNotImplemented()
235
236 1     async def repo_remove(
237         self,
238         name: str,
239     ):
240 1         raise MethodNotImplemented()
241
242 1     async def synchronize_repos(self, cluster_uuid: str, name: str):
243         """
244         Returns None as currently add_repo is not implemented
245         """
246 1         return None
247
248     """Reset"""
249
250 1     async def reset(
251         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
252     ) -> bool:
253         """Reset a cluster
254
255         Resets the Kubernetes cluster by removing the model that represents it.
256
257         :param cluster_uuid str: The UUID of the cluster to reset
258         :return: Returns True if successful or raises an exception.
259         """
260
261 1         try:
262 1             self.log.debug("[reset] Removing k8s cloud")
263
264 1             cloud_creds = await self.libjuju.get_cloud_credentials(
265                 cluster_uuid,
266                 self._get_credential_name(cluster_uuid),
267             )
268
269 1             await self.libjuju.remove_cloud(cluster_uuid)
270
271 1             kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
272
273 1             kubecfg_file = tempfile.NamedTemporaryFile()
274 1             with open(kubecfg_file.name, "w") as f:
275 1                 f.write(kubecfg)
276 1             kubectl = Kubectl(config_file=kubecfg_file.name)
277
278 1             delete_functions = [
279                 self._delete_cluster_role_binding,
280                 self._delete_service_account,
281                 self._delete_cluster_role,
282             ]
283
284 1             credential_attrs = cloud_creds[0].result["attrs"]
285 1             if RBAC_LABEL_KEY_NAME in credential_attrs:
286 1                 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
287 1                 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
288 1                 delete_args = (kubectl, metadata_name)
289 1                 for delete_func in delete_functions:
290 1                     try:
291 1                         delete_func(*delete_args)
292 0                     except Exception as e:
293 0                         self.log.warning("Cannot remove resource in K8s {}".format(e))
294
295 1         except Exception as e:
296 1             self.log.debug("Caught exception during reset: {}".format(e))
297 1             raise e
298 1         return True
299
300     """Deployment"""
301
302 1     async def install(
303         self,
304         cluster_uuid: str,
305         kdu_model: str,
306         kdu_instance: str,
307         atomic: bool = True,
308         timeout: float = 1800,
309         params: dict = None,
310         db_dict: dict = None,
311         kdu_name: str = None,
312         namespace: str = None,
313     ) -> bool:
314         """Install a bundle
315
316         :param cluster_uuid str: The UUID of the cluster to install to
317         :param kdu_model str: The name or path of a bundle to install
318         :param kdu_instance: Kdu instance name
319         :param atomic bool: If set, waits until the model is active and resets
320                             the cluster on failure.
321         :param timeout int: The time, in seconds, to wait for the install
322                             to finish
323         :param params dict: Key-value pairs of instantiation parameters
324         :param kdu_name: Name of the KDU instance to be installed
325         :param namespace: K8s namespace to use for the KDU instance
326
327         :return: If successful, returns ?
328         """
329 1         bundle = kdu_model
330
331 1         if not db_dict:
332 1             raise K8sException("db_dict must be set")
333 1         if not bundle:
334 1             raise K8sException("bundle must be set")
335
336 1         if bundle.startswith("cs:"):
337 1             pass
338 1         elif bundle.startswith("http"):
339             # Download the file
340 1             pass
341         else:
342 1             new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
343 1             os.chdir(new_workdir)
344 1             bundle = "local:{}".format(kdu_model)
345
346 1         self.log.debug("Checking for model named {}".format(kdu_instance))
347
348         # Create the new model
349 1         self.log.debug("Adding model: {}".format(kdu_instance))
350 1         await self.libjuju.add_model(
351             model_name=kdu_instance,
352             cloud_name=cluster_uuid,
353             credential_name=self._get_credential_name(cluster_uuid),
354         )
355
356         # if model:
357         # TODO: Instantiation parameters
358
359         """
360         "Juju bundle that models the KDU, in any of the following ways:
361             - <juju-repo>/<juju-bundle>
362             - <juju-bundle folder under k8s_models folder in the package>
363             - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
364                 in the package>
365             - <URL_where_to_fetch_juju_bundle>
366         """
367 1         try:
368 1             previous_workdir = os.getcwd()
369 1         except FileNotFoundError:
370 1             previous_workdir = "/app/storage"
371
372 1         self.log.debug("[install] deploying {}".format(bundle))
373 1         await self.libjuju.deploy(
374             bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
375         )
376 1         os.chdir(previous_workdir)
377 1         if self.on_update_db:
378 0             await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
379 1         return True
380
381 1     async def instances_list(self, cluster_uuid: str) -> list:
382         """
383         returns a list of deployed releases in a cluster
384
385         :param cluster_uuid: the cluster
386         :return:
387         """
388 1         return []
389
390 1     async def upgrade(
391         self,
392         cluster_uuid: str,
393         kdu_instance: str,
394         kdu_model: str = None,
395         params: dict = None,
396     ) -> str:
397         """Upgrade a model
398
399         :param cluster_uuid str: The UUID of the cluster to upgrade
400         :param kdu_instance str: The unique name of the KDU instance
401         :param kdu_model str: The name or path of the bundle to upgrade to
402         :param params dict: Key-value pairs of instantiation parameters
403
404         :return: If successful, reference to the new revision number of the
405                  KDU instance.
406         """
407
408         # TODO: Loop through the bundle and upgrade each charm individually
409
410         """
411         The API doesn't have a concept of bundle upgrades, because there are
412         many possible changes: charm revision, disk, number of units, etc.
413
414         As such, we are only supporting a limited subset of upgrades. We'll
415         upgrade the charm revision but leave storage and scale untouched.
416
417         Scale changes should happen through OSM constructs, and changes to
418         storage would require a redeployment of the service, at least in this
419         initial release.
420         """
421 1         raise MethodNotImplemented()
422
423     """Rollback"""
424
425 1     async def rollback(
426         self,
427         cluster_uuid: str,
428         kdu_instance: str,
429         revision: int = 0,
430     ) -> str:
431         """Rollback a model
432
433         :param cluster_uuid str: The UUID of the cluster to rollback
434         :param kdu_instance str: The unique name of the KDU instance
435         :param revision int: The revision to revert to. If omitted, rolls back
436                              the previous upgrade.
437
438         :return: If successful, returns the revision of active KDU instance,
439                  or raises an exception
440         """
441 1         raise MethodNotImplemented()
442
443     """Deletion"""
444
445 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
446         """Uninstall a KDU instance
447
448         :param cluster_uuid str: The UUID of the cluster
449         :param kdu_instance str: The unique name of the KDU instance
450
451         :return: Returns True if successful, or raises an exception
452         """
453
454 1         self.log.debug("[uninstall] Destroying model")
455
456 1         await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
457
458         # self.log.debug("[uninstall] Model destroyed and disconnecting")
459         # await controller.disconnect()
460
461 1         return True
462         # TODO: Remove these commented lines
463         # if not self.authenticated:
464         #     self.log.debug("[uninstall] Connecting to controller")
465         #     await self.login(cluster_uuid)
466
467 1     async def exec_primitive(
468         self,
469         cluster_uuid: str = None,
470         kdu_instance: str = None,
471         primitive_name: str = None,
472         timeout: float = 300,
473         params: dict = None,
474         db_dict: dict = None,
475     ) -> str:
476         """Exec primitive (Juju action)
477
478         :param cluster_uuid str: The UUID of the cluster
479         :param kdu_instance str: The unique name of the KDU instance
480         :param primitive_name: Name of action that will be executed
481         :param timeout: Timeout for action execution
482         :param params: Dictionary of all the parameters needed for the action
483         :db_dict: Dictionary for any additional data
484
485         :return: Returns the output of the action
486         """
487
488 1         if not params or "application-name" not in params:
489 1             raise K8sException(
490                 "Missing application-name argument, \
491                                 argument needed for K8s actions"
492             )
493 1         try:
494 1             self.log.debug(
495                 "[exec_primitive] Getting model "
496                 "kdu_instance: {}".format(kdu_instance)
497             )
498 1             application_name = params["application-name"]
499 1             actions = await self.libjuju.get_actions(application_name, kdu_instance)
500 1             if primitive_name not in actions:
501 1                 raise K8sException("Primitive {} not found".format(primitive_name))
502 1             output, status = await self.libjuju.execute_action(
503                 application_name, kdu_instance, primitive_name, **params
504             )
505
506 1             if status != "completed":
507 1                 raise K8sException(
508                     "status is not completed: {} output: {}".format(status, output)
509                 )
510 1             if self.on_update_db:
511 0                 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
512
513 1             return output
514
515 1         except Exception as e:
516 1             error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
517 1             self.log.error(error_msg)
518 1             raise K8sException(message=error_msg)
519
520     """Introspection"""
521
522 1     async def inspect_kdu(
523         self,
524         kdu_model: str,
525     ) -> dict:
526         """Inspect a KDU
527
528         Inspects a bundle and returns a dictionary of config parameters and
529         their default values.
530
531         :param kdu_model str: The name or path of the bundle to inspect.
532
533         :return: If successful, returns a dictionary of available parameters
534                  and their default values.
535         """
536
537 1         kdu = {}
538 1         if not os.path.exists(kdu_model):
539 1             raise K8sException("file {} not found".format(kdu_model))
540
541 1         with open(kdu_model, "r") as f:
542 1             bundle = yaml.safe_load(f.read())
543
544             """
545             {
546                 'description': 'Test bundle',
547                 'bundle': 'kubernetes',
548                 'applications': {
549                     'mariadb-k8s': {
550                         'charm': 'cs:~charmed-osm/mariadb-k8s-20',
551                         'scale': 1,
552                         'options': {
553                             'password': 'manopw',
554                             'root_password': 'osm4u',
555                             'user': 'mano'
556                         },
557                         'series': 'kubernetes'
558                     }
559                 }
560             }
561             """
562             # TODO: This should be returned in an agreed-upon format
563 1             kdu = bundle["applications"]
564
565 1         return kdu
566
567 1     async def help_kdu(
568         self,
569         kdu_model: str,
570     ) -> str:
571         """View the README
572
573         If available, returns the README of the bundle.
574
575         :param kdu_model str: The name or path of a bundle
576
577         :return: If found, returns the contents of the README.
578         """
579 1         readme = None
580
581 1         files = ["README", "README.txt", "README.md"]
582 1         path = os.path.dirname(kdu_model)
583 1         for file in os.listdir(path):
584 1             if file in files:
585 1                 with open(file, "r") as f:
586 1                     readme = f.read()
587 1                     break
588
589 1         return readme
590
591 1     async def status_kdu(
592         self,
593         cluster_uuid: str,
594         kdu_instance: str,
595         complete_status: bool = False,
596         yaml_format: bool = False
597     ) -> dict:
598         """Get the status of the KDU
599
600         Get the current status of the KDU instance.
601
602         :param cluster_uuid str: The UUID of the cluster
603         :param kdu_instance str: The unique id of the KDU instance
604         :param complete_status: To get the complete_status of the KDU
605         :param yaml_format: To get the status in proper format for NSR record
606
607         :return: Returns a dictionary containing namespace, state, resources,
608                  and deployment_time and returns complete_status if complete_status is True
609         """
610 1         status = {}
611
612 1         model_status = await self.libjuju.get_model_status(kdu_instance)
613
614 1         if not complete_status:
615 1             for name in model_status.applications:
616 1                 application = model_status.applications[name]
617 1                 status[name] = {"status": application["status"]["status"]}
618         else:
619 0             if yaml_format:
620 0                 return obj_to_yaml(model_status)
621             else:
622 0                 return obj_to_dict(model_status)
623
624 1         return status
625
626 1     async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
627         """
628         Add all configs, actions, executed actions of all applications in a model to vcastatus dict
629
630         :param vcastatus dict: dict containing vcastatus
631         :param kdu_instance str: The unique id of the KDU instance
632
633         :return: None
634         """
635 1         try:
636 1             for model_name in vcastatus:
637                 # Adding executed actions
638 1                 vcastatus[model_name]["executedActions"] = \
639                     await self.libjuju.get_executed_actions(kdu_instance)
640
641 1                 for application in vcastatus[model_name]["applications"]:
642                     # Adding application actions
643 1                     vcastatus[model_name]["applications"][application]["actions"] = \
644                         await self.libjuju.get_actions(application, kdu_instance)
645                     # Adding application configs
646 1                     vcastatus[model_name]["applications"][application]["configs"] = \
647                         await self.libjuju.get_application_configs(kdu_instance, application)
648
649 1         except Exception as e:
650 1             self.log.debug("Error in updating vca status: {}".format(str(e)))
651
652 1     async def get_services(
653         self, cluster_uuid: str, kdu_instance: str, namespace: str
654     ) -> list:
655         """Return a list of services of a kdu_instance"""
656
657 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
658
659 1         kubecfg = tempfile.NamedTemporaryFile()
660 1         with open(kubecfg.name, "w") as kubecfg_file:
661 1             kubecfg_file.write(credentials)
662 1         kubectl = Kubectl(config_file=kubecfg.name)
663
664 1         return kubectl.get_services(
665             field_selector="metadata.namespace={}".format(kdu_instance)
666         )
667
668 1     async def get_service(
669         self, cluster_uuid: str, service_name: str, namespace: str
670     ) -> object:
671         """Return data for a specific service inside a namespace"""
672
673 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
674
675 1         kubecfg = tempfile.NamedTemporaryFile()
676 1         with open(kubecfg.name, "w") as kubecfg_file:
677 1             kubecfg_file.write(credentials)
678 1         kubectl = Kubectl(config_file=kubecfg.name)
679
680 1         return kubectl.get_services(
681             field_selector="metadata.name={},metadata.namespace={}".format(
682                 service_name, namespace
683             )
684         )[0]
685
686 1     def get_credentials(self, cluster_uuid: str) -> str:
687         """
688         Get Cluster Kubeconfig
689         """
690 1         k8scluster = self.db.get_one(
691             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
692         )
693
694 1         self.db.encrypt_decrypt_fields(
695             k8scluster.get("credentials"),
696             "decrypt",
697             ["password", "secret"],
698             schema_version=k8scluster["schema_version"],
699             salt=k8scluster["_id"],
700         )
701
702 1         return yaml.safe_dump(k8scluster.get("credentials"))
703
704 1     def _get_credential_name(self, cluster_uuid: str) -> str:
705         """
706         Get credential name for a k8s cloud
707
708         We cannot use the cluster_uuid for the credential name directly,
709         because it cannot start with a number, it must start with a letter.
710         Therefore, the k8s cloud credential name will be "cred-" followed
711         by the cluster uuid.
712
713         :param: cluster_uuid:   Cluster UUID of the kubernetes cloud (=cloud_name)
714
715         :return:                Name to use for the credential name.
716         """
717 1         return "cred-{}".format(cluster_uuid)
718
719 1     def get_namespace(
720         self,
721         cluster_uuid: str,
722     ) -> str:
723         """Get the namespace UUID
724         Gets the namespace's unique name
725
726         :param cluster_uuid str: The UUID of the cluster
727         :returns: The namespace UUID, or raises an exception
728         """
729 1         pass
730
731 1     def _create_cluster_role(
732         self,
733         kubectl: Kubectl,
734         name: str,
735         labels: Dict[str, str],
736     ):
737 0         cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
738             field_selector="metadata.name={}".format(name)
739         )
740
741 0         if len(cluster_roles.items) > 0:
742 0             raise Exception(
743                 "Cluster role with metadata.name={} already exists".format(name)
744             )
745
746 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
747         # Cluster role
748 0         cluster_role = V1ClusterRole(
749             metadata=metadata,
750             rules=[
751                 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
752                 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
753             ],
754         )
755
756 0         kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
757
758 1     def _delete_cluster_role(self, kubectl: Kubectl, name: str):
759 0         kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
760
761 1     def _create_service_account(
762         self,
763         kubectl: Kubectl,
764         name: str,
765         labels: Dict[str, str],
766     ):
767 0         service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
768             ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
769         )
770 0         if len(service_accounts.items) > 0:
771 0             raise Exception(
772                 "Service account with metadata.name={} already exists".format(name)
773             )
774
775 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
776 0         service_account = V1ServiceAccount(metadata=metadata)
777
778 0         kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
779             ADMIN_NAMESPACE, service_account
780         )
781
782 1     def _delete_service_account(self, kubectl: Kubectl, name: str):
783 0         kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
784             name, ADMIN_NAMESPACE
785         )
786
787 1     def _create_cluster_role_binding(
788         self,
789         kubectl: Kubectl,
790         name: str,
791         labels: Dict[str, str],
792     ):
793 0         role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
794             field_selector="metadata.name={}".format(name)
795         )
796 0         if len(role_bindings.items) > 0:
797 0             raise Exception("Generated rbac id already exists")
798
799 0         role_binding = V1ClusterRoleBinding(
800             metadata=V1ObjectMeta(name=name, labels=labels),
801             role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
802             subjects=[
803                 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
804             ],
805         )
806 0         kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
807
808 1     def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
809 0         kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
810
811 1     async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
812 0         v1_core = kubectl.clients[CORE_CLIENT]
813
814 0         retries_limit = 10
815 0         secret_name = None
816 0         while True:
817 0             retries_limit -= 1
818 0             service_accounts = v1_core.list_namespaced_service_account(
819                 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
820             )
821 0             if len(service_accounts.items) == 0:
822 0                 raise Exception(
823                     "Service account not found with metadata.name={}".format(name)
824                 )
825 0             service_account = service_accounts.items[0]
826 0             if service_account.secrets and len(service_account.secrets) > 0:
827 0                 secret_name = service_account.secrets[0].name
828 0             if secret_name is not None or not retries_limit:
829 0                 break
830 0         if not secret_name:
831 0             raise Exception(
832                 "Failed getting the secret from service account {}".format(name)
833             )
834 0         secret = v1_core.list_namespaced_secret(
835             ADMIN_NAMESPACE,
836             field_selector="metadata.name={}".format(secret_name),
837         ).items[0]
838
839 0         token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
840 0         client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
841
842 0         return (
843             base64.b64decode(token).decode("utf-8"),
844             base64.b64decode(client_certificate_data).decode("utf-8"),
845         )
846
847 1     @staticmethod
848     def generate_kdu_instance_name(**kwargs):
849 0         db_dict = kwargs.get("db_dict")
850 0         kdu_name = kwargs.get("kdu_name", None)
851 0         if kdu_name:
852 0             kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
853         else:
854 0             kdu_instance = db_dict["filter"]["_id"]
855 0         return kdu_instance