Feature 10239: Distributed VCA
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.config import EnvironConfig
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 from .exceptions import MethodNotImplemented
28 from n2vc.libjuju import Libjuju
29 from n2vc.utils import obj_to_dict, obj_to_yaml
30 from n2vc.store import MotorStore
31 from n2vc.vca.cloud import Cloud
32 from n2vc.vca.connection import get_connection
33 from kubernetes.client.models import (
34 V1ClusterRole,
35 V1ObjectMeta,
36 V1PolicyRule,
37 V1ServiceAccount,
38 V1ClusterRoleBinding,
39 V1RoleRef,
40 V1Subject,
41 )
42
43 from typing import Dict
44
# Keys read from the service account secret data in _get_secret_data()
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Name of the cloud credential attribute that reset() reads the RBAC id from
RBAC_LABEL_KEY_NAME = "rbac-id"

# Namespace in which the service account is created and looked up
ADMIN_NAMESPACE = "kube-system"
# Prefix of the RBAC resource names; init_env() also uses it as the label key
RBAC_STACK_PREFIX = "juju-credential"
51
52
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    The identifier is the hexadecimal representation of 4 bytes obtained
    from ``os.urandom``.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
55
56
57 class K8sJujuConnector(K8sConnector):
58 libjuju = None
59
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Initialize the K8s Juju connector

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not used by this
        constructor)
    :param juju_command: path to juju executable (not used by this
        constructor)
    :param log: logger
    :param loop: Asyncio loop. If not given, asyncio.get_event_loop() is used
    :param on_update_db: callback forwarded to the parent K8sConnector
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    # The store is built from the "database_uri" environment configuration
    db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
    self._store = MotorStore(db_uri)

    # Lock used by _get_libjuju() while the default Libjuju object is loaded.
    # The "loop" argument was removed from asyncio primitives in Python 3.10,
    # where passing it raises TypeError; older interpreters still get the
    # explicit loop binding.
    try:
        self.loading_libjuju = asyncio.Lock(loop=self.loop)
    except TypeError:
        self.loading_libjuju = asyncio.Lock()

    self.log.debug("K8S Juju connector initialized")
100
101 """Initialization"""
102
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding are created
    in the cluster, and the token/certificate of the service account are used
    to register the cluster as a k8s cloud in Juju. If any step fails, the
    resources created so far are removed and the original error is re-raised.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (currently not used by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # The temporary file object is kept referenced for the whole method so
    # that the kubeconfig file exists while kubectl is being used
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubecfg_file:
        kubecfg_file.write(k8s_creds)
    kubectl = Kubectl(config_file=kubecfg.name)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # (create, delete) pairs, in creation order. Each delete function is
    # registered for cleanup only after its create function has succeeded.
    rbac_steps = (
        (self._create_cluster_role, self._delete_cluster_role),
        (self._create_service_account, self._delete_service_account),
        (self._create_cluster_role_binding, self._delete_cluster_role_binding),
    )
    cleanup_functions = []
    try:
        for create_function, delete_function in rbac_steps:
            create_function(
                kubectl,
                name=metadata_name,
                labels=labels,
            )
            cleanup_functions.append(delete_function)

        token, client_cert_data = await self._get_secret_data(
            kubectl,
            metadata_name,
        )

        default_storage_class = kubectl.get_default_storage_class()
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e))
        if cleanup_functions:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Remove in reverse creation order; a failing deletion is only
            # logged so that it cannot hide the error that triggered cleanup
            for delete_function in reversed(cleanup_functions):
                try:
                    delete_function(kubectl, metadata_name)
                except Exception as cleanup_error:
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        raise
204
205 """Repo Management"""
206
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
):
    """Add a repository (not supported by this connector)

    :param name: repository name
    :param url: repository url
    :param _type: repository type

    :raises MethodNotImplemented: always
    """
    not_implemented = MethodNotImplemented()
    raise not_implemented
214
async def repo_list(self):
    """List the repositories (not supported by this connector)

    :raises MethodNotImplemented: always
    """
    not_implemented = MethodNotImplemented()
    raise not_implemented
217
async def repo_remove(
    self,
    name: str,
):
    """Remove a repository (not supported by this connector)

    :param name: repository name

    :raises MethodNotImplemented: always
    """
    not_implemented = MethodNotImplemented()
    raise not_implemented
223
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Does nothing, as repositories cannot be added with this connector

    :param cluster_uuid: cluster uuid (not used)
    :param name: repository name (not used)

    :return: always None
    """
    synchronized = None
    return synchronized
229
230 """Reset"""
231
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered in Juju for the cluster and then deletes
    the cluster role binding, service account and cluster role whose name is
    derived from the RBAC id stored in the cloud credential.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not used by this method)
    :param uninstall_sw: Boolean to uninstall sw (not used by this method)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """
    try:
        self.log.debug("[reset] Removing k8s cloud")
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))

        credential_name = self._get_credential_name(cluster_uuid)
        cloud = Cloud(cluster_uuid, credential_name)

        # The credentials are read before the cloud is removed
        cloud_creds = await libjuju.get_cloud_credentials(cloud)
        await libjuju.remove_cloud(cluster_uuid)

        kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
        kubecfg_file = tempfile.NamedTemporaryFile()
        with open(kubecfg_file.name, "w") as kubeconfig_writer:
            kubeconfig_writer.write(kubeconfig)
        kubectl = Kubectl(config_file=kubecfg_file.name)

        # Deletion order: binding first, then the account and the role
        rbac_deleters = (
            self._delete_cluster_role_binding,
            self._delete_service_account,
            self._delete_cluster_role,
        )

        credential_attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME in credential_attrs:
            resource_name = "{}-{}".format(
                RBAC_STACK_PREFIX, credential_attrs[RBAC_LABEL_KEY_NAME]
            )
            for rbac_deleter in rbac_deleters:
                # A failing deletion is logged and the next one is attempted
                try:
                    rbac_deleter(kubectl, resource_name)
                except Exception as e:
                    self.log.warning("Cannot remove resource in K8s {}".format(e))

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise e
    return True
290
291 """Deployment"""
292
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    A Juju model named after the KDU instance is added to the cloud of the
    cluster, and the bundle is deployed in it.

    The bundle that models the KDU can be given in any of the following ways:
        - <juju-repo>/<juju-bundle>
        - <juju-bundle folder under k8s_models folder in the package>
        - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
            in the package>
        - <URL_where_to_fetch_juju_bundle>

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: Passed as "wait" to the deployment
    :param timeout int: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
                        (not used yet)
    :param db_dict dict: Mandatory. Its "filter" entry is passed to the
                         on_update_db callback
    :param kdu_name: Name of the KDU instance to be installed (not used)
    :param namespace: K8s namespace to use for the KDU instance (not used)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful
    :raises K8sException: if db_dict or kdu_model are not set
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # The working directory must be recorded BEFORE it is changed, otherwise
    # there is nothing to go back to
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        previous_workdir = "/app/storage"

    try:
        if bundle.startswith(("cs:", "http")):
            # Charm store reference or URL: used as it is
            pass
        else:
            # Local bundle: move to the folder that contains it.
            # os.path.dirname is used because str.strip() removes a set of
            # characters from both ends, not a suffix.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(kdu_instance, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory also when the deployment fails
        os.chdir(previous_workdir)

    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )
    return True
377
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns the list of deployed releases in a cluster.
    This connector always reports an empty list.

    :param cluster_uuid: the cluster (not used)
    :return: a new empty list
    """
    deployed_releases = list()
    return deployed_releases
386
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, we are only supporting a limited subset of upgrades. We'll
    upgrade the charm revision but leave storage and scale untouched.

    Scale changes should happen through OSM constructs, and changes to
    storage would require a redeployment of the service, at least in this
    initial release.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: Intended to be a reference to the new revision number of the
             KDU instance; nothing is returned for now.
    :raises MethodNotImplemented: always
    """
    # TODO: Loop through the bundle and upgrade each charm individually
    not_implemented = MethodNotImplemented()
    raise not_implemented
419
420 """Rollback"""
421
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
                         the previous upgrade.

    :return: Intended to be the revision of the active KDU instance;
             nothing is returned for now.
    :raises MethodNotImplemented: always
    """
    not_implemented = MethodNotImplemented()
    raise not_implemented
439
440 """Deletion"""
441
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    **kwargs,
) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique name of the KDU instance
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful, or raises an exception
    """
    self.log.debug("[uninstall] Destroying model")

    vca_id = kwargs.get("vca_id")
    libjuju = await self._get_libjuju(vca_id)
    # The model destruction is given a total timeout of 3600
    await libjuju.destroy_model(kdu_instance, total_timeout=3600)

    return True
470 # await self.login(cluster_uuid)
471
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used by this method)
    :param params: Dictionary of all the parameters needed for the action.
                   It must contain the "application-name" key
    :param db_dict: Dictionary for any additional data. Its "filter" entry
                    is passed to the on_update_db callback
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, the primitive
        does not exist, the action does not complete, or any other error
        happens while executing it
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    if not params or "application-name" not in params:
        # Adjacent literals instead of a backslash continuation, which
        # embedded the indentation of the next line in the message
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        if self.on_update_db:
            # vca_id is forwarded in the same way install() does
            await self.on_update_db(
                cluster_uuid,
                kdu_instance,
                filter=db_dict["filter"],
                vca_id=kwargs.get("vca_id"),
            )

        return output

    except Exception as e:
        # Every failure (including the K8sException raised above) is
        # reported as a K8sException that names the primitive
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
528
529 """Introspection"""
530
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle file and returns its "applications" section.

    Example of a loaded bundle:
        {
            'description': 'Test bundle',
            'bundle': 'kubernetes',
            'applications': {
                'mariadb-k8s': {
                    'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                    'scale': 1,
                    'options': {
                        'password': 'manopw',
                        'root_password': 'osm4u',
                        'user': 'mano'
                    },
                    'series': 'kubernetes'
                }
            }
        }

    :param kdu_model str: The path of the bundle to inspect.

    :return: The content of the "applications" key of the bundle.
    :raises K8sException: if the bundle file does not exist
    """
    if os.path.exists(kdu_model):
        with open(kdu_model, "r") as bundle_file:
            bundle = yaml.safe_load(bundle_file.read())

        # TODO: This should be returned in an agreed-upon format
        return bundle["applications"]

    raise K8sException("file {} not found".format(kdu_model))
575
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked up
    in the folder that contains the bundle, trying the names "README",
    "README.txt" and "README.md", in that order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README. Otherwise, None.
    """
    candidates = ("README", "README.txt", "README.md")
    # A bundle given without a folder is located in the current directory
    bundle_dir = os.path.dirname(kdu_model) or "."
    available = set(os.listdir(bundle_dir))

    for candidate in candidates:
        if candidate in available:
            # The README must be opened inside the bundle folder, not
            # relative to the current working directory
            with open(os.path.join(bundle_dir, candidate), "r") as f:
                return f.read()

    return None
599
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    complete_status: bool = False,
    yaml_format: bool = False,
    **kwargs,
) -> dict:
    """Get the status of the KDU

    Get the current status of the Juju model of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique id of the KDU instance
    :param complete_status: To get the complete status of the model
    :param yaml_format: With complete_status, the model status is converted
                        with obj_to_yaml instead of obj_to_dict
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: If complete_status is False, a dictionary of the form
             {<application name>: {"status": <application status>}}.
             Otherwise, the converted complete model status.
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    model_status = await libjuju.get_model_status(kdu_instance)

    if complete_status:
        serialize = obj_to_yaml if yaml_format else obj_to_dict
        return serialize(model_status)

    applications = model_status.applications
    return {
        name: {"status": applications[name]["status"]["status"]}
        for name in applications
    }
638
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
    """
    Add all configs, actions, executed actions of all applications in a model
    to the vcastatus dict. The dict is updated in place.

    Errors are logged at debug level and are not propagated.

    :param vcastatus dict: dict containing vcastatus
    :param kdu_instance str: The unique id of the KDU instance
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: None
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    try:
        for model_name in vcastatus:
            model_entry = vcastatus[model_name]
            # Adding executed actions
            executed_actions = await libjuju.get_executed_actions(kdu_instance)
            model_entry["executedActions"] = executed_actions

            applications = model_entry["applications"]
            for application in applications:
                app_entry = applications[application]
                # Adding application actions
                app_actions = await libjuju.get_actions(application, kdu_instance)
                app_entry["actions"] = app_actions
                # Adding application configs
                app_configs = await libjuju.get_application_configs(
                    kdu_instance, application
                )
                app_entry["configs"] = app_configs

    except Exception as e:
        self.log.debug("Error in updating vca status: {}".format(str(e)))
667
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are selected by namespace, using the kdu_instance as the
    namespace name. The namespace argument is not used.
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)

    # kubectl is configured from a temporary copy of the cluster kubeconfig
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubeconfig_writer:
        kubeconfig_writer.write(kubeconfig)
    kubectl = Kubectl(config_file=kubecfg.name)

    namespace_selector = "metadata.namespace={}".format(kdu_instance)
    return kubectl.get_services(field_selector=namespace_selector)
683
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    The first service matching both the name and the namespace is returned.
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)

    # kubectl is configured from a temporary copy of the cluster kubeconfig
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubeconfig_writer:
        kubeconfig_writer.write(kubeconfig)
    kubectl = Kubectl(config_file=kubecfg.name)

    service_selector = "metadata.name={},metadata.namespace={}".format(
        service_name, namespace
    )
    matching_services = kubectl.get_services(field_selector=service_selector)
    return matching_services[0]
701
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster from the "k8sclusters" collection, decrypts the
    "password" and "secret" fields of its credentials and dumps the
    credentials as YAML.

    :param cluster_uuid: _id of the cluster in the database

    :return: Credentials of the cluster in YAML format
    :raises K8sException: if the cluster is not found in the database
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # With fail_on_empty=False a missing cluster does not raise in the
    # database layer, so it has to be reported here
    if not k8scluster:
        raise K8sException(
            "K8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    # NOTE(review): presumably the fields are decrypted in place, as the
    # return value is not used; verify against the db implementation
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
719
720 def _get_credential_name(self, cluster_uuid: str) -> str:
721 """
722 Get credential name for a k8s cloud
723
724 We cannot use the cluster_uuid for the credential name directly,
725 because it cannot start with a number, it must start with a letter.
726 Therefore, the k8s cloud credential name will be "cred-" followed
727 by the cluster uuid.
728
729 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
730
731 :return: Name to use for the credential name.
732 """
733 return "cred-{}".format(cluster_uuid)
734
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID
    Intended to get the namespace's unique name. It is a placeholder for
    now: nothing is looked up and None is returned.

    :param cluster_uuid str: The UUID of the cluster
    :returns: None
    """
    return None
746
def _create_cluster_role(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Create a cluster role with rules for all verbs on all resources of all
    API groups, and all verbs on all non-resource URLs

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the cluster role
    :param labels: labels of the cluster role

    :raises Exception: if a cluster role with that name already exists
    """
    rbac_client = kubectl.clients[RBAC_CLIENT]

    existing_roles = rbac_client.list_cluster_role(
        field_selector="metadata.name={}".format(name)
    )
    if len(existing_roles.items) != 0:
        raise Exception(
            "Cluster role with metadata.name={} already exists".format(name)
        )

    all_resources_rule = V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"])
    all_urls_rule = V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"])
    cluster_role = V1ClusterRole(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE),
        rules=[all_resources_rule, all_urls_rule],
    )

    rbac_client.create_cluster_role(cluster_role)
773
def _delete_cluster_role(self, kubectl: Kubectl, name: str):
    """
    Delete the cluster role with the given name

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the cluster role
    """
    rbac_client = kubectl.clients[RBAC_CLIENT]
    rbac_client.delete_cluster_role(name)
776
def _create_service_account(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Create a service account in the admin namespace

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the service account
    :param labels: labels of the service account

    :raises Exception: if a service account with that name already exists
    """
    core_client = kubectl.clients[CORE_CLIENT]

    existing_accounts = core_client.list_namespaced_service_account(
        ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
    )
    if len(existing_accounts.items) != 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    service_account = V1ServiceAccount(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
    )
    core_client.create_namespaced_service_account(ADMIN_NAMESPACE, service_account)
797
def _delete_service_account(self, kubectl: Kubectl, name: str):
    """
    Delete the service account with the given name from the admin namespace

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the service account
    """
    core_client = kubectl.clients[CORE_CLIENT]
    core_client.delete_namespaced_service_account(name, ADMIN_NAMESPACE)
802
def _create_cluster_role_binding(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """
    Create a cluster role binding between the cluster role and the service
    account (in the admin namespace) that share the given name

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the binding, the cluster role and the service account
    :param labels: labels of the cluster role binding

    :raises Exception: if a cluster role binding with that name already exists
    """
    rbac_client = kubectl.clients[RBAC_CLIENT]

    existing_bindings = rbac_client.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    )
    if len(existing_bindings.items) != 0:
        raise Exception("Generated rbac id already exists")

    role_reference = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    account_subject = V1Subject(
        kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE
    )
    role_binding = V1ClusterRoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=role_reference,
        subjects=[account_subject],
    )
    rbac_client.create_cluster_role_binding(role_binding)
823
def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
    """
    Delete the cluster role binding with the given name

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the cluster role binding
    """
    rbac_client = kubectl.clients[RBAC_CLIENT]
    rbac_client.delete_cluster_role_binding(name)
826
async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
    """
    Get the token and the CA certificate of a service account

    The service account is read up to 10 times, waiting between attempts,
    until it references a secret. The token and the certificate are then
    read from that secret and base64-decoded.

    :param kubectl: Kubectl object used to access the cluster
    :param name: name of the service account (in the admin namespace)

    :return: Tuple with the token and the certificate data, both as text
    :raises Exception: if the service account does not exist, it never gets
        a secret, or the secret cannot be found
    """
    v1_core = kubectl.clients[CORE_CLIENT]

    retries_limit = 10
    # Seconds between attempts. Without a pause all the retries are spent
    # at once, giving the cluster no time to attach the secret.
    retry_delay = 1
    secret_name = None
    for attempt in range(retries_limit):
        service_accounts = v1_core.list_namespaced_service_account(
            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
        )
        if len(service_accounts.items) == 0:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )
        service_account = service_accounts.items[0]
        if service_account.secrets and len(service_account.secrets) > 0:
            secret_name = service_account.secrets[0].name
        if secret_name is not None:
            break
        if attempt < retries_limit - 1:
            await asyncio.sleep(retry_delay)
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )

    secrets = v1_core.list_namespaced_secret(
        ADMIN_NAMESPACE,
        field_selector="metadata.name={}".format(secret_name),
    ).items
    if not secrets:
        raise Exception(
            "Secret {} of service account {} not found".format(secret_name, name)
        )
    secret = secrets[0]

    token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
    client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    return (
        base64.b64decode(token).decode("utf-8"),
        base64.b64decode(client_certificate_data).decode("utf-8"),
    )
862
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate the name of a KDU instance

    :param kwargs:
        db_dict (dict): its ["filter"]["_id"] value is used in the name
        kdu_name (str): optional KDU name

    :return: "<kdu_name>-<_id>" if a kdu_name is given, or just the _id
    """
    db_dict = kwargs.get("db_dict")
    kdu_name = kwargs.get("kdu_name", None)
    if not kdu_name:
        return db_dict["filter"]["_id"]
    return "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
872
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
                    If None, get a libjuju object with a Connection to the default VCA
                    Else, get a libjuju object with a Connection to the specified VCA

    :return: For the default VCA, the Libjuju object stored in self.libjuju,
             which is created the first time it is needed. For a specific
             VCA, a new Libjuju object on every call.
    """
    if vca_id:
        specific_connection = await get_connection(self._store, vca_id)
        return Libjuju(
            specific_connection,
            loop=self.loop,
            log=self.log,
            n2vc=self,
        )

    # Wait while another call is loading the default libjuju object
    while self.loading_libjuju.locked():
        await asyncio.sleep(0.1)
    if not self.libjuju:
        async with self.loading_libjuju:
            default_connection = await get_connection(self._store)
            self.libjuju = Libjuju(default_connection, loop=self.loop, log=self.log)
    return self.libjuju