- def _create_cluster_role(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
-
- if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
-
- kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
- def _delete_cluster_role(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
- def _create_service_account(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- service_account = V1ServiceAccount(metadata=metadata)
-
- kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
- ADMIN_NAMESPACE, service_account
- )
-
- def _delete_service_account(self, kubectl: Kubectl, name: str):
- kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
- name, ADMIN_NAMESPACE
- )
-
- def _create_cluster_role_binding(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
-
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
- ],
- )
- kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
- def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
- async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
- v1_core = kubectl.clients[CORE_CLIENT]
-
- retries_limit = 10
- secret_name = None
- while True:
- retries_limit -= 1
- service_accounts = v1_core.list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) == 0:
- raise Exception(
- "Service account not found with metadata.name={}".format(name)
- )
- service_account = service_accounts.items[0]
- if service_account.secrets and len(service_account.secrets) > 0:
- secret_name = service_account.secrets[0].name
- if secret_name is not None or not retries_limit:
- break
- if not secret_name:
- raise Exception(
- "Failed getting the secret from service account {}".format(name)
- )
- secret = v1_core.list_namespaced_secret(
- ADMIN_NAMESPACE,
- field_selector="metadata.name={}".format(secret_name),
- ).items[0]
-
- token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
- client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
- return (
- base64.b64decode(token).decode("utf-8"),
- base64.b64decode(client_certificate_data).decode("utf-8"),
- )
-