Feature 10957: Add methods for creation of namespace, secret and RBAC
[osm/N2VC.git] / n2vc / kubectl.py
index 3fe6b53..7cc6ac2 100644 (file)
@@ -25,14 +25,17 @@ from kubernetes import client, config
 from kubernetes.client.api import VersionApi
 from kubernetes.client.models import (
     V1ClusterRole,
+    V1Role,
     V1ObjectMeta,
     V1PolicyRule,
     V1ServiceAccount,
     V1ClusterRoleBinding,
+    V1RoleBinding,
     V1RoleRef,
     V1Subject,
     V1Secret,
     V1SecretReference,
+    V1Namespace,
 )
 from kubernetes.client.rest import ApiException
 from n2vc.libjuju import retry_callback
@@ -163,9 +166,7 @@ class Kubectl:
         )
 
         if len(cluster_roles.items) > 0:
-            raise Exception(
-                "Cluster role with metadata.name={} already exists".format(name)
-            )
+            raise Exception("Role with metadata.name={} already exists".format(name))
 
         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
         # Cluster role
@@ -179,6 +180,46 @@ class Kubectl:
 
         self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
 
+    async def create_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        api_groups: list,
+        resources: list,
+        verbs: list,
+        namespace: str,
+    ):
+        """
+        Create a role with one PolicyRule
+
+        :param: name:       Name of the namespaced Role
+        :param: labels:     Labels for namespaced Role metadata
+        :param: api_groups: List with api-groups allowed in the policy rule
+        :param: resources:  List with resources allowed in the policy rule
+        :param: verbs:      List with verbs allowed in the policy rule
+        :param: namespace:  Kubernetes namespace for Role metadata
+
+        :return: None
+        """
+
+        roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+
+        if len(roles.items) > 0:
+            raise Exception("Role with metadata.name={} already exists".format(name))
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+        role = V1Role(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
     def delete_cluster_role(self, name: str):
         """
         Delete a cluster role
@@ -308,6 +349,44 @@ class Kubectl:
         )
         self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
 
+    async def create_role_binding(
+        self,
+        name: str,
+        role_name: str,
+        sa_name: str,
+        labels: Dict[str, str],
+        namespace: str,
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the namespaced Role Binding
+        :param: role_name:  Name of the namespaced Role to be bound
+        :param: sa_name:    Name of the Service Account to be bound
+        :param: labels:     Labels for Role Binding metadata
+        :param: namespace:  Kubernetes namespace for Role Binding metadata
+
+        :return: None
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception(
+                "Role Binding with metadata.name={} already exists".format(name)
+            )
+
+        role_binding = V1RoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+            subjects=[
+                V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+            ],
+        )
+        self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+            namespace, role_binding
+        )
+
     def delete_cluster_role_binding(self, name: str):
         """
         Delete a cluster role binding
@@ -351,6 +430,7 @@ class Kubectl:
             raise Exception(
                 "Failed getting the secret from service account {}".format(name)
             )
+        # TODO: refactor to use get_secret_content
         secret = v1_core.list_namespaced_secret(
             namespace, field_selector="metadata.name={}".format(secret_name)
         ).items[0]
@@ -363,6 +443,53 @@ class Kubectl:
             base64.b64decode(client_certificate_data).decode("utf-8"),
         )
 
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting data from the secret"),
+    )
+    async def get_secret_content(
+        self,
+        name: str,
+        namespace: str,
+    ) -> dict:
+        """
+        Get secret data
+
+        :param: name:       Name of the secret
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Dictionary with secret's data
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret = v1_core.read_namespaced_secret(name, namespace)
+
+        return secret.data
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the secret"),
+    )
+    async def create_secret(
+        self, name: str, data: dict, namespace: str, secret_type: str
+    ):
+        """
+        Get secret data
+
+        :param: name:        Name of the secret
+        :param: data:        Dict with data content. Values must be already base64 encoded
+        :param: namespace:   Name of the namespace where the secret will be stored
+        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+        :return: None
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name, namespace=namespace)
+        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
     async def create_certificate(
         self,
         namespace: str,
@@ -441,3 +568,49 @@ class Kubectl:
                 self.logger.warning("Certificate already deleted: {}".format(e))
             else:
                 raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the namespace"),
+    )
+    async def create_namespace(self, name: str):
+        """
+        Create a namespace
+
+        :param: name:       Name of the namespace to be created
+
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+
+        try:
+            v1_core.create_namespace(namespace)
+            self.logger.debug("Namespace created: {}".format(name))
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Namespace already exists: {}".format(e))
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed deleting the namespace"),
+    )
+    async def delete_namespace(self, name: str):
+        """
+        Delete a namespace
+
+        :param: name:       Name of the namespace to be deleted
+
+        """
+        try:
+            self.clients[CORE_CLIENT].delete_namespace(name)
+        except ApiException as e:
+            if e.reason == "Not Found":
+                self.logger.warning("Namespace already deleted: {}".format(e))