24b314289756c95cdc3ff84ce7d521cc3c23511e
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.config import EnvironConfig
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 from .exceptions import MethodNotImplemented
28 from n2vc.libjuju import Libjuju
29 from n2vc.utils import obj_to_dict, obj_to_yaml
30 from n2vc.store import MotorStore
31 from n2vc.vca.cloud import Cloud
32 from n2vc.vca.connection import get_connection
33 from kubernetes.client.models import (
34 V1ClusterRole,
35 V1ObjectMeta,
36 V1PolicyRule,
37 V1ServiceAccount,
38 V1ClusterRoleBinding,
39 V1RoleRef,
40 V1Subject,
41 )
42
43 from typing import Dict
44
45 SERVICE_ACCOUNT_TOKEN_KEY = "token"
46 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
47 RBAC_LABEL_KEY_NAME = "rbac-id"
48
49 ADMIN_NAMESPACE = "kube-system"
50 RBAC_STACK_PREFIX = "juju-credential"
51
52
53 def generate_rbac_id():
54 return binascii.hexlify(os.urandom(4)).decode()
55
56
57 class K8sJujuConnector(K8sConnector):
58 libjuju = None
59
60 def __init__(
61 self,
62 fs: object,
63 db: object,
64 kubectl_command: str = "/usr/bin/kubectl",
65 juju_command: str = "/usr/bin/juju",
66 log: object = None,
67 loop: object = None,
68 on_update_db=None,
69 ):
70 """
71 :param fs: file system for kubernetes and helm configuration
72 :param db: Database object
73 :param kubectl_command: path to kubectl executable
74 :param helm_command: path to helm executable
75 :param log: logger
76 :param: loop: Asyncio loop
77 """
78
79 # parent class
80 K8sConnector.__init__(
81 self,
82 db,
83 log=log,
84 on_update_db=on_update_db,
85 )
86
87 self.fs = fs
88 self.loop = loop or asyncio.get_event_loop()
89 self.log.debug("Initializing K8S Juju connector")
90
91 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
92 self._store = MotorStore(db_uri)
93 self.loading_libjuju = asyncio.Lock(loop=self.loop)
94
95 self.log.debug("K8S Juju connector initialized")
96 # TODO: Remove these commented lines:
97 # self.authenticated = False
98 # self.models = {}
99 # self.juju_secret = ""
100
101 """Initialization"""
102
103 async def init_env(
104 self,
105 k8s_creds: str,
106 namespace: str = "kube-system",
107 reuse_cluster_uuid: str = None,
108 **kwargs,
109 ) -> (str, bool):
110 """
111 It prepares a given K8s cluster environment to run Juju bundles.
112
113 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 '.kube/config'
115 :param namespace: optional namespace to be used for juju. By default,
116 'kube-system' will be used
117 :param reuse_cluster_uuid: existing cluster uuid for reuse
118 :param: kwargs: Additional parameters
119 vca_id (str): VCA ID
120
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
124 """
125 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
126
127 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
128
129 kubecfg = tempfile.NamedTemporaryFile()
130 with open(kubecfg.name, "w") as kubecfg_file:
131 kubecfg_file.write(k8s_creds)
132 kubectl = Kubectl(config_file=kubecfg.name)
133
134 # CREATING RESOURCES IN K8S
135 rbac_id = generate_rbac_id()
136 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
137 labels = {RBAC_STACK_PREFIX: rbac_id}
138
139 # Create cleanup dictionary to clean up created resources
140 # if it fails in the middle of the process
141 cleanup_data = []
142 try:
143 self._create_cluster_role(
144 kubectl,
145 name=metadata_name,
146 labels=labels,
147 )
148 cleanup_data.append(
149 {
150 "delete": self._delete_cluster_role,
151 "args": (kubectl, metadata_name),
152 }
153 )
154
155 self._create_service_account(
156 kubectl,
157 name=metadata_name,
158 labels=labels,
159 )
160 cleanup_data.append(
161 {
162 "delete": self._delete_service_account,
163 "args": (kubectl, metadata_name),
164 }
165 )
166
167 self._create_cluster_role_binding(
168 kubectl,
169 name=metadata_name,
170 labels=labels,
171 )
172 cleanup_data.append(
173 {
174 "delete": self._delete_service_account,
175 "args": (kubectl, metadata_name),
176 }
177 )
178 token, client_cert_data = await self._get_secret_data(
179 kubectl,
180 metadata_name,
181 )
182
183 default_storage_class = kubectl.get_default_storage_class()
184 await libjuju.add_k8s(
185 name=cluster_uuid,
186 rbac_id=rbac_id,
187 token=token,
188 client_cert_data=client_cert_data,
189 configuration=kubectl.configuration,
190 storage_class=default_storage_class,
191 credential_name=self._get_credential_name(cluster_uuid),
192 )
193 return cluster_uuid, True
194 except Exception as e:
195 self.log.error("Error initializing k8scluster: {}".format(e))
196 if len(cleanup_data) > 0:
197 self.log.debug("Cleaning up created resources in k8s cluster...")
198 for item in cleanup_data:
199 delete_function = item["delete"]
200 delete_args = item["args"]
201 delete_function(*delete_args)
202 self.log.debug("Cleanup finished")
203 raise e
204
205 """Repo Management"""
206
207 async def repo_add(
208 self,
209 name: str,
210 url: str,
211 _type: str = "charm",
212 ):
213 raise MethodNotImplemented()
214
215 async def repo_list(self):
216 raise MethodNotImplemented()
217
218 async def repo_remove(
219 self,
220 name: str,
221 ):
222 raise MethodNotImplemented()
223
224 async def synchronize_repos(self, cluster_uuid: str, name: str):
225 """
226 Returns None as currently add_repo is not implemented
227 """
228 return None
229
230 """Reset"""
231
232 async def reset(
233 self,
234 cluster_uuid: str,
235 force: bool = False,
236 uninstall_sw: bool = False,
237 **kwargs,
238 ) -> bool:
239 """Reset a cluster
240
241 Resets the Kubernetes cluster by removing the model that represents it.
242
243 :param cluster_uuid str: The UUID of the cluster to reset
244 :param force: Force reset
245 :param uninstall_sw: Boolean to uninstall sw
246 :param: kwargs: Additional parameters
247 vca_id (str): VCA ID
248
249 :return: Returns True if successful or raises an exception.
250 """
251
252 try:
253 self.log.debug("[reset] Removing k8s cloud")
254 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
255
256 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
257
258 cloud_creds = await libjuju.get_cloud_credentials(cloud)
259
260 await libjuju.remove_cloud(cluster_uuid)
261
262 kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
263
264 kubecfg_file = tempfile.NamedTemporaryFile()
265 with open(kubecfg_file.name, "w") as f:
266 f.write(kubecfg)
267 kubectl = Kubectl(config_file=kubecfg_file.name)
268
269 delete_functions = [
270 self._delete_cluster_role_binding,
271 self._delete_service_account,
272 self._delete_cluster_role,
273 ]
274
275 credential_attrs = cloud_creds[0].result["attrs"]
276 if RBAC_LABEL_KEY_NAME in credential_attrs:
277 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
278 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
279 delete_args = (kubectl, metadata_name)
280 for delete_func in delete_functions:
281 try:
282 delete_func(*delete_args)
283 except Exception as e:
284 self.log.warning("Cannot remove resource in K8s {}".format(e))
285
286 except Exception as e:
287 self.log.debug("Caught exception during reset: {}".format(e))
288 raise e
289 return True
290
291 """Deployment"""
292
293 async def install(
294 self,
295 cluster_uuid: str,
296 kdu_model: str,
297 kdu_instance: str,
298 atomic: bool = True,
299 timeout: float = 1800,
300 params: dict = None,
301 db_dict: dict = None,
302 kdu_name: str = None,
303 namespace: str = None,
304 **kwargs,
305 ) -> bool:
306 """Install a bundle
307
308 :param cluster_uuid str: The UUID of the cluster to install to
309 :param kdu_model str: The name or path of a bundle to install
310 :param kdu_instance: Kdu instance name
311 :param atomic bool: If set, waits until the model is active and resets
312 the cluster on failure.
313 :param timeout int: The time, in seconds, to wait for the install
314 to finish
315 :param params dict: Key-value pairs of instantiation parameters
316 :param kdu_name: Name of the KDU instance to be installed
317 :param namespace: K8s namespace to use for the KDU instance
318 :param kwargs: Additional parameters
319 vca_id (str): VCA ID
320
321 :return: If successful, returns ?
322 """
323 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
324 bundle = kdu_model
325
326 if not db_dict:
327 raise K8sException("db_dict must be set")
328 if not bundle:
329 raise K8sException("bundle must be set")
330
331 if bundle.startswith("cs:"):
332 pass
333 elif bundle.startswith("http"):
334 # Download the file
335 pass
336 else:
337 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
338 os.chdir(new_workdir)
339 bundle = "local:{}".format(kdu_model)
340
341 self.log.debug("Checking for model named {}".format(kdu_instance))
342
343 # Create the new model
344 self.log.debug("Adding model: {}".format(kdu_instance))
345 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
346 await libjuju.add_model(kdu_instance, cloud)
347
348 # if model:
349 # TODO: Instantiation parameters
350
351 """
352 "Juju bundle that models the KDU, in any of the following ways:
353 - <juju-repo>/<juju-bundle>
354 - <juju-bundle folder under k8s_models folder in the package>
355 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
356 in the package>
357 - <URL_where_to_fetch_juju_bundle>
358 """
359 try:
360 previous_workdir = os.getcwd()
361 except FileNotFoundError:
362 previous_workdir = "/app/storage"
363
364 self.log.debug("[install] deploying {}".format(bundle))
365 await libjuju.deploy(
366 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
367 )
368 os.chdir(previous_workdir)
369 if self.on_update_db:
370 await self.on_update_db(
371 cluster_uuid,
372 kdu_instance,
373 filter=db_dict["filter"],
374 vca_id=kwargs.get("vca_id"),
375 )
376 return True
377
378 async def scale(
379 self,
380 kdu_instance: str,
381 scale: int,
382 resource_name: str,
383 total_timeout: float = 1800,
384 **kwargs,
385 ) -> bool:
386 """Scale an application in a model
387
388 :param: kdu_instance str: KDU instance name
389 :param: scale int: Scale to which to set this application
390 :param: resource_name str: Resource name (Application name)
391 :param: timeout float: The time, in seconds, to wait for the install
392 to finish
393 :param kwargs: Additional parameters
394 vca_id (str): VCA ID
395
396 :return: If successful, returns True
397 """
398
399 try:
400 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
401 await libjuju.scale_application(
402 model_name=kdu_instance,
403 application_name=resource_name,
404 scale=scale,
405 total_timeout=total_timeout,
406 )
407 except Exception as e:
408 error_msg = "Error scaling application {} in kdu instance {}: {}".format(
409 resource_name, kdu_instance, e
410 )
411 self.log.error(error_msg)
412 raise K8sException(message=error_msg)
413 return True
414
415 async def get_scale_count(
416 self,
417 resource_name: str,
418 kdu_instance: str,
419 **kwargs,
420 ) -> int:
421 """Get an application scale count
422
423 :param: resource_name str: Resource name (Application name)
424 :param: kdu_instance str: KDU instance name
425 :param kwargs: Additional parameters
426 vca_id (str): VCA ID
427 :return: Return application instance count
428 """
429 try:
430 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
431 status = await libjuju.get_model_status(kdu_instance)
432 return len(status.applications[resource_name].units)
433 except Exception as e:
434 error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
435 resource_name, kdu_instance, e
436 )
437 self.log.error(error_msg)
438 raise K8sException(message=error_msg)
439
440 async def instances_list(self, cluster_uuid: str) -> list:
441 """
442 returns a list of deployed releases in a cluster
443
444 :param cluster_uuid: the cluster
445 :return:
446 """
447 return []
448
449 async def upgrade(
450 self,
451 cluster_uuid: str,
452 kdu_instance: str,
453 kdu_model: str = None,
454 params: dict = None,
455 ) -> str:
456 """Upgrade a model
457
458 :param cluster_uuid str: The UUID of the cluster to upgrade
459 :param kdu_instance str: The unique name of the KDU instance
460 :param kdu_model str: The name or path of the bundle to upgrade to
461 :param params dict: Key-value pairs of instantiation parameters
462
463 :return: If successful, reference to the new revision number of the
464 KDU instance.
465 """
466
467 # TODO: Loop through the bundle and upgrade each charm individually
468
469 """
470 The API doesn't have a concept of bundle upgrades, because there are
471 many possible changes: charm revision, disk, number of units, etc.
472
473 As such, we are only supporting a limited subset of upgrades. We'll
474 upgrade the charm revision but leave storage and scale untouched.
475
476 Scale changes should happen through OSM constructs, and changes to
477 storage would require a redeployment of the service, at least in this
478 initial release.
479 """
480 raise MethodNotImplemented()
481
482 """Rollback"""
483
484 async def rollback(
485 self,
486 cluster_uuid: str,
487 kdu_instance: str,
488 revision: int = 0,
489 ) -> str:
490 """Rollback a model
491
492 :param cluster_uuid str: The UUID of the cluster to rollback
493 :param kdu_instance str: The unique name of the KDU instance
494 :param revision int: The revision to revert to. If omitted, rolls back
495 the previous upgrade.
496
497 :return: If successful, returns the revision of active KDU instance,
498 or raises an exception
499 """
500 raise MethodNotImplemented()
501
502 """Deletion"""
503
504 async def uninstall(
505 self,
506 cluster_uuid: str,
507 kdu_instance: str,
508 **kwargs,
509 ) -> bool:
510 """Uninstall a KDU instance
511
512 :param cluster_uuid str: The UUID of the cluster
513 :param kdu_instance str: The unique name of the KDU instance
514 :param kwargs: Additional parameters
515 vca_id (str): VCA ID
516
517 :return: Returns True if successful, or raises an exception
518 """
519
520 self.log.debug("[uninstall] Destroying model")
521 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
522
523 await libjuju.destroy_model(kdu_instance, total_timeout=3600)
524
525 # self.log.debug("[uninstall] Model destroyed and disconnecting")
526 # await controller.disconnect()
527
528 return True
529 # TODO: Remove these commented lines
530 # if not self.authenticated:
531 # self.log.debug("[uninstall] Connecting to controller")
532 # await self.login(cluster_uuid)
533
534 async def exec_primitive(
535 self,
536 cluster_uuid: str = None,
537 kdu_instance: str = None,
538 primitive_name: str = None,
539 timeout: float = 300,
540 params: dict = None,
541 db_dict: dict = None,
542 **kwargs,
543 ) -> str:
544 """Exec primitive (Juju action)
545
546 :param cluster_uuid str: The UUID of the cluster
547 :param kdu_instance str: The unique name of the KDU instance
548 :param primitive_name: Name of action that will be executed
549 :param timeout: Timeout for action execution
550 :param params: Dictionary of all the parameters needed for the action
551 :param db_dict: Dictionary for any additional data
552 :param kwargs: Additional parameters
553 vca_id (str): VCA ID
554
555 :return: Returns the output of the action
556 """
557 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
558
559 if not params or "application-name" not in params:
560 raise K8sException(
561 "Missing application-name argument, \
562 argument needed for K8s actions"
563 )
564 try:
565 self.log.debug(
566 "[exec_primitive] Getting model "
567 "kdu_instance: {}".format(kdu_instance)
568 )
569 application_name = params["application-name"]
570 actions = await libjuju.get_actions(application_name, kdu_instance)
571 if primitive_name not in actions:
572 raise K8sException("Primitive {} not found".format(primitive_name))
573 output, status = await libjuju.execute_action(
574 application_name, kdu_instance, primitive_name, **params
575 )
576
577 if status != "completed":
578 raise K8sException(
579 "status is not completed: {} output: {}".format(status, output)
580 )
581 if self.on_update_db:
582 await self.on_update_db(
583 cluster_uuid, kdu_instance, filter=db_dict["filter"]
584 )
585
586 return output
587
588 except Exception as e:
589 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
590 self.log.error(error_msg)
591 raise K8sException(message=error_msg)
592
593 """Introspection"""
594
595 async def inspect_kdu(
596 self,
597 kdu_model: str,
598 ) -> dict:
599 """Inspect a KDU
600
601 Inspects a bundle and returns a dictionary of config parameters and
602 their default values.
603
604 :param kdu_model str: The name or path of the bundle to inspect.
605
606 :return: If successful, returns a dictionary of available parameters
607 and their default values.
608 """
609
610 kdu = {}
611 if not os.path.exists(kdu_model):
612 raise K8sException("file {} not found".format(kdu_model))
613
614 with open(kdu_model, "r") as f:
615 bundle = yaml.safe_load(f.read())
616
617 """
618 {
619 'description': 'Test bundle',
620 'bundle': 'kubernetes',
621 'applications': {
622 'mariadb-k8s': {
623 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
624 'scale': 1,
625 'options': {
626 'password': 'manopw',
627 'root_password': 'osm4u',
628 'user': 'mano'
629 },
630 'series': 'kubernetes'
631 }
632 }
633 }
634 """
635 # TODO: This should be returned in an agreed-upon format
636 kdu = bundle["applications"]
637
638 return kdu
639
640 async def help_kdu(
641 self,
642 kdu_model: str,
643 ) -> str:
644 """View the README
645
646 If available, returns the README of the bundle.
647
648 :param kdu_model str: The name or path of a bundle
649
650 :return: If found, returns the contents of the README.
651 """
652 readme = None
653
654 files = ["README", "README.txt", "README.md"]
655 path = os.path.dirname(kdu_model)
656 for file in os.listdir(path):
657 if file in files:
658 with open(file, "r") as f:
659 readme = f.read()
660 break
661
662 return readme
663
664 async def status_kdu(
665 self,
666 cluster_uuid: str,
667 kdu_instance: str,
668 complete_status: bool = False,
669 yaml_format: bool = False,
670 **kwargs,
671 ) -> dict:
672 """Get the status of the KDU
673
674 Get the current status of the KDU instance.
675
676 :param cluster_uuid str: The UUID of the cluster
677 :param kdu_instance str: The unique id of the KDU instance
678 :param complete_status: To get the complete_status of the KDU
679 :param yaml_format: To get the status in proper format for NSR record
680 :param: kwargs: Additional parameters
681 vca_id (str): VCA ID
682
683 :return: Returns a dictionary containing namespace, state, resources,
684 and deployment_time and returns complete_status if complete_status is True
685 """
686 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
687 status = {}
688
689 model_status = await libjuju.get_model_status(kdu_instance)
690
691 if not complete_status:
692 for name in model_status.applications:
693 application = model_status.applications[name]
694 status[name] = {"status": application["status"]["status"]}
695 else:
696 if yaml_format:
697 return obj_to_yaml(model_status)
698 else:
699 return obj_to_dict(model_status)
700
701 return status
702
703 async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
704 """
705 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
706
707 :param vcastatus dict: dict containing vcastatus
708 :param kdu_instance str: The unique id of the KDU instance
709 :param: kwargs: Additional parameters
710 vca_id (str): VCA ID
711
712 :return: None
713 """
714 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
715 try:
716 for model_name in vcastatus:
717 # Adding executed actions
718 vcastatus[model_name][
719 "executedActions"
720 ] = await libjuju.get_executed_actions(kdu_instance)
721
722 for application in vcastatus[model_name]["applications"]:
723 # Adding application actions
724 vcastatus[model_name]["applications"][application][
725 "actions"
726 ] = await libjuju.get_actions(application, kdu_instance)
727 # Adding application configs
728 vcastatus[model_name]["applications"][application][
729 "configs"
730 ] = await libjuju.get_application_configs(kdu_instance, application)
731
732 except Exception as e:
733 self.log.debug("Error in updating vca status: {}".format(str(e)))
734
735 async def get_services(
736 self, cluster_uuid: str, kdu_instance: str, namespace: str
737 ) -> list:
738 """Return a list of services of a kdu_instance"""
739
740 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
741
742 kubecfg = tempfile.NamedTemporaryFile()
743 with open(kubecfg.name, "w") as kubecfg_file:
744 kubecfg_file.write(credentials)
745 kubectl = Kubectl(config_file=kubecfg.name)
746
747 return kubectl.get_services(
748 field_selector="metadata.namespace={}".format(kdu_instance)
749 )
750
751 async def get_service(
752 self, cluster_uuid: str, service_name: str, namespace: str
753 ) -> object:
754 """Return data for a specific service inside a namespace"""
755
756 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
757
758 kubecfg = tempfile.NamedTemporaryFile()
759 with open(kubecfg.name, "w") as kubecfg_file:
760 kubecfg_file.write(credentials)
761 kubectl = Kubectl(config_file=kubecfg.name)
762
763 return kubectl.get_services(
764 field_selector="metadata.name={},metadata.namespace={}".format(
765 service_name, namespace
766 )
767 )[0]
768
769 def get_credentials(self, cluster_uuid: str) -> str:
770 """
771 Get Cluster Kubeconfig
772 """
773 k8scluster = self.db.get_one(
774 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
775 )
776
777 self.db.encrypt_decrypt_fields(
778 k8scluster.get("credentials"),
779 "decrypt",
780 ["password", "secret"],
781 schema_version=k8scluster["schema_version"],
782 salt=k8scluster["_id"],
783 )
784
785 return yaml.safe_dump(k8scluster.get("credentials"))
786
787 def _get_credential_name(self, cluster_uuid: str) -> str:
788 """
789 Get credential name for a k8s cloud
790
791 We cannot use the cluster_uuid for the credential name directly,
792 because it cannot start with a number, it must start with a letter.
793 Therefore, the k8s cloud credential name will be "cred-" followed
794 by the cluster uuid.
795
796 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
797
798 :return: Name to use for the credential name.
799 """
800 return "cred-{}".format(cluster_uuid)
801
802 def get_namespace(
803 self,
804 cluster_uuid: str,
805 ) -> str:
806 """Get the namespace UUID
807 Gets the namespace's unique name
808
809 :param cluster_uuid str: The UUID of the cluster
810 :returns: The namespace UUID, or raises an exception
811 """
812 pass
813
814 def _create_cluster_role(
815 self,
816 kubectl: Kubectl,
817 name: str,
818 labels: Dict[str, str],
819 ):
820 cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
821 field_selector="metadata.name={}".format(name)
822 )
823
824 if len(cluster_roles.items) > 0:
825 raise Exception(
826 "Cluster role with metadata.name={} already exists".format(name)
827 )
828
829 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
830 # Cluster role
831 cluster_role = V1ClusterRole(
832 metadata=metadata,
833 rules=[
834 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
835 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
836 ],
837 )
838
839 kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
840
841 def _delete_cluster_role(self, kubectl: Kubectl, name: str):
842 kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
843
844 def _create_service_account(
845 self,
846 kubectl: Kubectl,
847 name: str,
848 labels: Dict[str, str],
849 ):
850 service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
851 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
852 )
853 if len(service_accounts.items) > 0:
854 raise Exception(
855 "Service account with metadata.name={} already exists".format(name)
856 )
857
858 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
859 service_account = V1ServiceAccount(metadata=metadata)
860
861 kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
862 ADMIN_NAMESPACE, service_account
863 )
864
865 def _delete_service_account(self, kubectl: Kubectl, name: str):
866 kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
867 name, ADMIN_NAMESPACE
868 )
869
870 def _create_cluster_role_binding(
871 self,
872 kubectl: Kubectl,
873 name: str,
874 labels: Dict[str, str],
875 ):
876 role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
877 field_selector="metadata.name={}".format(name)
878 )
879 if len(role_bindings.items) > 0:
880 raise Exception("Generated rbac id already exists")
881
882 role_binding = V1ClusterRoleBinding(
883 metadata=V1ObjectMeta(name=name, labels=labels),
884 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
885 subjects=[
886 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
887 ],
888 )
889 kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
890
891 def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
892 kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
893
894 async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
895 v1_core = kubectl.clients[CORE_CLIENT]
896
897 retries_limit = 10
898 secret_name = None
899 while True:
900 retries_limit -= 1
901 service_accounts = v1_core.list_namespaced_service_account(
902 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
903 )
904 if len(service_accounts.items) == 0:
905 raise Exception(
906 "Service account not found with metadata.name={}".format(name)
907 )
908 service_account = service_accounts.items[0]
909 if service_account.secrets and len(service_account.secrets) > 0:
910 secret_name = service_account.secrets[0].name
911 if secret_name is not None or not retries_limit:
912 break
913 if not secret_name:
914 raise Exception(
915 "Failed getting the secret from service account {}".format(name)
916 )
917 secret = v1_core.list_namespaced_secret(
918 ADMIN_NAMESPACE,
919 field_selector="metadata.name={}".format(secret_name),
920 ).items[0]
921
922 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
923 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
924
925 return (
926 base64.b64decode(token).decode("utf-8"),
927 base64.b64decode(client_certificate_data).decode("utf-8"),
928 )
929
930 @staticmethod
931 def generate_kdu_instance_name(**kwargs):
932 db_dict = kwargs.get("db_dict")
933 kdu_name = kwargs.get("kdu_name", None)
934 if kdu_name:
935 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
936 else:
937 kdu_instance = db_dict["filter"]["_id"]
938 return kdu_instance
939
940 async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
941 """
942 Get libjuju object
943
944 :param: vca_id: VCA ID
945 If None, get a libjuju object with a Connection to the default VCA
946 Else, geta libjuju object with a Connection to the specified VCA
947 """
948 if not vca_id:
949 while self.loading_libjuju.locked():
950 await asyncio.sleep(0.1)
951 if not self.libjuju:
952 async with self.loading_libjuju:
953 vca_connection = await get_connection(self._store)
954 self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
955 return self.libjuju
956 else:
957 vca_connection = await get_connection(self._store, vca_id)
958 return Libjuju(
959 vca_connection,
960 loop=self.loop,
961 log=self.log,
962 n2vc=self,
963 )