Code Coverage

Cobertura Coverage Report > n2vc >

k8s_juju_conn.py

Trend

Classes100%
 
Lines82%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
k8s_juju_conn.py
100%
1/1
82%
208/255
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_juju_conn.py
82%
208/255
N/A

Source

n2vc/k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 import os
17 1 import uuid
18 1 import yaml
19 1 import tempfile
20 1 import binascii
21 1 import base64
22
23 1 from n2vc.config import ModelConfig
24 1 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
25 1 from n2vc.k8s_conn import K8sConnector
26 1 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 1 from .exceptions import MethodNotImplemented
28 1 from n2vc.utils import base64_to_cacert
29 1 from n2vc.libjuju import Libjuju
30
31 1 from kubernetes.client.models import (
32     V1ClusterRole,
33     V1ObjectMeta,
34     V1PolicyRule,
35     V1ServiceAccount,
36     V1ClusterRoleBinding,
37     V1RoleRef,
38     V1Subject,
39 )
40
41 1 from typing import Dict
42
43 1 SERVICE_ACCOUNT_TOKEN_KEY = "token"
44 1 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
45 1 RBAC_LABEL_KEY_NAME = "rbac-id"
46
47 1 ADMIN_NAMESPACE = "kube-system"
48 1 RBAC_STACK_PREFIX = "juju-credential"
49
50 # from juju.bundle import BundleHandler
51 # import re
52 # import ssl
53 # from .vnf import N2VC
54
55
56 1 def generate_rbac_id():
57 1     return binascii.hexlify(os.urandom(4)).decode()
58
59
60 1 class K8sJujuConnector(K8sConnector):
61 1     def __init__(
62         self,
63         fs: object,
64         db: object,
65         kubectl_command: str = "/usr/bin/kubectl",
66         juju_command: str = "/usr/bin/juju",
67         log: object = None,
68         loop: object = None,
69         on_update_db=None,
70         vca_config: dict = None,
71     ):
72         """
73         :param fs: file system for kubernetes and helm configuration
74         :param db: Database object
75         :param kubectl_command: path to kubectl executable
76         :param helm_command: path to helm executable
77         :param log: logger
78         :param: loop: Asyncio loop
79         """
80
81         # parent class
82 1         K8sConnector.__init__(
83             self,
84             db,
85             log=log,
86             on_update_db=on_update_db,
87         )
88
89 1         self.fs = fs
90 1         self.loop = loop or asyncio.get_event_loop()
91 1         self.log.debug("Initializing K8S Juju connector")
92
93 1         required_vca_config = [
94             "host",
95             "user",
96             "secret",
97             "ca_cert",
98         ]
99 1         if not vca_config or not all(k in vca_config for k in required_vca_config):
100 1             raise N2VCBadArgumentsException(
101                 message="Missing arguments in vca_config: {}".format(vca_config),
102                 bad_args=required_vca_config,
103             )
104 1         port = vca_config["port"] if "port" in vca_config else 17070
105 1         url = "{}:{}".format(vca_config["host"], port)
106 1         model_config = ModelConfig(vca_config)
107 1         username = vca_config["user"]
108 1         secret = vca_config["secret"]
109 1         ca_cert = base64_to_cacert(vca_config["ca_cert"])
110
111 1         self.libjuju = Libjuju(
112             endpoint=url,
113             api_proxy=None,  # Not needed for k8s charms
114             model_config=model_config,
115             username=username,
116             password=secret,
117             cacert=ca_cert,
118             loop=self.loop,
119             log=self.log,
120             db=self.db,
121         )
122 1         self.log.debug("K8S Juju connector initialized")
123         # TODO: Remove these commented lines:
124         # self.authenticated = False
125         # self.models = {}
126         # self.juju_secret = ""
127
128     """Initialization"""
129
130 1     async def init_env(
131         self,
132         k8s_creds: str,
133         namespace: str = "kube-system",
134         reuse_cluster_uuid: str = None,
135     ) -> (str, bool):
136         """
137         It prepares a given K8s cluster environment to run Juju bundles.
138
139         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
140             '.kube/config'
141         :param namespace: optional namespace to be used for juju. By default,
142             'kube-system' will be used
143         :param reuse_cluster_uuid: existing cluster uuid for reuse
144         :return: uuid of the K8s cluster and True if connector has installed some
145             software in the cluster
146             (on error, an exception will be raised)
147         """
148
149         # """Bootstrapping
150
151         # Bootstrapping cannot be done, by design, through the API. We need to
152         # use the CLI tools.
153         # """
154
155         # """
156         # WIP: Workflow
157
158         # 1. Has the environment already been bootstrapped?
159         # - Check the database to see if we have a record for this env
160
161         # 2. If this is a new env, create it
162         # - Add the k8s cloud to Juju
163         # - Bootstrap
164         # - Record it in the database
165
166         # 3. Connect to the Juju controller for this cloud
167
168         # """
169         # cluster_uuid = reuse_cluster_uuid
170         # if not cluster_uuid:
171         #     cluster_uuid = str(uuid4())
172
173         ##################################################
174         # TODO: Pull info from db based on the namespace #
175         ##################################################
176
177         ###################################################
178         # TODO: Make it idempotent, calling add-k8s and   #
179         # bootstrap whenever reuse_cluster_uuid is passed #
180         # as parameter                                    #
181         # `init_env` is called to initialize the K8s      #
182         # cluster for juju. If this initialization fails, #
183         # it can be called again by LCM with the param    #
184         # reuse_cluster_uuid, e.g. to try to fix it.       #
185         ###################################################
186
187         # This is a new cluster, so bootstrap it
188
189 1         cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
190
191         # Is a local k8s cluster?
192         # localk8s = self.is_local_k8s(k8s_creds)
193
194         # If the k8s is external, the juju controller needs a loadbalancer
195         # loadbalancer = False if localk8s else True
196
197         # Name the new k8s cloud
198         # k8s_cloud = "k8s-{}".format(cluster_uuid)
199
200         # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
201         # await self.add_k8s(k8s_cloud, k8s_creds)
202
203         # Bootstrap Juju controller
204         # self.log.debug("Bootstrapping...")
205         # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
206         # self.log.debug("Bootstrap done.")
207
208         # Get the controller information
209
210         # Parse ~/.local/share/juju/controllers.yaml
211         # controllers.testing.api-endpoints|ca-cert|uuid
212         # self.log.debug("Getting controller endpoints")
213         # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
214         #     controllers = yaml.load(f, Loader=yaml.Loader)
215         #     controller = controllers["controllers"][cluster_uuid]
216         #     endpoints = controller["api-endpoints"]
217         #     juju_endpoint = endpoints[0]
218         #     juju_ca_cert = controller["ca-cert"]
219
220         # Parse ~/.local/share/juju/accounts
221         # controllers.testing.user|password
222         # self.log.debug("Getting accounts")
223         # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
224         #     controllers = yaml.load(f, Loader=yaml.Loader)
225         #     controller = controllers["controllers"][cluster_uuid]
226
227         #     juju_user = controller["user"]
228         #     juju_secret = controller["password"]
229
230         # config = {
231         #     "endpoint": juju_endpoint,
232         #     "username": juju_user,
233         #     "secret": juju_secret,
234         #     "cacert": juju_ca_cert,
235         #     "loadbalancer": loadbalancer,
236         # }
237
238         # Store the cluster configuration so it
239         # can be used for subsequent calls
240 1         kubecfg = tempfile.NamedTemporaryFile()
241 1         with open(kubecfg.name, "w") as kubecfg_file:
242 1             kubecfg_file.write(k8s_creds)
243 1         kubectl = Kubectl(config_file=kubecfg.name)
244
245         # CREATING RESOURCES IN K8S
246 1         rbac_id = generate_rbac_id()
247 1         metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
248 1         labels = {RBAC_STACK_PREFIX: rbac_id}
249
250         # Create cleanup dictionary to clean up created resources
251         # if it fails in the middle of the process
252 1         cleanup_data = []
253 1         try:
254 1             self._create_cluster_role(
255                 kubectl,
256                 name=metadata_name,
257                 labels=labels,
258             )
259 1             cleanup_data.append(
260                 {
261                     "delete": self._delete_cluster_role,
262                     "args": (kubectl, metadata_name),
263                 }
264             )
265
266 1             self._create_service_account(
267                 kubectl,
268                 name=metadata_name,
269                 labels=labels,
270             )
271 1             cleanup_data.append(
272                 {
273                     "delete": self._delete_service_account,
274                     "args": (kubectl, metadata_name),
275                 }
276             )
277
278 1             self._create_cluster_role_binding(
279                 kubectl,
280                 name=metadata_name,
281                 labels=labels,
282             )
283 1             cleanup_data.append(
284                 {
285                     "delete": self._delete_service_account,
286                     "args": (kubectl, metadata_name),
287                 }
288             )
289 1             token, client_cert_data = await self._get_secret_data(
290                 kubectl,
291                 metadata_name,
292             )
293
294 1             default_storage_class = kubectl.get_default_storage_class()
295 1             await self.libjuju.add_k8s(
296                 name=cluster_uuid,
297                 rbac_id=rbac_id,
298                 token=token,
299                 client_cert_data=client_cert_data,
300                 configuration=kubectl.configuration,
301                 storage_class=default_storage_class,
302                 credential_name=self._get_credential_name(cluster_uuid),
303             )
304             # self.log.debug("Setting config")
305             # await self.set_config(cluster_uuid, config)
306
307             # Test connection
308             # controller = await self.get_controller(cluster_uuid)
309             # await controller.disconnect()
310
311             # TODO: Remove these commented lines
312             # raise Exception("EOL")
313             # self.juju_public_key = None
314             # Login to the k8s cluster
315             # if not self.authenticated:
316             #     await self.login(cluster_uuid)
317
318             # We're creating a new cluster
319             # print("Getting model {}".format(self.get_namespace(cluster_uuid),
320             #    cluster_uuid=cluster_uuid))
321             # model = await self.get_model(
322             #    self.get_namespace(cluster_uuid),
323             #    cluster_uuid=cluster_uuid
324             # )
325
326             # Disconnect from the model
327             # if model and model.is_connected():
328             #    await model.disconnect()
329
330 1             return cluster_uuid, True
331 1         except Exception as e:
332 1             self.log.error("Error initializing k8scluster: {}".format(e))
333 1             if len(cleanup_data) > 0:
334 1                 self.log.debug("Cleaning up created resources in k8s cluster...")
335 1                 for item in cleanup_data:
336 1                     delete_function = item["delete"]
337 1                     delete_args = item["args"]
338 1                     delete_function(*delete_args)
339 1                 self.log.debug("Cleanup finished")
340 1             raise e
341
342     """Repo Management"""
343
344 1     async def repo_add(
345         self,
346         name: str,
347         url: str,
348         _type: str = "charm",
349     ):
350 1         raise MethodNotImplemented()
351
352 1     async def repo_list(self):
353 1         raise MethodNotImplemented()
354
355 1     async def repo_remove(
356         self,
357         name: str,
358     ):
359 1         raise MethodNotImplemented()
360
361 1     async def synchronize_repos(self, cluster_uuid: str, name: str):
362         """
363         Returns None as currently add_repo is not implemented
364         """
365 1         return None
366
367     """Reset"""
368
369 1     async def reset(
370         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
371     ) -> bool:
372         """Reset a cluster
373
374         Resets the Kubernetes cluster by removing the model that represents it.
375
376         :param cluster_uuid str: The UUID of the cluster to reset
377         :return: Returns True if successful or raises an exception.
378         """
379
380 1         try:
381             # Remove k8scluster from database
382             # self.log.debug("[reset] Removing k8scluster from juju database")
383             # juju_db = self.db.get_one("admin", {"_id": "juju"})
384
385             # for k in juju_db["k8sclusters"]:
386             #     if k["_id"] == cluster_uuid:
387             #         juju_db["k8sclusters"].remove(k)
388             #         self.db.set_one(
389             #             table="admin",
390             #             q_filter={"_id": "juju"},
391             #             update_dict={"k8sclusters": juju_db["k8sclusters"]},
392             #         )
393             #         break
394
395             # Destroy the controller (via CLI)
396             # self.log.debug("[reset] Destroying controller")
397             # await self.destroy_controller(cluster_uuid)
398 1             self.log.debug("[reset] Removing k8s cloud")
399             # k8s_cloud = "k8s-{}".format(cluster_uuid)
400             # await self.remove_cloud(k8s_cloud)
401
402 1             cloud_creds = await self.libjuju.get_cloud_credentials(
403                 cluster_uuid,
404                 self._get_credential_name(cluster_uuid),
405             )
406
407 1             await self.libjuju.remove_cloud(cluster_uuid)
408
409 1             kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
410
411 1             kubecfg_file = tempfile.NamedTemporaryFile()
412 1             with open(kubecfg_file.name, "w") as f:
413 1                 f.write(kubecfg)
414 1             kubectl = Kubectl(config_file=kubecfg_file.name)
415
416 1             delete_functions = [
417                 self._delete_cluster_role_binding,
418                 self._delete_service_account,
419                 self._delete_cluster_role,
420             ]
421
422 1             credential_attrs = cloud_creds[0].result["attrs"]
423 1             if RBAC_LABEL_KEY_NAME in credential_attrs:
424 1                 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
425 1                 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
426 1                 delete_args = (kubectl, metadata_name)
427 1                 for delete_func in delete_functions:
428 1                     try:
429 1                         delete_func(*delete_args)
430 0                     except Exception as e:
431 0                         self.log.warning("Cannot remove resource in K8s {}".format(e))
432
433 1         except Exception as e:
434 1             self.log.debug("Caught exception during reset: {}".format(e))
435 1             raise e
436 1         return True
437         # TODO: Remove these commented lines
438         #     if not self.authenticated:
439         #         await self.login(cluster_uuid)
440
441         #     if self.controller.is_connected():
442         #         # Destroy the model
443         #         namespace = self.get_namespace(cluster_uuid)
444         #         if await self.has_model(namespace):
445         #             self.log.debug("[reset] Destroying model")
446         #             await self.controller.destroy_model(namespace, destroy_storage=True)
447
448         #         # Disconnect from the controller
449         #         self.log.debug("[reset] Disconnecting controller")
450         #         await self.logout()
451
452     """Deployment"""
453
454 1     async def install(
455         self,
456         cluster_uuid: str,
457         kdu_model: str,
458         kdu_instance: str,
459         atomic: bool = True,
460         timeout: float = 1800,
461         params: dict = None,
462         db_dict: dict = None,
463         kdu_name: str = None,
464         namespace: str = None,
465     ) -> bool:
466         """Install a bundle
467
468         :param cluster_uuid str: The UUID of the cluster to install to
469         :param kdu_model str: The name or path of a bundle to install
470         :param kdu_instance: Kdu instance name
471         :param atomic bool: If set, waits until the model is active and resets
472                             the cluster on failure.
473         :param timeout int: The time, in seconds, to wait for the install
474                             to finish
475         :param params dict: Key-value pairs of instantiation parameters
476         :param kdu_name: Name of the KDU instance to be installed
477         :param namespace: K8s namespace to use for the KDU instance
478
479         :return: If successful, returns ?
480         """
481 1         bundle = kdu_model
482
483         # controller = await self.get_controller(cluster_uuid)
484
485         ##
486         # Get or create the model, based on the NS
487         # uuid.
488
489 1         if not db_dict:
490 1             raise K8sException("db_dict must be set")
491 1         if not bundle:
492 1             raise K8sException("bundle must be set")
493
494 1         if bundle.startswith("cs:"):
495 1             pass
496 1         elif bundle.startswith("http"):
497             # Download the file
498 1             pass
499         else:
500 1             new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
501 1             os.chdir(new_workdir)
502 1             bundle = "local:{}".format(kdu_model)
503
504 1         self.log.debug("Checking for model named {}".format(kdu_instance))
505
506         # Create the new model
507 1         self.log.debug("Adding model: {}".format(kdu_instance))
508 1         await self.libjuju.add_model(
509             model_name=kdu_instance,
510             cloud_name=cluster_uuid,
511             credential_name=self._get_credential_name(cluster_uuid),
512         )
513
514         # if model:
515         # TODO: Instantiation parameters
516
517         """
518         "Juju bundle that models the KDU, in any of the following ways:
519             - <juju-repo>/<juju-bundle>
520             - <juju-bundle folder under k8s_models folder in the package>
521             - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
522                 in the package>
523             - <URL_where_to_fetch_juju_bundle>
524         """
525 1         try:
526 1             previous_workdir = os.getcwd()
527 1         except FileNotFoundError:
528 1             previous_workdir = "/app/storage"
529
530 1         self.log.debug("[install] deploying {}".format(bundle))
531 1         await self.libjuju.deploy(
532             bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
533         )
534
535         # Get the application
536         # if atomic:
537         #     # applications = model.applications
538         #     self.log.debug("[install] Applications: {}".format(model.applications))
539         #     for name in model.applications:
540         #         self.log.debug("[install] Waiting for {} to settle".format(name))
541         #         application = model.applications[name]
542         #         try:
543         #             # It's not enough to wait for all units to be active;
544         #             # the application status needs to be active as well.
545         #             self.log.debug("Waiting for all units to be active...")
546         #             await model.block_until(
547         #                 lambda: all(
548         #                     unit.agent_status == "idle"
549         #                     and application.status in ["active", "unknown"]
550         #                     and unit.workload_status in ["active", "unknown"]
551         #                     for unit in application.units
552         #                 ),
553         #                 timeout=timeout,
554         #             )
555         #             self.log.debug("All units active.")
556
557         #         # TODO use asyncio.TimeoutError
558         #         except concurrent.futures._base.TimeoutError:
559         #             os.chdir(previous_workdir)
560         #             self.log.debug("[install] Timeout exceeded; resetting cluster")
561         #             await self.reset(cluster_uuid)
562         #             return False
563
564         # Wait for the application to be active
565         # if model.is_connected():
566         #     self.log.debug("[install] Disconnecting model")
567         #     await model.disconnect()
568         # await controller.disconnect()
569 1         os.chdir(previous_workdir)
570 1         return True
571
572 1     async def instances_list(self, cluster_uuid: str) -> list:
573         """
574         returns a list of deployed releases in a cluster
575
576         :param cluster_uuid: the cluster
577         :return:
578         """
579 1         return []
580
581 1     async def upgrade(
582         self,
583         cluster_uuid: str,
584         kdu_instance: str,
585         kdu_model: str = None,
586         params: dict = None,
587     ) -> str:
588         """Upgrade a model
589
590         :param cluster_uuid str: The UUID of the cluster to upgrade
591         :param kdu_instance str: The unique name of the KDU instance
592         :param kdu_model str: The name or path of the bundle to upgrade to
593         :param params dict: Key-value pairs of instantiation parameters
594
595         :return: If successful, reference to the new revision number of the
596                  KDU instance.
597         """
598
599         # TODO: Loop through the bundle and upgrade each charm individually
600
601         """
602         The API doesn't have a concept of bundle upgrades, because there are
603         many possible changes: charm revision, disk, number of units, etc.
604
605         As such, we are only supporting a limited subset of upgrades. We'll
606         upgrade the charm revision but leave storage and scale untouched.
607
608         Scale changes should happen through OSM constructs, and changes to
609         storage would require a redeployment of the service, at least in this
610         initial release.
611         """
612 1         raise MethodNotImplemented()
613         # TODO: Remove these commented lines
614
615         # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
616
617         # model = None
618         # namespace = self.get_namespace(cluster_uuid)
619         # controller = await self.get_controller(cluster_uuid)
620
621         # try:
622         #     if namespace not in await controller.list_models():
623         #         raise N2VCNotFound(message="Model {} does not exist".format(namespace))
624
625         #     model = await controller.get_model(namespace)
626         #     with open(kdu_model, "r") as f:
627         #         bundle = yaml.safe_load(f)
628
629         #         """
630         #         {
631         #             'description': 'Test bundle',
632         #             'bundle': 'kubernetes',
633         #             'applications': {
634         #                 'mariadb-k8s': {
635         #                     'charm': 'cs:~charmed-osm/mariadb-k8s-20',
636         #                     'scale': 1,
637         #                     'options': {
638         #                         'password': 'manopw',
639         #                         'root_password': 'osm4u',
640         #                         'user': 'mano'
641         #                     },
642         #                     'series': 'kubernetes'
643         #                 }
644         #             }
645         #         }
646         #         """
647         #         # TODO: This should be returned in an agreed-upon format
648         #         for name in bundle["applications"]:
649         #             self.log.debug(model.applications)
650         #             application = model.applications[name]
651         #             self.log.debug(application)
652
653         #             path = bundle["applications"][name]["charm"]
654
655         #             try:
656         #                 await application.upgrade_charm(switch=path)
657         #             except juju.errors.JujuError as ex:
658         #                 if "already running charm" in str(ex):
659         #                     # We're already running this version
660         #                     pass
661         # finally:
662         #     if model:
663         #         await model.disconnect()
664         #     await controller.disconnect()
665         # return True
666
667     """Rollback"""
668
669 1     async def rollback(
670         self,
671         cluster_uuid: str,
672         kdu_instance: str,
673         revision: int = 0,
674     ) -> str:
675         """Rollback a model
676
677         :param cluster_uuid str: The UUID of the cluster to rollback
678         :param kdu_instance str: The unique name of the KDU instance
679         :param revision int: The revision to revert to. If omitted, rolls back
680                              the previous upgrade.
681
682         :return: If successful, returns the revision of active KDU instance,
683                  or raises an exception
684         """
685 1         raise MethodNotImplemented()
686
687     """Deletion"""
688
689 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
690         """Uninstall a KDU instance
691
692         :param cluster_uuid str: The UUID of the cluster
693         :param kdu_instance str: The unique name of the KDU instance
694
695         :return: Returns True if successful, or raises an exception
696         """
697
698         # controller = await self.get_controller(cluster_uuid)
699
700 1         self.log.debug("[uninstall] Destroying model")
701
702 1         await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
703
704         # self.log.debug("[uninstall] Model destroyed and disconnecting")
705         # await controller.disconnect()
706
707 1         return True
708         # TODO: Remove these commented lines
709         # if not self.authenticated:
710         #     self.log.debug("[uninstall] Connecting to controller")
711         #     await self.login(cluster_uuid)
712
713 1     async def exec_primitive(
714         self,
715         cluster_uuid: str = None,
716         kdu_instance: str = None,
717         primitive_name: str = None,
718         timeout: float = 300,
719         params: dict = None,
720         db_dict: dict = None,
721     ) -> str:
722         """Exec primitive (Juju action)
723
724         :param cluster_uuid str: The UUID of the cluster
725         :param kdu_instance str: The unique name of the KDU instance
726         :param primitive_name: Name of action that will be executed
727         :param timeout: Timeout for action execution
728         :param params: Dictionary of all the parameters needed for the action
729         :db_dict: Dictionary for any additional data
730
731         :return: Returns the output of the action
732         """
733
734         # controller = await self.get_controller(cluster_uuid)
735
736 1         if not params or "application-name" not in params:
737 1             raise K8sException(
738                 "Missing application-name argument, \
739                                 argument needed for K8s actions"
740             )
741 1         try:
742 1             self.log.debug(
743                 "[exec_primitive] Getting model "
744                 "kdu_instance: {}".format(kdu_instance)
745             )
746 1             application_name = params["application-name"]
747 1             actions = await self.libjuju.get_actions(application_name, kdu_instance)
748 1             if primitive_name not in actions:
749 1                 raise K8sException("Primitive {} not found".format(primitive_name))
750 1             output, status = await self.libjuju.execute_action(
751                 application_name, kdu_instance, primitive_name, **params
752             )
753             # model = await self.get_model(kdu_instance, controller=controller)
754
755             # application_name = params["application-name"]
756             # application = model.applications[application_name]
757
758             # actions = await application.get_actions()
759             # if primitive_name not in actions:
760             #     raise K8sException("Primitive {} not found".format(primitive_name))
761
762             # unit = None
763             # for u in application.units:
764             #     if await u.is_leader_from_status():
765             #         unit = u
766             #         break
767
768             # if unit is None:
769             #     raise K8sException("No leader unit found to execute action")
770
771             # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
772             # action = await unit.run_action(primitive_name, **params)
773
774             # output = await model.get_action_output(action_uuid=action.entity_id)
775             # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
776
777             # status = (
778             #     status[action.entity_id] if action.entity_id in status else "failed"
779             # )
780
781 1             if status != "completed":
782 1                 raise K8sException(
783                     "status is not completed: {} output: {}".format(status, output)
784                 )
785
786 1             return output
787
788 1         except Exception as e:
789 1             error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
790 1             self.log.error(error_msg)
791 1             raise K8sException(message=error_msg)
792         # finally:
793         #     await controller.disconnect()
794         # TODO: Remove these commented lines:
795         # if not self.authenticated:
796         #     self.log.debug("[exec_primitive] Connecting to controller")
797         #     await self.login(cluster_uuid)
798
799     """Introspection"""
800
801 1     async def inspect_kdu(
802         self,
803         kdu_model: str,
804     ) -> dict:
805         """Inspect a KDU
806
807         Inspects a bundle and returns a dictionary of config parameters and
808         their default values.
809
810         :param kdu_model str: The name or path of the bundle to inspect.
811
812         :return: If successful, returns a dictionary of available parameters
813                  and their default values.
814         """
815
816 1         kdu = {}
817 1         if not os.path.exists(kdu_model):
818 1             raise K8sException("file {} not found".format(kdu_model))
819
820 1         with open(kdu_model, "r") as f:
821 1             bundle = yaml.safe_load(f.read())
822
823             """
824             {
825                 'description': 'Test bundle',
826                 'bundle': 'kubernetes',
827                 'applications': {
828                     'mariadb-k8s': {
829                         'charm': 'cs:~charmed-osm/mariadb-k8s-20',
830                         'scale': 1,
831                         'options': {
832                             'password': 'manopw',
833                             'root_password': 'osm4u',
834                             'user': 'mano'
835                         },
836                         'series': 'kubernetes'
837                     }
838                 }
839             }
840             """
841             # TODO: This should be returned in an agreed-upon format
842 1             kdu = bundle["applications"]
843
844 1         return kdu
845
846 1     async def help_kdu(
847         self,
848         kdu_model: str,
849     ) -> str:
850         """View the README
851
852         If available, returns the README of the bundle.
853
854         :param kdu_model str: The name or path of a bundle
855
856         :return: If found, returns the contents of the README.
857         """
858 1         readme = None
859
860 1         files = ["README", "README.txt", "README.md"]
861 1         path = os.path.dirname(kdu_model)
862 1         for file in os.listdir(path):
863 1             if file in files:
864 1                 with open(file, "r") as f:
865 1                     readme = f.read()
866 1                     break
867
868 1         return readme
869
870 1     async def status_kdu(
871         self,
872         cluster_uuid: str,
873         kdu_instance: str,
874     ) -> dict:
875         """Get the status of the KDU
876
877         Get the current status of the KDU instance.
878
879         :param cluster_uuid str: The UUID of the cluster
880         :param kdu_instance str: The unique id of the KDU instance
881
882         :return: Returns a dictionary containing namespace, state, resources,
883                  and deployment_time.
884         """
885 1         status = {}
886         # controller = await self.get_controller(cluster_uuid)
887         # model = await self.get_model(kdu_instance, controller=controller)
888
889         # model_status = await model.get_status()
890         # status = model_status.applications
891 1         model_status = await self.libjuju.get_model_status(kdu_instance)
892 1         for name in model_status.applications:
893 1             application = model_status.applications[name]
894 1             status[name] = {"status": application["status"]["status"]}
895
896         # await model.disconnect()
897         # await controller.disconnect()
898
899 1         return status
900
901 1     async def get_services(
902         self, cluster_uuid: str, kdu_instance: str, namespace: str
903     ) -> list:
904         """Return a list of services of a kdu_instance"""
905
906 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
907
908         # config_path = "/tmp/{}".format(cluster_uuid)
909         # config_file = "{}/config".format(config_path)
910
911         # if not os.path.exists(config_path):
912         #     os.makedirs(config_path)
913         # with open(config_file, "w") as f:
914         #     f.write(credentials)
915
916 1         kubecfg = tempfile.NamedTemporaryFile()
917 1         with open(kubecfg.name, "w") as kubecfg_file:
918 1             kubecfg_file.write(credentials)
919 1         kubectl = Kubectl(config_file=kubecfg.name)
920
921 1         return kubectl.get_services(
922             field_selector="metadata.namespace={}".format(kdu_instance)
923         )
924
925 1     async def get_service(
926         self, cluster_uuid: str, service_name: str, namespace: str
927     ) -> object:
928         """Return data for a specific service inside a namespace"""
929
930 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
931
932         # config_path = "/tmp/{}".format(cluster_uuid)
933         # config_file = "{}/config".format(config_path)
934
935         # if not os.path.exists(config_path):
936         #     os.makedirs(config_path)
937         # with open(config_file, "w") as f:
938         #     f.write(credentials)
939
940 1         kubecfg = tempfile.NamedTemporaryFile()
941 1         with open(kubecfg.name, "w") as kubecfg_file:
942 1             kubecfg_file.write(credentials)
943 1         kubectl = Kubectl(config_file=kubecfg.name)
944
945 1         return kubectl.get_services(
946             field_selector="metadata.name={},metadata.namespace={}".format(
947                 service_name, namespace
948             )
949         )[0]
950
951     # Private methods
952     # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
953     #     """Add a k8s cloud to Juju
954
955     #     Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
956     #     Juju Controller.
957
958     #     :param cloud_name str: The name of the cloud to add.
959     #     :param credentials dict: A dictionary representing the output of
960     #         `kubectl config view --raw`.
961
962     #     :returns: True if successful, otherwise raises an exception.
963     #     """
964
965     #     cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
966     #     self.log.debug(cmd)
967
968     #     process = await asyncio.create_subprocess_exec(
969     #         *cmd,
970     #         stdout=asyncio.subprocess.PIPE,
971     #         stderr=asyncio.subprocess.PIPE,
972     #         stdin=asyncio.subprocess.PIPE,
973     #     )
974
975     #     # Feed the process the credentials
976     #     process.stdin.write(credentials.encode("utf-8"))
977     #     await process.stdin.drain()
978     #     process.stdin.close()
979
980     #     _stdout, stderr = await process.communicate()
981
982     #     return_code = process.returncode
983
984     #     self.log.debug("add-k8s return code: {}".format(return_code))
985
986     #     if return_code > 0:
987     #         raise Exception(stderr)
988
989     #     return True
990
991     # async def add_model(
992     #     self, model_name: str, cluster_uuid: str, controller: Controller
993     # ) -> Model:
994     #     """Adds a model to the controller
995
996     #     Adds a new model to the Juju controller
997
998     #     :param model_name str: The name of the model to add.
999     #     :param cluster_uuid str: ID of the cluster.
1000     #     :param controller: Controller object in which the model will be added
1001     #     :returns: The juju.model.Model object of the new model upon success or
1002     #               raises an exception.
1003     #     """
1004
1005     #     self.log.debug(
1006     #         "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1007     #     )
1008     #     model = None
1009     #     try:
1010     #         if self.juju_public_key is not None:
1011     #             model = await controller.add_model(
1012     #                 model_name, config={"authorized-keys": self.juju_public_key}
1013     #             )
1014     #         else:
1015     #             model = await controller.add_model(model_name)
1016     #     except Exception as ex:
1017     #         self.log.debug(ex)
1018     #         self.log.debug("Caught exception: {}".format(ex))
1019     #         pass
1020
1021     #     return model
1022
1023     # async def bootstrap(
1024     #     self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1025     # ) -> bool:
1026     #     """Bootstrap a Kubernetes controller
1027
1028     #     Bootstrap a Juju controller inside the Kubernetes cluster
1029
1030     #     :param cloud_name str: The name of the cloud.
1031     #     :param cluster_uuid str: The UUID of the cluster to bootstrap.
1032     #     :param loadbalancer bool: If the controller should use loadbalancer or not.
1033     #     :returns: True upon success or raises an exception.
1034     #     """
1035
1036     #     if not loadbalancer:
1037     #         cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1038     #     else:
1039     #         """
1040     #         For public clusters, specify that the controller service is using a
1041     #         LoadBalancer.
1042     #         """
1043     #         cmd = [
1044     #             self.juju_command,
1045     #             "bootstrap",
1046     #             cloud_name,
1047     #             cluster_uuid,
1048     #             "--config",
1049     #             "controller-service-type=loadbalancer",
1050     #         ]
1051
1052     #     self.log.debug(
1053     #         "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1054     #     )
1055
1056     #     process = await asyncio.create_subprocess_exec(
1057     #         *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1058     #     )
1059
1060     #     _stdout, stderr = await process.communicate()
1061
1062     #     return_code = process.returncode
1063
1064     #     if return_code > 0:
1065     #         #
1066     #         if b"already exists" not in stderr:
1067     #             raise Exception(stderr)
1068
1069     #     return True
1070
1071     # async def destroy_controller(self, cluster_uuid: str) -> bool:
1072     #     """Destroy a Kubernetes controller
1073
1074     #     Destroy an existing Kubernetes controller.
1075
1076     #     :param cluster_uuid str: The UUID of the cluster to bootstrap.
1077     #     :returns: True upon success or raises an exception.
1078     #     """
1079     #     cmd = [
1080     #         self.juju_command,
1081     #         "destroy-controller",
1082     #         "--destroy-all-models",
1083     #         "--destroy-storage",
1084     #         "-y",
1085     #         cluster_uuid,
1086     #     ]
1087
1088     #     process = await asyncio.create_subprocess_exec(
1089     #         *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1090     #     )
1091
1092     #     _stdout, stderr = await process.communicate()
1093
1094     #     return_code = process.returncode
1095
1096     #     if return_code > 0:
1097     #         #
1098     #         if "already exists" not in stderr:
1099     #             raise Exception(stderr)
1100
1101 1     def get_credentials(self, cluster_uuid: str) -> str:
1102         """
1103         Get Cluster Kubeconfig
1104         """
1105 1         k8scluster = self.db.get_one(
1106             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1107         )
1108
1109 1         self.db.encrypt_decrypt_fields(
1110             k8scluster.get("credentials"),
1111             "decrypt",
1112             ["password", "secret"],
1113             schema_version=k8scluster["schema_version"],
1114             salt=k8scluster["_id"],
1115         )
1116
1117 1         return yaml.safe_dump(k8scluster.get("credentials"))
1118
1119 1     def _get_credential_name(self, cluster_uuid: str) -> str:
1120         """
1121         Get credential name for a k8s cloud
1122
1123         We cannot use the cluster_uuid for the credential name directly,
1124         because it cannot start with a number, it must start with a letter.
1125         Therefore, the k8s cloud credential name will be "cred-" followed
1126         by the cluster uuid.
1127
1128         :param: cluster_uuid:   Cluster UUID of the kubernetes cloud (=cloud_name)
1129
1130         :return:                Name to use for the credential name.
1131         """
1132 1         return "cred-{}".format(cluster_uuid)
1133
1134     # def get_config(self, cluster_uuid: str,) -> dict:
1135     #     """Get the cluster configuration
1136
1137     #     Gets the configuration of the cluster
1138
1139     #     :param cluster_uuid str: The UUID of the cluster.
1140     #     :return: A dict upon success, or raises an exception.
1141     #     """
1142
1143     #     juju_db = self.db.get_one("admin", {"_id": "juju"})
1144     #     config = None
1145     #     for k in juju_db["k8sclusters"]:
1146     #         if k["_id"] == cluster_uuid:
1147     #             config = k["config"]
1148     #             self.db.encrypt_decrypt_fields(
1149     #                 config,
1150     #                 "decrypt",
1151     #                 ["secret", "cacert"],
1152     #                 schema_version="1.1",
1153     #                 salt=k["_id"],
1154     #             )
1155     #             break
1156     #     if not config:
1157     #         raise Exception(
1158     #             "Unable to locate configuration for cluster {}".format(cluster_uuid)
1159     #         )
1160     #     return config
1161
1162     # async def get_model(self, model_name: str, controller: Controller) -> Model:
1163     #     """Get a model from the Juju Controller.
1164
1165     #     Note: Model objects returned must call disconnected() before it goes
1166     #     out of scope.
1167
1168     #     :param model_name str: The name of the model to get
1169     #     :param controller Controller: Controller object
1170     #     :return The juju.model.Model object if found, or None.
1171     #     """
1172
1173     #     models = await controller.list_models()
1174     #     if model_name not in models:
1175     #         raise N2VCNotFound("Model {} not found".format(model_name))
1176     #     self.log.debug("Found model: {}".format(model_name))
1177     #     return await controller.get_model(model_name)
1178
1179 1     def get_namespace(
1180         self,
1181         cluster_uuid: str,
1182     ) -> str:
1183         """Get the namespace UUID
1184         Gets the namespace's unique name
1185
1186         :param cluster_uuid str: The UUID of the cluster
1187         :returns: The namespace UUID, or raises an exception
1188         """
1189         # config = self.get_config(cluster_uuid)
1190
1191         # Make sure the name is in the config
1192         # if "namespace" not in config:
1193         #     raise Exception("Namespace not found.")
1194
1195         # TODO: We want to make sure this is unique to the cluster, in case
1196         # the cluster is being reused.
1197         # Consider pre/appending the cluster id to the namespace string
1198 1         pass
1199
1200     # TODO: Remove these lines of code
1201     # async def has_model(self, model_name: str) -> bool:
1202     #     """Check if a model exists in the controller
1203
1204     #     Checks to see if a model exists in the connected Juju controller.
1205
1206     #     :param model_name str: The name of the model
1207     #     :return: A boolean indicating if the model exists
1208     #     """
1209     #     models = await self.controller.list_models()
1210
1211     #     if model_name in models:
1212     #         return True
1213     #     return False
1214
1215     # def is_local_k8s(self, credentials: str,) -> bool:
1216     #     """Check if a cluster is local
1217
1218     #     Checks if a cluster is running in the local host
1219
1220     #     :param credentials dict: A dictionary containing the k8s credentials
1221     #     :returns: A boolean if the cluster is running locally
1222     #     """
1223
1224     #     creds = yaml.safe_load(credentials)
1225
1226     #     if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1227     #         for cluster in creds["clusters"]:
1228     #             if "server" in cluster["cluster"]:
1229     #                 if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1230     #                     return True
1231
1232     #     return False
1233
1234     # async def get_controller(self, cluster_uuid):
1235     #     """Login to the Juju controller."""
1236
1237     #     config = self.get_config(cluster_uuid)
1238
1239     #     juju_endpoint = config["endpoint"]
1240     #     juju_user = config["username"]
1241     #     juju_secret = config["secret"]
1242     #     juju_ca_cert = config["cacert"]
1243
1244     #     controller = Controller()
1245
1246     #     if juju_secret:
1247     #         self.log.debug(
1248     #             "Connecting to controller... ws://{} as {}".format(
1249     #                 juju_endpoint, juju_user,
1250     #             )
1251     #         )
1252     #         try:
1253     #             await controller.connect(
1254     #                 endpoint=juju_endpoint,
1255     #                 username=juju_user,
1256     #                 password=juju_secret,
1257     #                 cacert=juju_ca_cert,
1258     #             )
1259     #             self.log.debug("JujuApi: Logged into controller")
1260     #             return controller
1261     #         except Exception as ex:
1262     #             self.log.debug(ex)
1263     #             self.log.debug("Caught exception: {}".format(ex))
1264     #     else:
1265     #         self.log.fatal("VCA credentials not configured.")
1266
1267     # TODO: Remove these commented lines
1268     #         self.authenticated = False
1269     # if self.authenticated:
1270     #         return
1271
1272     #     self.connecting = True
1273     #     juju_public_key = None
1274     #     self.authenticated = True
1275     #     Test: Make sure we have the credentials loaded
1276     # async def logout(self):
1277     #     """Logout of the Juju controller."""
1278     #     self.log.debug("[logout]")
1279     #     if not self.authenticated:
1280     #         return False
1281
1282     #     for model in self.models:
1283     #         self.log.debug("Logging out of model {}".format(model))
1284     #         await self.models[model].disconnect()
1285
1286     #     if self.controller:
1287     #         self.log.debug("Disconnecting controller {}".format(self.controller))
1288     #         await self.controller.disconnect()
1289     #         self.controller = None
1290
1291     #     self.authenticated = False
1292
1293     # async def remove_cloud(self, cloud_name: str,) -> bool:
1294     #     """Remove a k8s cloud from Juju
1295
1296     #     Removes a Kubernetes cloud from Juju.
1297
1298     #     :param cloud_name str: The name of the cloud to add.
1299
1300     #     :returns: True if successful, otherwise raises an exception.
1301     #     """
1302
1303     #     # Remove the bootstrapped controller
1304     #     cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1305     #     process = await asyncio.create_subprocess_exec(
1306     #         *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1307     #     )
1308
1309     #     _stdout, stderr = await process.communicate()
1310
1311     #     return_code = process.returncode
1312
1313     #     if return_code > 0:
1314     #         raise Exception(stderr)
1315
1316     #     # Remove the cloud from the local config
1317     #     cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1318     #     process = await asyncio.create_subprocess_exec(
1319     #         *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1320     #     )
1321
1322     #     _stdout, stderr = await process.communicate()
1323
1324     #     return_code = process.returncode
1325
1326     #     if return_code > 0:
1327     #         raise Exception(stderr)
1328
1329     #     return True
1330
1331     # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1332     #     """Save the cluster configuration
1333
1334     #     Saves the cluster information to the Mongo database
1335
1336     #     :param cluster_uuid str: The UUID of the cluster
1337     #     :param config dict: A dictionary containing the cluster configuration
1338     #     """
1339
1340     #     juju_db = self.db.get_one("admin", {"_id": "juju"})
1341
1342     #     k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1343     #     self.db.encrypt_decrypt_fields(
1344     #         config,
1345     #         "encrypt",
1346     #         ["secret", "cacert"],
1347     #         schema_version="1.1",
1348     #         salt=cluster_uuid,
1349     #     )
1350     #     k8sclusters.append({"_id": cluster_uuid, "config": config})
1351     #     self.db.set_one(
1352     #         table="admin",
1353     #         q_filter={"_id": "juju"},
1354     #         update_dict={"k8sclusters": k8sclusters},
1355     #     )
1356
1357     # Private methods to create/delete needed resources in the
1358     # Kubernetes cluster to create the K8s cloud in Juju
1359
1360 1     def _create_cluster_role(
1361         self,
1362         kubectl: Kubectl,
1363         name: str,
1364         labels: Dict[str, str],
1365     ):
1366 0         cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
1367             field_selector="metadata.name={}".format(name)
1368         )
1369
1370 0         if len(cluster_roles.items) > 0:
1371 0             raise Exception(
1372                 "Cluster role with metadata.name={} already exists".format(name)
1373             )
1374
1375 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1376         # Cluster role
1377 0         cluster_role = V1ClusterRole(
1378             metadata=metadata,
1379             rules=[
1380                 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
1381                 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
1382             ],
1383         )
1384
1385 0         kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
1386
1387 1     def _delete_cluster_role(self, kubectl: Kubectl, name: str):
1388 0         kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
1389
1390 1     def _create_service_account(
1391         self,
1392         kubectl: Kubectl,
1393         name: str,
1394         labels: Dict[str, str],
1395     ):
1396 0         service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
1397             ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1398         )
1399 0         if len(service_accounts.items) > 0:
1400 0             raise Exception(
1401                 "Service account with metadata.name={} already exists".format(name)
1402             )
1403
1404 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1405 0         service_account = V1ServiceAccount(metadata=metadata)
1406
1407 0         kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
1408             ADMIN_NAMESPACE, service_account
1409         )
1410
1411 1     def _delete_service_account(self, kubectl: Kubectl, name: str):
1412 0         kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
1413             name, ADMIN_NAMESPACE
1414         )
1415
1416 1     def _create_cluster_role_binding(
1417         self,
1418         kubectl: Kubectl,
1419         name: str,
1420         labels: Dict[str, str],
1421     ):
1422 0         role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
1423             field_selector="metadata.name={}".format(name)
1424         )
1425 0         if len(role_bindings.items) > 0:
1426 0             raise Exception("Generated rbac id already exists")
1427
1428 0         role_binding = V1ClusterRoleBinding(
1429             metadata=V1ObjectMeta(name=name, labels=labels),
1430             role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
1431             subjects=[
1432                 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
1433             ],
1434         )
1435 0         kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
1436
1437 1     def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
1438 0         kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
1439
1440 1     async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
1441 0         v1_core = kubectl.clients[CORE_CLIENT]
1442
1443 0         retries_limit = 10
1444 0         secret_name = None
1445 0         while True:
1446 0             retries_limit -= 1
1447 0             service_accounts = v1_core.list_namespaced_service_account(
1448                 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1449             )
1450 0             if len(service_accounts.items) == 0:
1451 0                 raise Exception(
1452                     "Service account not found with metadata.name={}".format(name)
1453                 )
1454 0             service_account = service_accounts.items[0]
1455 0             if service_account.secrets and len(service_account.secrets) > 0:
1456 0                 secret_name = service_account.secrets[0].name
1457 0             if secret_name is not None or not retries_limit:
1458 0                 break
1459 0         if not secret_name:
1460 0             raise Exception(
1461                 "Failed getting the secret from service account {}".format(name)
1462             )
1463 0         secret = v1_core.list_namespaced_secret(
1464             ADMIN_NAMESPACE,
1465             field_selector="metadata.name={}".format(secret_name),
1466         ).items[0]
1467
1468 0         token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
1469 0         client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
1470
1471 0         return (
1472             base64.b64decode(token).decode("utf-8"),
1473             base64.b64decode(client_certificate_data).decode("utf-8"),
1474         )
1475
1476 1     @staticmethod
1477     def generate_kdu_instance_name(**kwargs):
1478 0         db_dict = kwargs.get("db_dict")
1479 0         kdu_name = kwargs.get("kdu_name", None)
1480 0         if kdu_name:
1481 0             kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
1482         else:
1483 0             kdu_instance = db_dict["filter"]["_id"]
1484 0         return kdu_instance