Feature 10957: Add methods for creation of namespace, secret and RBAC 75/13275/4
authorGabriel Cuba <gcuba@whitestack.com>
Wed, 26 Apr 2023 00:26:19 +0000 (19:26 -0500)
committerGabriel Cuba <gcuba@whitestack.com>
Mon, 22 May 2023 20:31:30 +0000 (15:31 -0500)
Change-Id: Iada9d3caa9e8a926e421fe96894be618f36fb37e
Signed-off-by: Gabriel Cuba <gcuba@whitestack.com>
n2vc/k8s_helm3_conn.py
n2vc/k8s_helm_base_conn.py
n2vc/kubectl.py
n2vc/tests/unit/test_kubectl.py

index 037ed66..4baadae 100644 (file)
@@ -118,6 +118,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
         if namespace and namespace != "kube-system":
             if not await self._namespace_exists(cluster_uuid, namespace):
                 try:
+                    # TODO: refactor to use kubernetes API client
                     await self._create_namespace(cluster_uuid, namespace)
                 except Exception as e:
                     if not await self._namespace_exists(cluster_uuid, namespace):
@@ -314,6 +315,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
         if namespace != "kube-system":
             namespaces = await self._get_namespaces(cluster_id)
             if namespace not in namespaces:
+                # TODO: refactor to use kubernetes API client
                 await self._create_namespace(cluster_id, namespace)
 
         repo_list = await self.repo_list(cluster_id)
index 5588c3d..34d3129 100644 (file)
@@ -2113,3 +2113,122 @@ class K8sHelmBaseConnector(K8sConnector):
         )
         kubectl = Kubectl(config_file=paths["kube_config"])
         await kubectl.delete_certificate(namespace, certificate_name)
+
+    async def create_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+    ):
+        """
+        Create a namespace in a specific cluster
+
+        :param namespace: namespace to be created
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_namespace(
+            name=namespace,
+        )
+
+    async def delete_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+    ):
+        """
+        Delete a namespace in a specific cluster
+
+        :param namespace: namespace to be deleted
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_namespace(
+            name=namespace,
+        )
+
+    async def copy_secret_data(
+        self,
+        src_secret: str,
+        dst_secret: str,
+        cluster_uuid: str,
+        data_key: str,
+        src_namespace: str = "osm",
+        dst_namespace: str = "osm",
+    ):
+        """
+        Copy a single key and value from an existing secret to a new one
+
+        :param src_secret: name of the existing secret
+        :param dst_secret: name of the new secret
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param data_key: key of the existing secret to be copied
+        :param src_namespace: Namespace of the existing secret
+        :param dst_namespace: Namespace of the new secret
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        secret_data = await kubectl.get_secret_content(
+            name=src_secret,
+            namespace=src_namespace,
+        )
+        # Only the corresponding data_key value needs to be copy
+        data = {data_key: secret_data.get(data_key)}
+        await kubectl.create_secret(
+            name=dst_secret,
+            data=data,
+            namespace=dst_namespace,
+            secret_type="Opaque",
+        )
+
+    async def setup_default_rbac(
+        self,
+        name,
+        namespace,
+        cluster_uuid,
+        api_groups,
+        resources,
+        verbs,
+        service_account,
+    ):
+        """
+        Create a basic RBAC for a new namespace.
+
+        :param name: name of both Role and Role Binding
+        :param namespace: K8s namespace
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param api_groups: Api groups to be allowed in Policy Rule
+        :param resources: Resources to be allowed in Policy Rule
+        :param verbs: Verbs to be allowed in Policy Rule
+        :param service_account: Service Account name used to bind the Role
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_role(
+            name=name,
+            labels={},
+            namespace=namespace,
+            api_groups=api_groups,
+            resources=resources,
+            verbs=verbs,
+        )
+        await kubectl.create_role_binding(
+            name=name,
+            labels={},
+            namespace=namespace,
+            role_name=name,
+            sa_name=service_account,
+        )
index 3fe6b53..7cc6ac2 100644 (file)
@@ -25,14 +25,17 @@ from kubernetes import client, config
 from kubernetes.client.api import VersionApi
 from kubernetes.client.models import (
     V1ClusterRole,
+    V1Role,
     V1ObjectMeta,
     V1PolicyRule,
     V1ServiceAccount,
     V1ClusterRoleBinding,
+    V1RoleBinding,
     V1RoleRef,
     V1Subject,
     V1Secret,
     V1SecretReference,
+    V1Namespace,
 )
 from kubernetes.client.rest import ApiException
 from n2vc.libjuju import retry_callback
@@ -163,9 +166,7 @@ class Kubectl:
         )
 
         if len(cluster_roles.items) > 0:
-            raise Exception(
-                "Cluster role with metadata.name={} already exists".format(name)
-            )
+            raise Exception("Role with metadata.name={} already exists".format(name))
 
         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
         # Cluster role
@@ -179,6 +180,46 @@ class Kubectl:
 
         self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
 
+    async def create_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        api_groups: list,
+        resources: list,
+        verbs: list,
+        namespace: str,
+    ):
+        """
+        Create a role with one PolicyRule
+
+        :param: name:       Name of the namespaced Role
+        :param: labels:     Labels for namespaced Role metadata
+        :param: api_groups: List with api-groups allowed in the policy rule
+        :param: resources:  List with resources allowed in the policy rule
+        :param: verbs:      List with verbs allowed in the policy rule
+        :param: namespace:  Kubernetes namespace for Role metadata
+
+        :return: None
+        """
+
+        roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+
+        if len(roles.items) > 0:
+            raise Exception("Role with metadata.name={} already exists".format(name))
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+        role = V1Role(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
     def delete_cluster_role(self, name: str):
         """
         Delete a cluster role
@@ -308,6 +349,44 @@ class Kubectl:
         )
         self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
 
+    async def create_role_binding(
+        self,
+        name: str,
+        role_name: str,
+        sa_name: str,
+        labels: Dict[str, str],
+        namespace: str,
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the namespaced Role Binding
+        :param: role_name:  Name of the namespaced Role to be bound
+        :param: sa_name:    Name of the Service Account to be bound
+        :param: labels:     Labels for Role Binding metadata
+        :param: namespace:  Kubernetes namespace for Role Binding metadata
+
+        :return: None
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception(
+                "Role Binding with metadata.name={} already exists".format(name)
+            )
+
+        role_binding = V1RoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+            subjects=[
+                V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+            ],
+        )
+        self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+            namespace, role_binding
+        )
+
     def delete_cluster_role_binding(self, name: str):
         """
         Delete a cluster role binding
@@ -351,6 +430,7 @@ class Kubectl:
             raise Exception(
                 "Failed getting the secret from service account {}".format(name)
             )
+        # TODO: refactor to use get_secret_content
         secret = v1_core.list_namespaced_secret(
             namespace, field_selector="metadata.name={}".format(secret_name)
         ).items[0]
@@ -363,6 +443,53 @@ class Kubectl:
             base64.b64decode(client_certificate_data).decode("utf-8"),
         )
 
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting data from the secret"),
+    )
+    async def get_secret_content(
+        self,
+        name: str,
+        namespace: str,
+    ) -> dict:
+        """
+        Get secret data
+
+        :param: name:       Name of the secret
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Dictionary with secret's data
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret = v1_core.read_namespaced_secret(name, namespace)
+
+        return secret.data
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the secret"),
+    )
+    async def create_secret(
+        self, name: str, data: dict, namespace: str, secret_type: str
+    ):
+        """
+        Get secret data
+
+        :param: name:        Name of the secret
+        :param: data:        Dict with data content. Values must be already base64 encoded
+        :param: namespace:   Name of the namespace where the secret will be stored
+        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+        :return: None
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name, namespace=namespace)
+        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
     async def create_certificate(
         self,
         namespace: str,
@@ -441,3 +568,49 @@ class Kubectl:
                 self.logger.warning("Certificate already deleted: {}".format(e))
             else:
                 raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the namespace"),
+    )
+    async def create_namespace(self, name: str):
+        """
+        Create a namespace
+
+        :param: name:       Name of the namespace to be created
+
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+
+        try:
+            v1_core.create_namespace(namespace)
+            self.logger.debug("Namespace created: {}".format(name))
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Namespace already exists: {}".format(e))
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed deleting the namespace"),
+    )
+    async def delete_namespace(self, name: str):
+        """
+        Delete a namespace
+
+        :param: name:       Name of the namespace to be deleted
+
+        """
+        try:
+            self.clients[CORE_CLIENT].delete_namespace(name)
+        except ApiException as e:
+            if e.reason == "Not Found":
+                self.logger.warning("Namespace already deleted: {}".format(e))
index d970bf0..6ba074f 100644 (file)
@@ -24,6 +24,12 @@ from kubernetes.client import (
     V1Secret,
     V1ServiceAccount,
     V1SecretReference,
+    V1Role,
+    V1RoleBinding,
+    V1RoleRef,
+    V1Subject,
+    V1PolicyRule,
+    V1Namespace,
 )
 
 
@@ -93,6 +99,24 @@ class FakeK8sSecretList:
         return self._items
 
 
+class FakeK8sRoleList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
+class FakeK8sRoleBindingList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
 class FakeK8sVersionApiCode:
     def __init__(self, major: str, minor: str):
         self._major = major
@@ -555,3 +579,261 @@ class DeleteCertificateClass(asynctest.TestCase):
                 namespace=self.namespace,
                 object_name=self.object_name,
             )
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role")
+class CreateRoleClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateRoleClass, self).setUp()
+        self.name = "role"
+        self.namespace = "osm"
+        self.resources = ["*"]
+        self.api_groups = ["*"]
+        self.verbs = ["*"]
+        self.labels = {}
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_role(self, mock_create_role):
+        metadata = V1ObjectMeta(
+            name=self.name, labels=self.labels, namespace=self.namespace
+        )
+        role = V1Role(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(
+                    api_groups=self.api_groups,
+                    resources=self.resources,
+                    verbs=self.verbs,
+                ),
+            ],
+        )
+        await self.kubectl.create_role(
+            namespace=self.namespace,
+            api_groups=self.api_groups,
+            name=self.name,
+            resources=self.resources,
+            verbs=self.verbs,
+            labels=self.labels,
+        )
+        mock_create_role.assert_called_once_with(self.namespace, role)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_raise_exception_if_role_already_exists(
+        self,
+        mock_list_role,
+        mock_create_role,
+    ):
+        mock_list_role.return_value = FakeK8sRoleList(items=[1])
+        with self.assertRaises(Exception) as context:
+            await self.kubectl.create_role(
+                self.name,
+                self.labels,
+                self.api_groups,
+                self.resources,
+                self.verbs,
+                self.namespace,
+            )
+        self.assertTrue(
+            "Role with metadata.name={} already exists".format(self.name)
+            in str(context.exception)
+        )
+        mock_create_role.assert_not_called()
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding")
+class CreateRoleBindingClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateRoleBindingClass, self).setUp()
+        self.name = "rolebinding"
+        self.namespace = "osm"
+        self.role_name = "role"
+        self.sa_name = "Default"
+        self.labels = {}
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_role_binding(self, mock_create_role_binding):
+        role_binding = V1RoleBinding(
+            metadata=V1ObjectMeta(name=self.name, labels=self.labels),
+            role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""),
+            subjects=[
+                V1Subject(
+                    kind="ServiceAccount",
+                    name=self.sa_name,
+                    namespace=self.namespace,
+                )
+            ],
+        )
+        await self.kubectl.create_role_binding(
+            namespace=self.namespace,
+            role_name=self.role_name,
+            name=self.name,
+            sa_name=self.sa_name,
+            labels=self.labels,
+        )
+        mock_create_role_binding.assert_called_once_with(self.namespace, role_binding)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_raise_exception_if_role_binding_already_exists(
+        self,
+        mock_list_role_binding,
+        mock_create_role_binding,
+    ):
+        mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1])
+        with self.assertRaises(Exception) as context:
+            await self.kubectl.create_role_binding(
+                self.name,
+                self.role_name,
+                self.sa_name,
+                self.labels,
+                self.namespace,
+            )
+        self.assertTrue(
+            "Role Binding with metadata.name={} already exists".format(self.name)
+            in str(context.exception)
+        )
+        mock_create_role_binding.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+class CreateSecretClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateSecretClass, self).setUp()
+        self.name = "secret"
+        self.namespace = "osm"
+        self.data = {"test": "1234"}
+        self.secret_type = "Opaque"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_secret(self, mock_create_secret):
+        secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+        secret = V1Secret(
+            metadata=secret_metadata,
+            data=self.data,
+            type=self.secret_type,
+        )
+        await self.kubectl.create_secret(
+            namespace=self.namespace,
+            data=self.data,
+            name=self.name,
+            secret_type=self.secret_type,
+        )
+        mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespace")
+class CreateNamespaceClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateNamespaceClass, self).setUp()
+        self.namespace = "osm"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_namespace_is_created(
+        self,
+        mock_create_namespace,
+    ):
+        metadata = V1ObjectMeta(name=self.namespace)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+        await self.kubectl.create_namespace(
+            name=self.namespace,
+        )
+        mock_create_namespace.assert_called_once_with(namespace)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_alreadyexists(
+        self,
+        mock_create_namespace,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "AlreadyExists"}'
+        self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.create_namespace(
+                name=self.namespace,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_namespace,
+    ):
+        self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.create_namespace(
+                name=self.namespace,
+            )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.delete_namespace")
+class DeleteNamespaceClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(DeleteNamespaceClass, self).setUp()
+        self.namespace = "osm"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_notfound(
+        self,
+        mock_delete_namespace,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "NotFound"}'
+        self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.delete_namespace(
+                name=self.namespace,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_delete_namespace,
+    ):
+        self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.delete_namespace(
+                name=self.namespace,
+            )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret")
+class GetSecretContentClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(GetSecretContentClass, self).setUp()
+        self.name = "my_secret"
+        self.namespace = "osm"
+        self.data = {"my_key": "my_value"}
+        self.type = "Opaque"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_return_type_is_dict(
+        self,
+        mock_read_namespaced_secret,
+    ):
+        metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+        secret = V1Secret(metadata=metadata, data=self.data, type=self.type)
+        mock_read_namespaced_secret.return_value = secret
+        content = await self.kubectl.get_secret_content(self.name, self.namespace)
+        assert type(content) is dict