# blob: 8b8008efa1bf18ef8ea723113efcf8ce9fe19593 [file] [log] [blame]
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
from typing import Dict
import typing
import uuid
from distutils.version import LooseVersion
from kubernetes import client, config
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
V1RoleRef,
V1Subject,
V1Secret,
V1SecretReference,
)
from kubernetes.client.rest import ApiException
from retrying_async import retry
# Keys read from the ``data`` mapping of a service account token secret
# (see Kubectl.get_secret_data): the bearer token and the root CA certificate.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Keys of the Kubectl.clients dictionary; each one maps to the Kubernetes API
# client object created in Kubectl.__init__.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
class Kubectl:
    """
    Helper built on top of the Kubernetes Python client.

    A kubeconfig is loaded on construction. The instance then offers
    operations to list services, choose a storage class, and create/delete
    the cluster role, service account, cluster role binding and token secret
    that share a common name.
    """

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and build the API clients

        :param: config_file: Value forwarded to ``config.load_kube_config``
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken at construction."""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients keyed by the ``*_CLIENT`` constants."""
        return self._clients

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the list of Services across all namespaces

        :param: field_selector: Kubernetes field selector used to filter services
        :param: label_selector: Kubernetes label selector used to filter services
        :return: List of dictionaries, one per matching service, with the keys
                 "name", "cluster_ip", "type", "ports" (list of dictionaries,
                 empty when the service has no ports) and "external_ip"
                 (list of load balancer ingress IPs, or None when there is
                 no ingress)
        :raises: ApiException if the Kubernetes API call fails
        """
        # Empty/None selectors are not forwarded to the API call at all.
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector

        # Only the API call can raise ApiException, so keep the try minimal.
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            raise

        return [
            {
                "name": svc.metadata.name,
                "cluster_ip": svc.spec.cluster_ip,
                "type": svc.spec.type,
                "ports": [
                    {
                        "name": p.name,
                        "node_port": p.node_port,
                        "port": p.port,
                        "protocol": p.protocol,
                        "target_port": p.target_port,
                    }
                    for p in svc.spec.ports
                ]
                if svc.spec.ports
                else [],
                "external_ip": [
                    ingress.ip for ingress in svc.status.load_balancer.ingress
                ]
                if svc.status.load_balancer.ingress
                else None,
            }
            for svc in result.items
        ]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if it exists.
                 If not, it returns the first storage class.
                 If there are no storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        selected_sc = None
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        for sc in storage_classes.items:
            if not selected_sc:
                # Fall back to the first storage class in case none is
                # flagged as the default one
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the fallback
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role allowing every verb on every resource and
        on every non-resource URL

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system
        :raises: Exception if a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )
        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        # Cluster role
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )
        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self):
        """
        Get the version reported by the Kubernetes version API

        :return: String with the format "<major>.<minor>"
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    def _need_to_create_new_secret(self):
        """
        Check whether the token secret has to be created explicitly

        :return: True if the reported Kubernetes version is 1.24 or newer
        """
        min_k8s_version = "1.24"
        current_k8s_version = self._get_kubectl_version()
        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)

    def _get_secret_name(self, service_account_name: str):
        """
        Build a secret name for a service account

        :param: service_account_name: Name of the service account
        :return: "<service_account_name>-token-<suffix>", where the suffix is
                 made of the first five hexadecimal characters of a random UUID
        """
        random_suffix = uuid.uuid4().hex[:5]
        return "{}-token-{}".format(service_account_name, random_suffix)

    def _create_service_account_secret(
        self, service_account_name: str, namespace: str, secret_name: str
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :raises: Exception if a secret with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if secrets:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation ties the token secret to its service account.
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret_type = "kubernetes.io/service-account-token"
        secret = V1Secret(metadata=metadata, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        When the cluster version is 1.24 or newer, the service account is
        created referencing a token secret, and that secret is created
        right after the service account.

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        :raises: Exception if a service account with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        secret_name = None
        if self._need_to_create_new_secret():
            secret_name = self._get_secret_name(name)
            secrets = self._get_secret_reference_list(namespace, secret_name)
            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
        else:
            service_account = V1ServiceAccount(metadata=metadata)

        v1_core.create_namespaced_service_account(namespace, service_account)
        if secret_name:
            # The secret is created after the service account it refers to.
            self._create_service_account_secret(name, namespace, secret_name)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding itself, for the cluster role
        it refers to, and for the service account it is bound to.

        :param: name: Name of the cluster role binding, cluster role and
                      service account
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account subject
                           Default: kube-system
        :raises: Exception if a cluster role binding with that name
                 already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and root CA certificate of a service account

        Any exception raised here triggers a new attempt through the retry
        decorator, which is configured with 10 attempts, a delay of 1 and
        a generic fallback exception.

        :param: name: Name of the service account
        :param: namespace: Name of the namespace where the service account
                           and its secret are stored
        :return: Tuple with the decoded token and the decoded root CA
                 certificate ("ca.crt")
        :raises: Exception if the service account, its secret reference, the
                 secret or the expected keys of the secret are not available
        """
        # NOTE: the Kubernetes client calls below are not awaited; they are
        # plain synchronous calls made from this coroutine.
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )
        service_account = service_accounts.items[0]

        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            # The referenced secret may not exist yet; fail explicitly
            # (instead of an IndexError) so the attempt gets retried.
            raise Exception(
                "Secret not found with metadata.name={}".format(secret_name)
            )

        data = secrets[0].data or {}
        missing_keys = [
            key
            for key in (SERVICE_ACCOUNT_TOKEN_KEY, SERVICE_ACCOUNT_ROOT_CA_KEY)
            if key not in data
        ]
        if missing_keys:
            # The secret exists but has not been populated with the token data.
            raise Exception(
                "Secret {} does not contain the keys: {}".format(
                    secret_name, ", ".join(missing_keys)
                )
            )

        token = data[SERVICE_ACCOUNT_TOKEN_KEY]
        root_ca_data = data[SERVICE_ACCOUNT_ROOT_CA_KEY]
        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(root_ca_data).decode("utf-8"),
        )