+
+ # Private methods to create/delete needed resources in the
+ # Kubernetes cluster to create the K8s cloud in Juju
+
+ def _create_cluster_role(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
+ field_selector="metadata.name={}".format(name)
+ )
+
+ if len(cluster_roles.items) > 0:
+ raise Exception(
+ "Cluster role with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ # Cluster role
+ cluster_role = V1ClusterRole(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+ V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+ ],
+ )
+
+ kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+ def _delete_cluster_role(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+ def _create_service_account(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) > 0:
+ raise Exception(
+ "Service account with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ service_account = V1ServiceAccount(metadata=metadata)
+
+ kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
+ ADMIN_NAMESPACE, service_account
+ )
+
+ def _delete_service_account(self, kubectl: Kubectl, name: str):
+ kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
+ name, ADMIN_NAMESPACE
+ )
+
+ def _create_cluster_role_binding(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
+ field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception("Generated rbac id already exists")
+
+ role_binding = V1ClusterRoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
+ ],
+ )
+ kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+ def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+ async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
+ v1_core = kubectl.clients[CORE_CLIENT]
+
+ retries_limit = 10
+ secret_name = None
+ while True:
+ retries_limit -= 1
+ service_accounts = v1_core.list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) == 0:
+ raise Exception(
+ "Service account not found with metadata.name={}".format(name)
+ )
+ service_account = service_accounts.items[0]
+ if service_account.secrets and len(service_account.secrets) > 0:
+ secret_name = service_account.secrets[0].name
+ if secret_name is not None or not retries_limit:
+ break
+ if not secret_name:
+ raise Exception(
+ "Failed getting the secret from service account {}".format(name)
+ )
+ secret = v1_core.list_namespaced_secret(
+ ADMIN_NAMESPACE,
+ field_selector="metadata.name={}".format(secret_name),
+ ).items[0]
+
+ token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+ client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+ return (
+ base64.b64decode(token).decode("utf-8"),
+ base64.b64decode(client_certificate_data).decode("utf-8"),
+ )
+
+ @staticmethod
+ def generate_kdu_instance_name(**kwargs):
+ db_dict = kwargs.get("db_dict")
+ kdu_name = kwargs.get("kdu_name", None)
+ if kdu_name:
+ kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+ else:
+ kdu_instance = db_dict["filter"]["_id"]
+ return kdu_instance