| # Copyright 2020 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import base64 |
| import logging |
| from typing import Dict |
| import typing |
| |
| |
| from kubernetes import client, config |
| from kubernetes.client.models import ( |
| V1ClusterRole, |
| V1ObjectMeta, |
| V1PolicyRule, |
| V1ServiceAccount, |
| V1ClusterRoleBinding, |
| V1RoleRef, |
| V1Subject, |
| ) |
| from kubernetes.client.rest import ApiException |
| from retrying_async import retry |
| |
| |
# Keys looked up in the ``data`` mapping of a service account secret.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys of the API client dictionary built by Kubectl.__init__.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
| |
| |
class Kubectl:
    """
    Helper around the Kubernetes Python client.

    It keeps one client per API group (core, RBAC and storage) and exposes
    a few operations on services, storage classes, cluster roles, service
    accounts, cluster role bindings and service account secrets.
    """

    def __init__(self, config_file=None):
        """
        Load the kube config and create the API clients

        :param: config_file: Kube config file forwarded to load_kube_config.
                             None is forwarded unchanged.
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken in __init__"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients, indexed by the *_CLIENT constants"""
        return self._clients

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the list of services, from all namespaces, matching the selectors

        :param: field_selector: Kubernetes field selector to filter the services
        :param: label_selector: Kubernetes label selector to filter the services

        :return: List of the services matching the selectors specified.
                 Each service is a dictionary with the keys "name",
                 "cluster_ip", "type", "ports" (list of dictionaries, empty
                 if the service has no ports) and "external_ip" (list of
                 load balancer ingress IPs, or None if there is no ingress)
        :raises: ApiException: if the call to the Kubernetes API fails
        """
        # Empty/None selectors are not sent to the API at all
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector

        # Only the API call can raise ApiException, so only that is guarded
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(
                **kwargs
            )
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            # Bare raise keeps the original traceback
            raise

        services = []
        for service in result.items:
            ports = [
                {
                    "name": port.name,
                    "node_port": port.node_port,
                    "port": port.port,
                    "protocol": port.protocol,
                    "target_port": port.target_port,
                }
                for port in service.spec.ports or []
            ]
            ingresses = service.status.load_balancer.ingress
            services.append(
                {
                    "name": service.metadata.name,
                    "cluster_ip": service.spec.cluster_ip,
                    "type": service.spec.type,
                    "ports": ports,
                    "external_ip": [ingress.ip for ingress in ingresses]
                    if ingresses
                    else None,
                }
            )
        return services

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are not storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        selected_sc = None
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        for sc in storage_classes.items:
            if not selected_sc:
                # Select the first storage class in case there is no default-class
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the fallback
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role

        The role is created with two rules: every verb on every resource of
        every API group, and every verb on every non-resource URL.

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system

        :raises: Exception: if a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )

        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        # Cluster role
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )

        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system

        :raises: Exception: if a service account with that name already
                            exists in the namespace
        """
        service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        service_account = V1ServiceAccount(metadata=metadata)

        self.clients[CORE_CLIENT].create_namespaced_service_account(
            namespace, service_account
        )

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding itself, for the cluster role
        it refers to, and for the service account it binds.

        :param: name: Name of the cluster role binding, of the cluster role
                      and of the service account
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account subject
                           Default: kube-system

        :raises: Exception: if a cluster role binding with that name
                            already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and root CA certificate of a service account

        The first secret referenced by the service account is read. The
        method is wrapped by the retry decorator (10 attempts, delay of 1),
        so the exceptions raised here are expected to trigger a new attempt.

        :param: name: Name of the service account owning the secret
        :param: namespace: Name of the namespace where the service account
                           and its secret are stored

        :return: Tuple with the token and the root CA certificate ("ca.crt"),
                 both base64-decoded and returned as UTF-8 strings
        :raises: Exception: if the service account does not exist, has no
                            secrets, or its secret cannot be found
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            # Explicit error instead of an IndexError on an empty list
            raise Exception(
                "Secret {} not found for service account {}".format(secret_name, name)
            )
        secret = secrets[0]

        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
        root_ca_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(root_ca_data).decode("utf-8"),
        )