3a2d9e31f403a7f2da678c02701166ddebf04e73
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.config import EnvironConfig
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 from .exceptions import MethodNotImplemented
28 from n2vc.libjuju import Libjuju
29 from n2vc.utils import obj_to_dict, obj_to_yaml
30 from n2vc.store import MotorStore
31 from n2vc.vca.cloud import Cloud
32 from n2vc.vca.connection import get_connection
33 from kubernetes.client.models import (
34 V1ClusterRole,
35 V1ObjectMeta,
36 V1PolicyRule,
37 V1ServiceAccount,
38 V1ClusterRoleBinding,
39 V1RoleRef,
40 V1Subject,
41 )
42
43 from typing import Dict
44
# Keys read from the data of the service account secret (see _get_secret_data)
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Attribute looked up in the Juju cloud credential to recover the rbac id (see reset)
RBAC_LABEL_KEY_NAME = "rbac-id"

# Namespace in which the service account is created and its secret is read
ADMIN_NAMESPACE = "kube-system"
# Prefix of the generated RBAC resource names; also used as their label key
RBAC_STACK_PREFIX = "juju-credential"
51
52
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters."""
    random_bytes = os.urandom(4)
    return random_bytes.hex()
55
56
57 class K8sJujuConnector(K8sConnector):
58 libjuju = None
59
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Set up the K8s Juju connector.

    :param fs: file system object, stored as self.fs
    :param db: Database object, forwarded to the parent K8sConnector
    :param kubectl_command: path to kubectl executable (not used in this method)
    :param juju_command: path to juju executable (not used in this method)
    :param log: logger, forwarded to the parent K8sConnector
    :param loop: Asyncio loop; asyncio.get_event_loop() is used when omitted
    :param on_update_db: callback forwarded to the parent K8sConnector
    """

    # parent class
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.loop = loop if loop else asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    # The store URI comes from the environment configuration
    environ_config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
    db_uri = environ_config.get("database_uri")
    self._store = MotorStore(db_uri)
    # Lock used by _get_libjuju while the default Libjuju object is created
    self.loading_libjuju = asyncio.Lock(loop=self.loop)

    self.log.debug("K8S Juju connector initialized")
100
101 """Initialization"""
102
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding are created
    in the cluster, and the service account token is used to register the
    cluster as a k8s cloud in Juju.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju (not used by
        this implementation)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, the created K8s resources are removed and the exception
        is re-raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # The temporary file object is kept referenced until the end of the
    # method so that the kubeconfig stays on disk while kubectl is in use
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubecfg_file:
        kubecfg_file.write(k8s_creds)
    kubectl = Kubectl(config_file=kubecfg.name)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # Every created resource registers the function that removes it, so
    # that a failure in the middle of the process can be rolled back
    cleanup_data = []
    try:
        self._create_cluster_role(
            kubectl,
            name=metadata_name,
            labels=labels,
        )
        cleanup_data.append(
            {
                "delete": self._delete_cluster_role,
                "args": (kubectl, metadata_name),
            }
        )

        self._create_service_account(
            kubectl,
            name=metadata_name,
            labels=labels,
        )
        cleanup_data.append(
            {
                "delete": self._delete_service_account,
                "args": (kubectl, metadata_name),
            }
        )

        self._create_cluster_role_binding(
            kubectl,
            name=metadata_name,
            labels=labels,
        )
        cleanup_data.append(
            {
                "delete": self._delete_cluster_role_binding,
                "args": (kubectl, metadata_name),
            }
        )
        token, client_cert_data = await self._get_secret_data(
            kubectl,
            metadata_name,
        )

        default_storage_class = kubectl.get_default_storage_class()
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e))
        if cleanup_data:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Remove in reverse creation order (binding, account, role), the
            # same order used by reset()
            for item in reversed(cleanup_data):
                try:
                    item["delete"](*item["args"])
                except Exception as cleanup_error:
                    # Best effort: a failing cleanup step must not hide the
                    # error that triggered the cleanup
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        raise
204
205 """Repo Management"""
206
async def repo_add(self, name: str, url: str, _type: str = "charm"):
    """Repositories are not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
214
async def repo_list(self):
    """Repositories are not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
217
async def repo_remove(self, name: str):
    """Repositories are not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
223
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Do nothing and return None, as currently add_repo is not implemented
    """
    return
229
230 """Reset"""
231
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered in Juju for the cluster and, when the
    cloud credential carries an rbac id, the RBAC resources created for it
    in the cluster.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not used by this implementation)
    :param uninstall_sw: Boolean to uninstall sw (not used by this
        implementation)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """

    try:
        self.log.debug("[reset] Removing k8s cloud")
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))

        credential_name = self._get_credential_name(cluster_uuid)
        cloud = Cloud(cluster_uuid, credential_name)

        # The credentials are read before the cloud is removed
        cloud_creds = await libjuju.get_cloud_credentials(cloud)

        await libjuju.remove_cloud(cluster_uuid)

        kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)

        kubecfg_file = tempfile.NamedTemporaryFile()
        with open(kubecfg_file.name, "w") as f:
            f.write(kubecfg)
        kubectl = Kubectl(config_file=kubecfg_file.name)

        credential_attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME not in credential_attrs:
            # Nothing was recorded for this credential: no RBAC cleanup
            return True

        rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
        metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
        for delete_func in (
            self._delete_cluster_role_binding,
            self._delete_service_account,
            self._delete_cluster_role,
        ):
            # Each removal is best effort; a failure is only logged
            try:
                delete_func(kubectl, metadata_name)
            except Exception as e:
                self.log.warning("Cannot remove resource in K8s {}".format(e))

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise e
    return True
290
291 """Deployment"""
292
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    A Juju model named after kdu_instance is added to the cluster's cloud
    and the bundle is deployed into it.

    The bundle can be referenced as:
        - a charm store reference ("cs:...")
        - a URL ("http...")
        - a local path; in that case the process working directory is
          temporarily changed to the directory that contains it

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: Passed to libjuju.deploy as its wait flag
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
        (not used by this implementation)
    :param db_dict: Dictionary whose "filter" entry is passed to on_update_db;
        it must be set
    :param kdu_name: Name of the KDU instance to be installed (not used by
        this implementation)
    :param namespace: K8s namespace to use for the KDU instance (not used by
        this implementation)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful
    :raises K8sException: if db_dict or kdu_model is not set
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember the working directory BEFORE it is possibly changed below,
    # so that it can really be restored afterwards
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        previous_workdir = "/app/storage"

    try:
        if not bundle.startswith(("cs:", "http")):
            # Local bundle: move to the directory that contains it.
            # dirname() is used because str.strip() removes a set of
            # characters from both ends, not a suffix.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(kdu_instance, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory whether the deployment worked or not
        os.chdir(previous_workdir)

    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )
    return True
377
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
) -> bool:
    """Scale an application in a model

    :param: kdu_instance str: KDU instance name (used as the model name)
    :param: scale int: Scale to which to set this application
    :param: resource_name str: Resource name (Application name)
    :param: total_timeout float: Timeout, in seconds, passed to
        libjuju.scale_application
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: If successful, returns True
    :raises K8sException: on any error while scaling
    """

    vca_id = kwargs.get("vca_id")
    try:
        libjuju = await self._get_libjuju(vca_id)
        scale_args = {
            "model_name": kdu_instance,
            "application_name": resource_name,
            "scale": scale,
            "total_timeout": total_timeout,
        }
        await libjuju.scale_application(**scale_args)
    except Exception as e:
        error_msg = "Error scaling application {} in kdu instance {}: {}".format(
            resource_name, kdu_instance, e
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
    return True
413
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
) -> int:
    """Get an application scale count

    :param: resource_name str: Resource name (Application name)
    :param: kdu_instance str: KDU instance name (used as the model name)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID
    :return: Number of units of the application in the model status
    :raises K8sException: on any error while reading the status
    """
    try:
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))
        model_status = await libjuju.get_model_status(kdu_instance)
        application = model_status.applications[resource_name]
        return len(application.units)
    except Exception as e:
        error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
            resource_name, kdu_instance, e
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
437
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Placeholder for listing the deployed releases of a cluster

    :param cluster_uuid: the cluster (not used)
    :return: always an empty list
    """
    releases = []
    return releases
446
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually

    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
479
480 """Rollback"""
481
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
499
500 """Deletion"""
501
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    **kwargs,
) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique name of the KDU instance
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful, or raises an exception
    """

    self.log.debug("[uninstall] Destroying model")
    vca_id = kwargs.get("vca_id")
    libjuju = await self._get_libjuju(vca_id)

    # The model is given up to one hour to be destroyed
    await libjuju.destroy_model(kdu_instance, total_timeout=3600)

    return True
531
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used by this
        implementation)
    :param params: Dictionary of all the parameters needed for the action;
        it must contain "application-name"
    :param db_dict: Dictionary whose "filter" entry is passed to
        on_update_db; when empty, the database update is skipped
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if application-name is missing, the primitive is
        unknown, or the action does not end as "completed"
    """
    vca_id = kwargs.get("vca_id")
    libjuju = await self._get_libjuju(vca_id)

    if not params or "application-name" not in params:
        raise K8sException(
            "Missing application-name argument, argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        # Without a filter there is nothing to update; do not turn an action
        # that already succeeded into an error
        if self.on_update_db and db_dict:
            await self.on_update_db(
                cluster_uuid,
                kdu_instance,
                filter=db_dict["filter"],
                # Same arguments as the call made by install()
                vca_id=vca_id,
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
588
589 """Introspection"""
590
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its "applications" section.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: If successful, returns the "applications" dictionary of the
        bundle.
    :raises K8sException: if the file does not exist
    """
    if not os.path.exists(kdu_model):
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        bundle = yaml.safe_load(bundle_file.read())

    # Example of a loaded bundle:
    # {
    #     'description': 'Test bundle',
    #     'bundle': 'kubernetes',
    #     'applications': {
    #         'mariadb-k8s': {
    #             'charm': 'cs:~charmed-osm/mariadb-k8s-20',
    #             'scale': 1,
    #             'options': {
    #                 'password': 'manopw',
    #                 'root_password': 'osm4u',
    #                 'user': 'mano'
    #             },
    #             'series': 'kubernetes'
    #         }
    #     }
    # }
    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
635
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks for a README file next to the bundle, i.e. in the directory that
    contains kdu_model. The names are tried in this order: README,
    README.txt, README.md.

    :param kdu_model str: The name or path of a bundle

    :return: The contents of the first README found, or None if there is
        none.
    """
    candidates = ("README", "README.txt", "README.md")
    # dirname() is "" for a bare file name: fall back to the current directory
    path = os.path.dirname(kdu_model) or "."
    for candidate in candidates:
        # The file must be opened relative to the bundle directory, not to
        # the process working directory
        readme_path = os.path.join(path, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, "r") as f:
                return f.read()

    return None
659
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    complete_status: bool = False,
    yaml_format: bool = False,
    **kwargs,
) -> dict:
    """Get the status of the KDU

    Reads the status of the Juju model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not used)
    :param kdu_instance str: The unique id of the KDU instance
    :param complete_status: To get the complete model status instead of the
        per-application summary
    :param yaml_format: With complete_status, convert the model status with
        obj_to_yaml instead of obj_to_dict
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Without complete_status, a dictionary
        {application name: {"status": <application status>}}; otherwise the
        converted complete model status
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    model_status = await libjuju.get_model_status(kdu_instance)

    if complete_status:
        convert = obj_to_yaml if yaml_format else obj_to_dict
        return convert(model_status)

    applications = model_status.applications
    return {
        name: {"status": applications[name]["status"]["status"]}
        for name in applications
    }
698
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
    """
    Add all configs, actions, executed actions of all applications in a model to vcastatus dict

    The dictionary is updated in place. Errors raised while updating are
    logged and not propagated.

    :param vcastatus dict: dict containing vcastatus
    :param kdu_instance str: The unique id of the KDU instance
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: None
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    try:
        for model_name in vcastatus:
            model_entry = vcastatus[model_name]
            # Adding executed actions
            model_entry["executedActions"] = await libjuju.get_executed_actions(
                kdu_instance
            )

            applications = model_entry["applications"]
            for application in applications:
                application_entry = applications[application]
                # Adding application actions
                application_entry["actions"] = await libjuju.get_actions(
                    application, kdu_instance
                )
                # Adding application configs
                application_entry["configs"] = await libjuju.get_application_configs(
                    kdu_instance, application
                )

    except Exception as e:
        self.log.debug("Error in updating vca status: {}".format(str(e)))
727
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are selected by namespace, using kdu_instance as the
    namespace name; the namespace argument is not used.
    """

    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)

    # Kubectl reads the kubeconfig from a file
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubecfg_file:
        kubecfg_file.write(kubeconfig)
    kubectl = Kubectl(config_file=kubecfg.name)

    selector = "metadata.namespace={}".format(kdu_instance)
    return kubectl.get_services(field_selector=selector)
743
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    The first service matching both the name and the namespace is returned.
    """

    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)

    # Kubectl reads the kubeconfig from a file
    kubecfg = tempfile.NamedTemporaryFile()
    with open(kubecfg.name, "w") as kubecfg_file:
        kubecfg_file.write(kubeconfig)
    kubectl = Kubectl(config_file=kubecfg.name)

    selector = "metadata.name={},metadata.namespace={}".format(
        service_name, namespace
    )
    services = kubectl.get_services(field_selector=selector)
    return services[0]
761
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts
    the "password" and "secret" fields of its credentials and returns the
    credentials dumped as YAML.
    """
    query = {"_id": cluster_uuid}
    k8scluster = self.db.get_one("k8sclusters", q_filter=query, fail_on_empty=False)

    credentials = k8scluster.get("credentials")
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(k8scluster.get("credentials"))
779
780 def _get_credential_name(self, cluster_uuid: str) -> str:
781 """
782 Get credential name for a k8s cloud
783
784 We cannot use the cluster_uuid for the credential name directly,
785 because it cannot start with a number, it must start with a letter.
786 Therefore, the k8s cloud credential name will be "cred-" followed
787 by the cluster uuid.
788
789 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
790
791 :return: Name to use for the credential name.
792 """
793 return "cred-{}".format(cluster_uuid)
794
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID

    Placeholder: the body is empty, so None is returned.

    :param cluster_uuid str: The UUID of the cluster
    :returns: None
    """
    return None
806
def _create_cluster_role(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """Create a cluster role whose rules allow every verb on everything.

    :param kubectl: Kubectl object providing the RBAC client
    :param name: metadata name of the cluster role
    :param labels: labels to set on the cluster role
    :raises Exception: if a cluster role with that name already exists
    """
    rbac_api = kubectl.clients[RBAC_CLIENT]

    existing = rbac_api.list_cluster_role(
        field_selector="metadata.name={}".format(name)
    )
    if existing.items:
        raise Exception(
            "Cluster role with metadata.name={} already exists".format(name)
        )

    # One rule for API resources and one for non-resource URLs
    rules = [
        V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
        V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
    ]
    metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
    cluster_role = V1ClusterRole(metadata=metadata, rules=rules)

    kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
833
def _delete_cluster_role(self, kubectl: Kubectl, name: str):
    """Delete the cluster role called name through the RBAC client."""
    rbac_api = kubectl.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
836
def _create_service_account(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """Create a service account in the admin namespace.

    :param kubectl: Kubectl object providing the core client
    :param name: metadata name of the service account
    :param labels: labels to set on the service account
    :raises Exception: if a service account with that name already exists
    """
    core_api = kubectl.clients[CORE_CLIENT]

    existing = core_api.list_namespaced_service_account(
        ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
    )
    if existing.items:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    service_account = V1ServiceAccount(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
    )

    kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
        ADMIN_NAMESPACE, service_account
    )
857
def _delete_service_account(self, kubectl: Kubectl, name: str):
    """Delete the service account called name from the admin namespace."""
    core_api = kubectl.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, ADMIN_NAMESPACE)
862
def _create_cluster_role_binding(
    self,
    kubectl: Kubectl,
    name: str,
    labels: Dict[str, str],
):
    """Bind the cluster role called name to the service account called name.

    :param kubectl: Kubectl object providing the RBAC client
    :param name: metadata name shared by the binding, the cluster role and
        the service account
    :param labels: labels to set on the binding
    :raises Exception: if a cluster role binding with that name already exists
    """
    rbac_api = kubectl.clients[RBAC_CLIENT]

    existing = rbac_api.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    )
    if existing.items:
        raise Exception("Generated rbac id already exists")

    metadata = V1ObjectMeta(name=name, labels=labels)
    role_ref = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    # The subject is the service account living in the admin namespace
    subject = V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
    role_binding = V1ClusterRoleBinding(
        metadata=metadata,
        role_ref=role_ref,
        subjects=[subject],
    )
    kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
883
def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
    """Delete the cluster role binding called name through the RBAC client."""
    rbac_api = kubectl.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
886
async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
    """Get the token and the root CA certificate of a service account.

    The service account is read from the admin namespace until it
    references a secret, waiting between attempts so that a freshly created
    account has time to get one.

    :param kubectl: Kubectl object providing the core client
    :param name: metadata name of the service account
    :return: tuple (token, CA certificate), both base64-decoded to text
    :raises Exception: if the service account or its secret cannot be found
    """
    v1_core = kubectl.clients[CORE_CLIENT]

    retries_limit = 10
    retry_delay = 1  # seconds to wait between two attempts
    secret_name = None
    for attempt in range(retries_limit):
        service_accounts = v1_core.list_namespaced_service_account(
            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )
        service_account = service_accounts.items[0]
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if secret_name is not None:
            break
        if attempt < retries_limit - 1:
            # Polling again immediately would exhaust the retries at once;
            # wait, and let other coroutines run meanwhile
            await asyncio.sleep(retry_delay)
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )
    secrets = v1_core.list_namespaced_secret(
        ADMIN_NAMESPACE,
        field_selector="metadata.name={}".format(secret_name),
    ).items
    if not secrets:
        raise Exception(
            "Secret {} of service account {} not found".format(secret_name, name)
        )
    secret = secrets[0]

    token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
    client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    return (
        base64.b64decode(token).decode("utf-8"),
        base64.b64decode(client_certificate_data).decode("utf-8"),
    )
922
923 @staticmethod
924 def generate_kdu_instance_name(**kwargs):
925 db_dict = kwargs.get("db_dict")
926 kdu_name = kwargs.get("kdu_name", None)
927 if kdu_name:
928 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
929 else:
930 kdu_instance = db_dict["filter"]["_id"]
931 return kdu_instance
932
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
        If None, get a libjuju object with a Connection to the default VCA;
        that object is created once and cached in self.libjuju
        Else, get a new libjuju object with a Connection to the specified VCA
    """
    if vca_id:
        vca_connection = await get_connection(self._store, vca_id)
        return Libjuju(
            vca_connection,
            loop=self.loop,
            log=self.log,
            n2vc=self,
        )

    if not self.libjuju:
        # Wait on the lock itself instead of polling its state
        async with self.loading_libjuju:
            # Another coroutine may have created the object while this one
            # was waiting for the lock
            if not self.libjuju:
                vca_connection = await get_connection(self._store)
                self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
    return self.libjuju