if namespace and namespace != "kube-system":
if not await self._namespace_exists(cluster_uuid, namespace):
try:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_uuid, namespace)
except Exception as e:
if not await self._namespace_exists(cluster_uuid, namespace):
if namespace != "kube-system":
namespaces = await self._get_namespaces(cluster_id)
if namespace not in namespaces:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_id, namespace)
repo_list = await self.repo_list(cluster_id)
)
kubectl = Kubectl(config_file=paths["kube_config"])
await kubectl.delete_certificate(namespace, certificate_name)
+
+ async def create_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Create a namespace in a specific cluster
+
+ :param namespace: namespace to be created
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_namespace(
+ name=namespace,
+ )
+
+ async def delete_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Delete a namespace in a specific cluster
+
+ :param namespace: namespace to be deleted
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_namespace(
+ name=namespace,
+ )
+
+ async def copy_secret_data(
+ self,
+ src_secret: str,
+ dst_secret: str,
+ cluster_uuid: str,
+ data_key: str,
+ src_namespace: str = "osm",
+ dst_namespace: str = "osm",
+ ):
+ """
+ Copy a single key and value from an existing secret to a new one
+
+ :param src_secret: name of the existing secret
+ :param dst_secret: name of the new secret
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param data_key: key of the existing secret to be copied
+ :param src_namespace: Namespace of the existing secret
+ :param dst_namespace: Namespace of the new secret
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ secret_data = await kubectl.get_secret_content(
+ name=src_secret,
+ namespace=src_namespace,
+ )
+ # Only the corresponding data_key value needs to be copy
+ data = {data_key: secret_data.get(data_key)}
+ await kubectl.create_secret(
+ name=dst_secret,
+ data=data,
+ namespace=dst_namespace,
+ secret_type="Opaque",
+ )
+
+ async def setup_default_rbac(
+ self,
+ name,
+ namespace,
+ cluster_uuid,
+ api_groups,
+ resources,
+ verbs,
+ service_account,
+ ):
+ """
+ Create a basic RBAC for a new namespace.
+
+ :param name: name of both Role and Role Binding
+ :param namespace: K8s namespace
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param api_groups: Api groups to be allowed in Policy Rule
+ :param resources: Resources to be allowed in Policy Rule
+ :param verbs: Verbs to be allowed in Policy Rule
+ :param service_account: Service Account name used to bind the Role
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_role(
+ name=name,
+ labels={},
+ namespace=namespace,
+ api_groups=api_groups,
+ resources=resources,
+ verbs=verbs,
+ )
+ await kubectl.create_role_binding(
+ name=name,
+ labels={},
+ namespace=namespace,
+ role_name=name,
+ sa_name=service_account,
+ )
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
+ V1Role,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
+ V1RoleBinding,
V1RoleRef,
V1Subject,
V1Secret,
V1SecretReference,
+ V1Namespace,
)
from kubernetes.client.rest import ApiException
from n2vc.libjuju import retry_callback
)
if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
+ raise Exception("Role with metadata.name={} already exists".format(name))
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
# Cluster role
self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+ async def create_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ api_groups: list,
+ resources: list,
+ verbs: list,
+ namespace: str,
+ ):
+ """
+ Create a role with one PolicyRule
+
+ :param: name: Name of the namespaced Role
+ :param: labels: Labels for namespaced Role metadata
+ :param: api_groups: List with api-groups allowed in the policy rule
+ :param: resources: List with resources allowed in the policy rule
+ :param: verbs: List with verbs allowed in the policy rule
+ :param: namespace: Kubernetes namespace for Role metadata
+
+ :return: None
+ """
+
+ roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+
+ if len(roles.items) > 0:
+ raise Exception("Role with metadata.name={} already exists".format(name))
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
def delete_cluster_role(self, name: str):
"""
Delete a cluster role
)
self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+ async def create_role_binding(
+ self,
+ name: str,
+ role_name: str,
+ sa_name: str,
+ labels: Dict[str, str],
+ namespace: str,
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the namespaced Role Binding
+ :param: role_name: Name of the namespaced Role to be bound
+ :param: sa_name: Name of the Service Account to be bound
+ :param: labels: Labels for Role Binding metadata
+ :param: namespace: Kubernetes namespace for Role Binding metadata
+
+ :return: None
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception(
+ "Role Binding with metadata.name={} already exists".format(name)
+ )
+
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+ ],
+ )
+ self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+ namespace, role_binding
+ )
+
def delete_cluster_role_binding(self, name: str):
"""
Delete a cluster role binding
raise Exception(
"Failed getting the secret from service account {}".format(name)
)
+ # TODO: refactor to use get_secret_content
secret = v1_core.list_namespaced_secret(
namespace, field_selector="metadata.name={}".format(secret_name)
).items[0]
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting data from the secret"),
+ )
+ async def get_secret_content(
+ self,
+ name: str,
+ namespace: str,
+ ) -> dict:
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Dictionary with secret's data
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret = v1_core.read_namespaced_secret(name, namespace)
+
+ return secret.data
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret(
+ self, name: str, data: dict, namespace: str, secret_type: str
+ ):
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: data: Dict with data content. Values must be already base64 encoded
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
async def create_certificate(
self,
namespace: str,
self.logger.warning("Certificate already deleted: {}".format(e))
else:
raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the namespace"),
+ )
+ async def create_namespace(self, name: str):
+ """
+ Create a namespace
+
+ :param: name: Name of the namespace to be created
+
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+
+ try:
+ v1_core.create_namespace(namespace)
+ self.logger.debug("Namespace created: {}".format(name))
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Namespace already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the namespace"),
+ )
+ async def delete_namespace(self, name: str):
+ """
+ Delete a namespace
+
+ :param: name: Name of the namespace to be deleted
+
+ """
+ try:
+ self.clients[CORE_CLIENT].delete_namespace(name)
+ except ApiException as e:
+ if e.reason == "Not Found":
+ self.logger.warning("Namespace already deleted: {}".format(e))
V1Secret,
V1ServiceAccount,
V1SecretReference,
+ V1Role,
+ V1RoleBinding,
+ V1RoleRef,
+ V1Subject,
+ V1PolicyRule,
+ V1Namespace,
)
return self._items
+class FakeK8sRoleList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
+class FakeK8sRoleBindingList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
class FakeK8sVersionApiCode:
def __init__(self, major: str, minor: str):
self._major = major
namespace=self.namespace,
object_name=self.object_name,
)
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role")
+class CreateRoleClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateRoleClass, self).setUp()
+ self.name = "role"
+ self.namespace = "osm"
+ self.resources = ["*"]
+ self.api_groups = ["*"]
+ self.verbs = ["*"]
+ self.labels = {}
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_role(self, mock_create_role):
+ metadata = V1ObjectMeta(
+ name=self.name, labels=self.labels, namespace=self.namespace
+ )
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(
+ api_groups=self.api_groups,
+ resources=self.resources,
+ verbs=self.verbs,
+ ),
+ ],
+ )
+ await self.kubectl.create_role(
+ namespace=self.namespace,
+ api_groups=self.api_groups,
+ name=self.name,
+ resources=self.resources,
+ verbs=self.verbs,
+ labels=self.labels,
+ )
+ mock_create_role.assert_called_once_with(self.namespace, role)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_raise_exception_if_role_already_exists(
+ self,
+ mock_list_role,
+ mock_create_role,
+ ):
+ mock_list_role.return_value = FakeK8sRoleList(items=[1])
+ with self.assertRaises(Exception) as context:
+ await self.kubectl.create_role(
+ self.name,
+ self.labels,
+ self.api_groups,
+ self.resources,
+ self.verbs,
+ self.namespace,
+ )
+ self.assertTrue(
+ "Role with metadata.name={} already exists".format(self.name)
+ in str(context.exception)
+ )
+ mock_create_role.assert_not_called()
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding")
+class CreateRoleBindingClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateRoleBindingClass, self).setUp()
+ self.name = "rolebinding"
+ self.namespace = "osm"
+ self.role_name = "role"
+ self.sa_name = "Default"
+ self.labels = {}
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_role_binding(self, mock_create_role_binding):
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=self.name, labels=self.labels),
+ role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""),
+ subjects=[
+ V1Subject(
+ kind="ServiceAccount",
+ name=self.sa_name,
+ namespace=self.namespace,
+ )
+ ],
+ )
+ await self.kubectl.create_role_binding(
+ namespace=self.namespace,
+ role_name=self.role_name,
+ name=self.name,
+ sa_name=self.sa_name,
+ labels=self.labels,
+ )
+ mock_create_role_binding.assert_called_once_with(self.namespace, role_binding)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_raise_exception_if_role_binding_already_exists(
+ self,
+ mock_list_role_binding,
+ mock_create_role_binding,
+ ):
+ mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1])
+ with self.assertRaises(Exception) as context:
+ await self.kubectl.create_role_binding(
+ self.name,
+ self.role_name,
+ self.sa_name,
+ self.labels,
+ self.namespace,
+ )
+ self.assertTrue(
+ "Role Binding with metadata.name={} already exists".format(self.name)
+ in str(context.exception)
+ )
+ mock_create_role_binding.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+class CreateSecretClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateSecretClass, self).setUp()
+ self.name = "secret"
+ self.namespace = "osm"
+ self.data = {"test": "1234"}
+ self.secret_type = "Opaque"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_secret(self, mock_create_secret):
+ secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+ secret = V1Secret(
+ metadata=secret_metadata,
+ data=self.data,
+ type=self.secret_type,
+ )
+ await self.kubectl.create_secret(
+ namespace=self.namespace,
+ data=self.data,
+ name=self.name,
+ secret_type=self.secret_type,
+ )
+ mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespace")
+class CreateNamespaceClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateNamespaceClass, self).setUp()
+ self.namespace = "osm"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_namespace_is_created(
+ self,
+ mock_create_namespace,
+ ):
+ metadata = V1ObjectMeta(name=self.namespace)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+ mock_create_namespace.assert_called_once_with(namespace)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_alreadyexists(
+ self,
+ mock_create_namespace,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "AlreadyExists"}'
+ self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_namespace,
+ ):
+ self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.delete_namespace")
+class DeleteNamespaceClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(DeleteNamespaceClass, self).setUp()
+ self.namespace = "osm"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_notfound(
+ self,
+ mock_delete_namespace,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "NotFound"}'
+ self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.delete_namespace(
+ name=self.namespace,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_delete_namespace,
+ ):
+ self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.delete_namespace(
+ name=self.namespace,
+ )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret")
+class GetSecretContentClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(GetSecretContentClass, self).setUp()
+ self.name = "my_secret"
+ self.namespace = "osm"
+ self.data = {"my_key": "my_value"}
+ self.type = "Opaque"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_return_type_is_dict(
+ self,
+ mock_read_namespaced_secret,
+ ):
+ metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+ secret = V1Secret(metadata=metadata, data=self.data, type=self.type)
+ mock_read_namespaced_secret.return_value = secret
+ content = await self.kubectl.get_secret_content(self.name, self.namespace)
+ assert type(content) is dict