X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=7cc6ac20a0a12a6b4c9c8cc863cebc562c73a7ae;hp=3fe6b53d8643fb5317d10394cd13f4eefc3573e4;hb=5f069332606e512f74791c6497af40326b611344;hpb=fb79786bd154505ea9c7578e6247dea565ea9c41 diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 3fe6b53..7cc6ac2 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -25,14 +25,17 @@ from kubernetes import client, config from kubernetes.client.api import VersionApi from kubernetes.client.models import ( V1ClusterRole, + V1Role, V1ObjectMeta, V1PolicyRule, V1ServiceAccount, V1ClusterRoleBinding, + V1RoleBinding, V1RoleRef, V1Subject, V1Secret, V1SecretReference, + V1Namespace, ) from kubernetes.client.rest import ApiException from n2vc.libjuju import retry_callback @@ -163,9 +166,7 @@ class Kubectl: ) if len(cluster_roles.items) > 0: - raise Exception( - "Cluster role with metadata.name={} already exists".format(name) - ) + raise Exception("Role with metadata.name={} already exists".format(name)) metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) # Cluster role @@ -179,6 +180,46 @@ class Kubectl: self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + async def create_role( + self, + name: str, + labels: Dict[str, str], + api_groups: list, + resources: list, + verbs: list, + namespace: str, + ): + """ + Create a role with one PolicyRule + + :param: name: Name of the namespaced Role + :param: labels: Labels for namespaced Role metadata + :param: api_groups: List with api-groups allowed in the policy rule + :param: resources: List with resources allowed in the policy rule + :param: verbs: List with verbs allowed in the policy rule + :param: namespace: Kubernetes namespace for Role metadata + + :return: None + """ + + roles = self.clients[RBAC_CLIENT].list_namespaced_role( + namespace, field_selector="metadata.name={}".format(name) + ) + + if len(roles.items) > 0: + raise Exception("Role with metadata.name={} already exists".format(name)) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs), + ], + ) + + self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role) + def delete_cluster_role(self, name: str): """ Delete a cluster role @@ -308,6 +349,44 @@ class Kubectl: ) self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + async def create_role_binding( + self, + name: str, + role_name: str, + sa_name: str, + labels: Dict[str, str], + namespace: str, + ): + """ + Create a cluster role binding + + :param: name: Name of the namespaced Role Binding + :param: role_name: Name of the namespaced Role to be bound + :param: sa_name: Name of the Service Account to be bound + :param: labels: Labels for Role Binding metadata + :param: namespace: Kubernetes namespace for Role Binding metadata + + :return: None + """ + role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception( + "Role Binding with metadata.name={} already exists".format(name) + ) + + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""), + subjects=[ + V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace) + ], + ) + self.clients[RBAC_CLIENT].create_namespaced_role_binding( + namespace, role_binding + ) + def delete_cluster_role_binding(self, name: str): """ Delete a cluster role binding @@ -351,6 +430,7 @@ class Kubectl: raise Exception( "Failed getting the secret from service account {}".format(name) ) + # TODO: refactor to use get_secret_content secret = v1_core.list_namespaced_secret( namespace, field_selector="metadata.name={}".format(secret_name) ).items[0] @@ -363,6 +443,53 @@ class Kubectl: base64.b64decode(client_certificate_data).decode("utf-8"), ) + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting data from the secret"), + ) + async def get_secret_content( + self, + name: str, + namespace: str, + ) -> dict: + """ + Get secret data + + :param: name: Name of the secret + :param: namespace: Name of the namespace where the secret is stored + + :return: Dictionary with secret's data + """ + v1_core = self.clients[CORE_CLIENT] + + secret = v1_core.read_namespaced_secret(name, namespace) + + return secret.data + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the secret"), + ) + async def create_secret( + self, name: str, data: dict, namespace: str, secret_type: str + ): + """ + Get secret data + + :param: name: Name of the secret + :param: data: Dict with data content. Values must be already base64 encoded + :param: namespace: Name of the namespace where the secret will be stored + :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls + + :return: None + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret(metadata=metadata, data=data, type=secret_type) + v1_core.create_namespaced_secret(namespace, secret) + async def create_certificate( self, namespace: str, @@ -441,3 +568,49 @@ class Kubectl: self.logger.warning("Certificate already deleted: {}".format(e)) else: raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the namespace"), + ) + async def create_namespace(self, name: str): + """ + Create a namespace + + :param: name: Name of the namespace to be created + + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name) + namespace = V1Namespace( + metadata=metadata, + ) + + try: + v1_core.create_namespace(namespace) + self.logger.debug("Namespace created: {}".format(name)) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Namespace already exists: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed deleting the namespace"), + ) + async def delete_namespace(self, name: str): + """ + Delete a namespace + + :param: name: Name of the namespace to be deleted + + """ + try: + self.clients[CORE_CLIENT].delete_namespace(name) + except ApiException as e: + if e.reason == "Not Found": + self.logger.warning("Namespace already deleted: {}".format(e))