X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=a56b6cdc2f98e39e0b7674e9d586a475ef956625;hp=bc8c3927d9537fbffb4e8bca3170def835c5683b;hb=ba1d07263821169bc36b3f9f9f93cbed93baad4a;hpb=f6e9b00b6f7cd35e45ace4c84b53fe8d12b2438c diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index bc8c392..a56b6cd 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -12,26 +12,43 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import logging +from typing import Dict +import typing + from kubernetes import client, config +from kubernetes.client.models import ( + V1ClusterRole, + V1ObjectMeta, + V1PolicyRule, + V1ServiceAccount, + V1ClusterRoleBinding, + V1RoleRef, + V1Subject, +) from kubernetes.client.rest import ApiException +from retrying_async import retry +SERVICE_ACCOUNT_TOKEN_KEY = "token" +SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" +# clients CORE_CLIENT = "core_v1" -STORAGE_CLIENT = "storage_v1" RBAC_CLIENT = "rbac_v1" +STORAGE_CLIENT = "storage_v1" class Kubectl: def __init__(self, config_file=None): config.load_kube_config(config_file=config_file) self._clients = { - "core_v1": client.CoreV1Api(), - "storage_v1": client.StorageV1Api(), - "rbac_v1": client.RbacAuthorizationV1Api(), + CORE_CLIENT: client.CoreV1Api(), + RBAC_CLIENT: client.RbacAuthorizationV1Api(), + STORAGE_CLIENT: client.StorageV1Api(), } - self._configuration = config.kube_config.Configuration() + self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") @property @@ -42,7 +59,19 @@ class Kubectl: def clients(self): return self._clients - def get_services(self, field_selector=None, label_selector=None): + def get_services( + self, + field_selector: str = None, + label_selector: str = None, + ) -> typing.List[typing.Dict]: + """ + Get Service list from a namespace + + :param: field_selector: Kubernetes field selector for the namespace + :param: label_selector: Kubernetes label selector for the namespace + + :return: List of the services matching the selectors specified + """ kwargs = {} if field_selector: kwargs["field_selector"] = field_selector @@ -96,7 +125,7 @@ class Kubectl: if not selected_sc: # Select the first storage class in case there is no a default-class selected_sc = sc.metadata.name - annotations = sc.metadata.annotations + annotations = sc.metadata.annotations or {} if any( k in annotations and annotations[k] == v for k, v in default_sc_annotations.items() @@ -105,3 +134,163 @@ class Kubectl: selected_sc = sc.metadata.name break return selected_sc + + def create_cluster_role( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a cluster role + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role metadata + :param: namespace: Kubernetes namespace for cluster role metadata + Default: kube-system + """ + cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( + field_selector="metadata.name={}".format(name) + ) + + if len(cluster_roles.items) > 0: + raise Exception( + "Cluster role with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + # Cluster role + cluster_role = V1ClusterRole( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), + V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), + ], + ) + + self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + + def delete_cluster_role(self, name: str): + """ + Delete a cluster role + + :param: name: Name of the cluster role + """ + self.clients[RBAC_CLIENT].delete_cluster_role(name) + + def create_service_account( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a service account + + :param: name: Name of the service account + :param: labels: Labels for service account metadata + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) > 0: + raise Exception( + "Service account with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + service_account = V1ServiceAccount(metadata=metadata) + + self.clients[CORE_CLIENT].create_namespaced_service_account( + namespace, service_account + ) + + def delete_service_account(self, name: str, namespace: str = "kube-system"): + """ + Delete a service account + + :param: name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) + + def create_cluster_role_binding( + self, name: str, labels: Dict[str, str], namespace: str = "kube-system" + ): + """ + Create a cluster role binding + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role binding metadata + :param: namespace: Kubernetes namespace for cluster role binding metadata + Default: kube-system + """ + role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( + field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception("Generated rbac id already exists") + + role_binding = V1ClusterRoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), + subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)], + ) + self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + + def delete_cluster_role_binding(self, name: str): + """ + Delete a cluster role binding + + :param: name: Name of the cluster role binding + """ + self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting the secret from service account"), + ) + async def get_secret_data( + self, name: str, namespace: str = "kube-system" + ) -> (str, str): + """ + Get secret data + + :param: name: Name of the secret data + :param: namespace: Name of the namespace where the secret is stored + + :return: Tuple with the token and client certificate + """ + v1_core = self.clients[CORE_CLIENT] + + secret_name = None + + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) == 0: + raise Exception( + "Service account not found with metadata.name={}".format(name) + ) + service_account = service_accounts.items[0] + if service_account.secrets and len(service_account.secrets) > 0: + secret_name = service_account.secrets[0].name + if not secret_name: + raise Exception( + "Failed getting the secret from service account {}".format(name) + ) + secret = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items[0] + + token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] + client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] + + return ( + base64.b64decode(token).decode("utf-8"), + base64.b64decode(client_certificate_data).decode("utf-8"), + )