from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
+ V1Role,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
+ V1RoleBinding,
V1RoleRef,
V1Subject,
V1Secret,
V1SecretReference,
+ V1Namespace,
)
from kubernetes.client.rest import ApiException
from n2vc.libjuju import retry_callback
)
if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
+ raise Exception("Role with metadata.name={} already exists".format(name))
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
# Cluster role
self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+ async def create_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ api_groups: list,
+ resources: list,
+ verbs: list,
+ namespace: str,
+ ):
+ """
+ Create a role with one PolicyRule
+
+ :param: name: Name of the namespaced Role
+ :param: labels: Labels for namespaced Role metadata
+ :param: api_groups: List with api-groups allowed in the policy rule
+ :param: resources: List with resources allowed in the policy rule
+ :param: verbs: List with verbs allowed in the policy rule
+ :param: namespace: Kubernetes namespace for Role metadata
+
+ :return: None
+ """
+
+ roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+
+ if len(roles.items) > 0:
+ raise Exception("Role with metadata.name={} already exists".format(name))
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
def delete_cluster_role(self, name: str):
"""
Delete a cluster role
)
self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+ async def create_role_binding(
+ self,
+ name: str,
+ role_name: str,
+ sa_name: str,
+ labels: Dict[str, str],
+ namespace: str,
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the namespaced Role Binding
+ :param: role_name: Name of the namespaced Role to be bound
+ :param: sa_name: Name of the Service Account to be bound
+ :param: labels: Labels for Role Binding metadata
+ :param: namespace: Kubernetes namespace for Role Binding metadata
+
+ :return: None
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception(
+ "Role Binding with metadata.name={} already exists".format(name)
+ )
+
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+ ],
+ )
+ self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+ namespace, role_binding
+ )
+
def delete_cluster_role_binding(self, name: str):
"""
Delete a cluster role binding
raise Exception(
"Failed getting the secret from service account {}".format(name)
)
+ # TODO: refactor to use get_secret_content
secret = v1_core.list_namespaced_secret(
namespace, field_selector="metadata.name={}".format(secret_name)
).items[0]
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting data from the secret"),
+ )
+ async def get_secret_content(
+ self,
+ name: str,
+ namespace: str,
+ ) -> dict:
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Dictionary with secret's data
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret = v1_core.read_namespaced_secret(name, namespace)
+
+ return secret.data
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret(
+ self, name: str, data: dict, namespace: str, secret_type: str
+ ):
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: data: Dict with data content. Values must be already base64 encoded
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
async def create_certificate(
self,
namespace: str,
self.logger.warning("Certificate already deleted: {}".format(e))
else:
raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the namespace"),
+ )
+ async def create_namespace(self, name: str, labels: dict = None):
+ """
+ Create a namespace
+
+ :param: name: Name of the namespace to be created
+ :param: labels: Dictionary with labels for the new namespace
+
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, labels=labels)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+
+ try:
+ v1_core.create_namespace(namespace)
+ self.logger.debug("Namespace created: {}".format(name))
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Namespace already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the namespace"),
+ )
+ async def delete_namespace(self, name: str):
+ """
+ Delete a namespace
+
+ :param: name: Name of the namespace to be deleted
+
+ """
+ try:
+ self.clients[CORE_CLIENT].delete_namespace(name)
+ except ApiException as e:
+ if e.reason == "Not Found":
+ self.logger.warning("Namespace already deleted: {}".format(e))