X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=a56b6cdc2f98e39e0b7674e9d586a475ef956625;hp=9d4ce57a49b900fb7abc8b441cb3c1a324339239;hb=ba1d07263821169bc36b3f9f9f93cbed93baad4a;hpb=37004983e8e484d5504ae4253bdb75204ff389d9 diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 9d4ce57..a56b6cd 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -12,26 +12,73 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 +import logging +from typing import Dict +import typing + + from kubernetes import client, config +from kubernetes.client.models import ( + V1ClusterRole, + V1ObjectMeta, + V1PolicyRule, + V1ServiceAccount, + V1ClusterRoleBinding, + V1RoleRef, + V1Subject, +) from kubernetes.client.rest import ApiException -import logging +from retrying_async import retry + + +SERVICE_ACCOUNT_TOKEN_KEY = "token" +SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" +# clients +CORE_CLIENT = "core_v1" +RBAC_CLIENT = "rbac_v1" +STORAGE_CLIENT = "storage_v1" class Kubectl: def __init__(self, config_file=None): config.load_kube_config(config_file=config_file) + self._clients = { + CORE_CLIENT: client.CoreV1Api(), + RBAC_CLIENT: client.RbacAuthorizationV1Api(), + STORAGE_CLIENT: client.StorageV1Api(), + } + self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") - def get_services(self, field_selector=None, label_selector=None): + @property + def configuration(self): + return self._configuration + + @property + def clients(self): + return self._clients + + def get_services( + self, + field_selector: str = None, + label_selector: str = None, + ) -> typing.List[typing.Dict]: + """ + Get Service list from a namespace + + :param: field_selector: Kubernetes field selector for the namespace + :param: label_selector: Kubernetes label selector for the namespace + + :return: List of the services matching the selectors specified + """ kwargs = {} if field_selector: kwargs["field_selector"] = field_selector if label_selector: kwargs["label_selector"] = label_selector - try: - v1 = client.CoreV1Api() - result = v1.list_service_for_all_namespaces(**kwargs) + result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs) return [ { "name": i.metadata.name, @@ -46,7 +93,9 @@ class Kubectl: "target_port": p.target_port, } for p in i.spec.ports - ], + ] + if i.spec.ports + else [], "external_ip": [i.ip for i in i.status.load_balancer.ingress] if i.status.load_balancer.ingress else None, @@ -56,3 +105,192 @@ class Kubectl: except ApiException as e: self.logger.error("Error calling get services: {}".format(e)) raise e + + def get_default_storage_class(self) -> str: + """ + Default storage class + + :return: Returns the default storage class name, if exists. + If not, it returns the first storage class. + If there are not storage classes, returns None + """ + storage_classes = self.clients[STORAGE_CLIENT].list_storage_class() + selected_sc = None + default_sc_annotations = { + "storageclass.kubernetes.io/is-default-class": "true", + # Older clusters still use the beta annotation. + "storageclass.beta.kubernetes.io/is-default-class": "true", + } + for sc in storage_classes.items: + if not selected_sc: + # Select the first storage class in case there is no a default-class + selected_sc = sc.metadata.name + annotations = sc.metadata.annotations or {} + if any( + k in annotations and annotations[k] == v + for k, v in default_sc_annotations.items() + ): + # Default storage + selected_sc = sc.metadata.name + break + return selected_sc + + def create_cluster_role( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a cluster role + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role metadata + :param: namespace: Kubernetes namespace for cluster role metadata + Default: kube-system + """ + cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( + field_selector="metadata.name={}".format(name) + ) + + if len(cluster_roles.items) > 0: + raise Exception( + "Cluster role with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + # Cluster role + cluster_role = V1ClusterRole( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), + V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), + ], + ) + + self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + + def delete_cluster_role(self, name: str): + """ + Delete a cluster role + + :param: name: Name of the cluster role + """ + self.clients[RBAC_CLIENT].delete_cluster_role(name) + + def create_service_account( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a service account + + :param: name: Name of the service account + :param: labels: Labels for service account metadata + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) > 0: + raise Exception( + "Service account with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + service_account = V1ServiceAccount(metadata=metadata) + + self.clients[CORE_CLIENT].create_namespaced_service_account( + namespace, service_account + ) + + def delete_service_account(self, name: str, namespace: str = "kube-system"): + """ + Delete a service account + + :param: name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) + + def create_cluster_role_binding( + self, name: str, labels: Dict[str, str], namespace: str = "kube-system" + ): + """ + Create a cluster role binding + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role binding metadata + :param: namespace: Kubernetes namespace for cluster role binding metadata + Default: kube-system + """ + role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( + field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception("Generated rbac id already exists") + + role_binding = V1ClusterRoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), + subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)], + ) + self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + + def delete_cluster_role_binding(self, name: str): + """ + Delete a cluster role binding + + :param: name: Name of the cluster role binding + """ + self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting the secret from service account"), + ) + async def get_secret_data( + self, name: str, namespace: str = "kube-system" + ) -> (str, str): + """ + Get secret data + + :param: name: Name of the secret data + :param: namespace: Name of the namespace where the secret is stored + + :return: Tuple with the token and client certificate + """ + v1_core = self.clients[CORE_CLIENT] + + secret_name = None + + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) == 0: + raise Exception( + "Service account not found with metadata.name={}".format(name) + ) + service_account = service_accounts.items[0] + if service_account.secrets and len(service_account.secrets) > 0: + secret_name = service_account.secrets[0].name + if not secret_name: + raise Exception( + "Failed getting the secret from service account {}".format(name) + ) + secret = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items[0] + + token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] + client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] + + return ( + base64.b64decode(token).decode("utf-8"), + base64.b64decode(client_certificate_data).decode("utf-8"), + )