X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=8b8008efa1bf18ef8ea723113efcf8ce9fe19593;hb=6343d434fa3cec28d8b9b470054d3a13ada8865a;hp=58367569a3b05beedd8f074382a30d97ad0ba55c;hpb=5d79939ab780d4717fe36bfd62f398b922f84829;p=osm%2FN2VC.git diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 5836756..8b8008e 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -12,32 +12,95 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 +import logging +from typing import Dict +import typing +import uuid + +from distutils.version import LooseVersion + from kubernetes import client, config +from kubernetes.client.api import VersionApi +from kubernetes.client.models import ( + V1ClusterRole, + V1ObjectMeta, + V1PolicyRule, + V1ServiceAccount, + V1ClusterRoleBinding, + V1RoleRef, + V1Subject, + V1Secret, + V1SecretReference, +) from kubernetes.client.rest import ApiException -import logging +from retrying_async import retry + + +SERVICE_ACCOUNT_TOKEN_KEY = "token" +SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" +# clients +CORE_CLIENT = "core_v1" +RBAC_CLIENT = "rbac_v1" +STORAGE_CLIENT = "storage_v1" class Kubectl: def __init__(self, config_file=None): config.load_kube_config(config_file=config_file) + self._clients = { + CORE_CLIENT: client.CoreV1Api(), + RBAC_CLIENT: client.RbacAuthorizationV1Api(), + STORAGE_CLIENT: client.StorageV1Api(), + } + self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") - def get_services(self, field_selector=None, label_selector=None): + @property + def configuration(self): + return self._configuration + + @property + def clients(self): + return self._clients + + def get_services( + self, + field_selector: str = None, + label_selector: str = None, + ) -> typing.List[typing.Dict]: + """ + Get Service list from a namespace + + :param: field_selector: Kubernetes field selector for the namespace + :param: label_selector: Kubernetes label selector for the namespace + + :return: List of the services matching the selectors specified + """ kwargs = {} if field_selector: kwargs["field_selector"] = field_selector if label_selector: kwargs["label_selector"] = label_selector - try: - v1 = client.CoreV1Api() - result = v1.list_service_for_all_namespaces(**kwargs) + result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs) return [ { "name": i.metadata.name, "cluster_ip": i.spec.cluster_ip, "type": i.spec.type, - "ports": i.spec.ports, + "ports": [ + { + "name": p.name, + "node_port": p.node_port, + "port": p.port, + "protocol": p.protocol, + "target_port": p.target_port, + } + for p in i.spec.ports + ] + if i.spec.ports + else [], "external_ip": [i.ip for i in i.status.load_balancer.ingress] if i.status.load_balancer.ingress else None, @@ -47,3 +110,250 @@ class Kubectl: except ApiException as e: self.logger.error("Error calling get services: {}".format(e)) raise e + + def get_default_storage_class(self) -> str: + """ + Default storage class + + :return: Returns the default storage class name, if exists. + If not, it returns the first storage class. + If there are not storage classes, returns None + """ + storage_classes = self.clients[STORAGE_CLIENT].list_storage_class() + selected_sc = None + default_sc_annotations = { + "storageclass.kubernetes.io/is-default-class": "true", + # Older clusters still use the beta annotation. + "storageclass.beta.kubernetes.io/is-default-class": "true", + } + for sc in storage_classes.items: + if not selected_sc: + # Select the first storage class in case there is no a default-class + selected_sc = sc.metadata.name + annotations = sc.metadata.annotations or {} + if any( + k in annotations and annotations[k] == v + for k, v in default_sc_annotations.items() + ): + # Default storage + selected_sc = sc.metadata.name + break + return selected_sc + + def create_cluster_role( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a cluster role + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role metadata + :param: namespace: Kubernetes namespace for cluster role metadata + Default: kube-system + """ + cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( + field_selector="metadata.name={}".format(name) + ) + + if len(cluster_roles.items) > 0: + raise Exception( + "Cluster role with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + # Cluster role + cluster_role = V1ClusterRole( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), + V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), + ], + ) + + self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + + def delete_cluster_role(self, name: str): + """ + Delete a cluster role + + :param: name: Name of the cluster role + """ + self.clients[RBAC_CLIENT].delete_cluster_role(name) + + def _get_kubectl_version(self): + version = VersionApi().get_code() + return "{}.{}".format(version.major, version.minor) + + def _need_to_create_new_secret(self): + min_k8s_version = "1.24" + current_k8s_version = self._get_kubectl_version() + return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version) + + def _get_secret_name(self, service_account_name: str): + random_alphanum = str(uuid.uuid4())[:5] + return "{}-token-{}".format(service_account_name, random_alphanum) + + def _create_service_account_secret( + self, service_account_name: str, namespace: str, secret_name: str + ): + """ + Create a secret for the service account. K8s version >= 1.24 + + :param: service_account_name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + :param: secret_name: Name of the secret + """ + v1_core = self.clients[CORE_CLIENT] + secrets = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items + + if len(secrets) > 0: + raise Exception( + "Secret with metadata.name={} already exists".format(secret_name) + ) + + annotations = {"kubernetes.io/service-account.name": service_account_name} + metadata = V1ObjectMeta( + name=secret_name, namespace=namespace, annotations=annotations + ) + type = "kubernetes.io/service-account-token" + secret = V1Secret(metadata=metadata, type=type) + v1_core.create_namespaced_secret(namespace, secret) + + def _get_secret_reference_list(self, namespace: str, secret_name: str): + """ + Return a secret reference list with one secret. + K8s version >= 1.24 + + :param: namespace: Kubernetes namespace for service account metadata + :param: secret_name: Name of the secret + :rtype: list[V1SecretReference] + """ + return [V1SecretReference(name=secret_name, namespace=namespace)] + + def create_service_account( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a service account + + :param: name: Name of the service account + :param: labels: Labels for service account metadata + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + v1_core = self.clients[CORE_CLIENT] + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) > 0: + raise Exception( + "Service account with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + + if self._need_to_create_new_secret(): + secret_name = self._get_secret_name(name) + secrets = self._get_secret_reference_list(namespace, secret_name) + service_account = V1ServiceAccount(metadata=metadata, secrets=secrets) + v1_core.create_namespaced_service_account(namespace, service_account) + self._create_service_account_secret(name, namespace, secret_name) + else: + service_account = V1ServiceAccount(metadata=metadata) + v1_core.create_namespaced_service_account(namespace, service_account) + + def delete_service_account(self, name: str, namespace: str = "kube-system"): + """ + Delete a service account + + :param: name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) + + def create_cluster_role_binding( + self, name: str, labels: Dict[str, str], namespace: str = "kube-system" + ): + """ + Create a cluster role binding + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role binding metadata + :param: namespace: Kubernetes namespace for cluster role binding metadata + Default: kube-system + """ + role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( + field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception("Generated rbac id already exists") + + role_binding = V1ClusterRoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), + subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)], + ) + self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + + def delete_cluster_role_binding(self, name: str): + """ + Delete a cluster role binding + + :param: name: Name of the cluster role binding + """ + self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting the secret from service account"), + ) + async def get_secret_data( + self, name: str, namespace: str = "kube-system" + ) -> (str, str): + """ + Get secret data + + :param: name: Name of the secret data + :param: namespace: Name of the namespace where the secret is stored + + :return: Tuple with the token and client certificate + """ + v1_core = self.clients[CORE_CLIENT] + + secret_name = None + + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) == 0: + raise Exception( + "Service account not found with metadata.name={}".format(name) + ) + service_account = service_accounts.items[0] + if service_account.secrets and len(service_account.secrets) > 0: + secret_name = service_account.secrets[0].name + if not secret_name: + raise Exception( + "Failed getting the secret from service account {}".format(name) + ) + secret = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items[0] + + token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] + client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] + + return ( + base64.b64decode(token).decode("utf-8"), + base64.b64decode(client_certificate_data).decode("utf-8"), + )