blob: a7f0ceefb51aa7131c917b4098a2bbf45b07e51b [file] [log] [blame]
#######################################################################################
# Copyright 2020 Canonical Ltd.
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import base64
import logging
from typing import Dict
import typing
import uuid
import json
from distutils.version import LooseVersion
from kubernetes import client, config
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
V1Role,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
V1RoleBinding,
V1RoleRef,
RbacV1Subject,
V1Secret,
V1SecretReference,
V1Namespace,
)
from kubernetes.client.rest import ApiException
from n2vc.libjuju import retry_callback
from retrying_async import retry
# Keys read from the data of a service account token secret (see get_secret_data)
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Keys of the API clients dictionary built in Kubectl.__init__
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
def __init__(self, config_file=None):
    """
    Load the kubeconfig and instantiate the Kubernetes API clients
    :param: config_file: Kubeconfig file forwarded to config.load_kube_config
                         Default: None
    """
    config.load_kube_config(config_file=config_file)
    # One client object per API family, registered under its module-level key
    core_api = client.CoreV1Api()
    rbac_api = client.RbacAuthorizationV1Api()
    storage_api = client.StorageV1Api()
    custom_objects_api = client.CustomObjectsApi()
    self._clients = {
        CORE_CLIENT: core_api,
        RBAC_CLIENT: rbac_api,
        STORAGE_CLIENT: storage_api,
        CUSTOM_OBJECT_CLIENT: custom_objects_api,
    }
    kube_configuration = config.kube_config.Configuration
    self._configuration = kube_configuration.get_default_copy()
    self.logger = logging.getLogger("lcm.odu")
@property
def configuration(self):
    """Read-only access to the client configuration copied in the constructor"""
    kube_configuration = self._configuration
    return kube_configuration
@property
def clients(self):
    """Read-only access to the dictionary of Kubernetes API clients"""
    api_clients = self._clients
    return api_clients
def get_services(
    self,
    field_selector: str = None,
    label_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    Get the Service list across all namespaces
    :param: field_selector: Kubernetes field selector used to filter the services
    :param: label_selector: Kubernetes label selector used to filter the services
    :return: List of dictionaries (name, cluster_ip, type, ports, external_ip)
             for the services matching the selectors specified.
             external_ip is None when the service has no load balancer ingress
    :raises: ApiException if the call to the Kubernetes API fails
    """
    # Forward only the selectors that were actually provided
    kwargs = {}
    if field_selector:
        kwargs["field_selector"] = field_selector
    if label_selector:
        kwargs["label_selector"] = label_selector
    try:
        result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
    except ApiException as e:
        self.logger.error("Error calling get services: {}".format(e))
        raise

    services = []
    for service in result.items:
        ports = [
            {
                "name": port.name,
                "node_port": port.node_port,
                "port": port.port,
                "protocol": port.protocol,
                "target_port": port.target_port,
            }
            for port in service.spec.ports or []
        ]
        # Tolerate services reported without status/load balancer information
        load_balancer = service.status.load_balancer if service.status else None
        ingress = load_balancer.ingress if load_balancer else None
        services.append(
            {
                "name": service.metadata.name,
                "cluster_ip": service.spec.cluster_ip,
                "type": service.spec.type,
                "ports": ports,
                "external_ip": [entry.ip for entry in ingress] if ingress else None,
            }
        )
    return services
def get_default_storage_class(self) -> str:
    """
    Pick the storage class to be used by default
    :return: Name of the storage class annotated as default-class, if any.
             Otherwise, the name of the first storage class listed.
             None if there are no storage classes
    """
    default_markers = (
        "storageclass.kubernetes.io/is-default-class",
        # Older clusters still use the beta annotation.
        "storageclass.beta.kubernetes.io/is-default-class",
    )
    fallback_name = None
    for storage_class in self.clients[STORAGE_CLIENT].list_storage_class().items:
        sc_name = storage_class.metadata.name
        if not fallback_name:
            # Remember the first storage class in case none is marked as default
            fallback_name = sc_name
        sc_annotations = storage_class.metadata.annotations or {}
        for marker in default_markers:
            if sc_annotations.get(marker) == "true":
                # Default storage class found: no need to look any further
                return sc_name
    return fallback_name
def create_cluster_role(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role granting every verb on every resource and
    every non-resource URL
    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata
                       Default: kube-system
    :raises: Exception if a cluster role with the same name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_selector = "metadata.name={}".format(name)
    existing_roles = rbac_api.list_cluster_role(field_selector=name_selector).items
    if len(existing_roles) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    # Wildcard rules: one for API resources, one for non-resource URLs
    resource_rule = V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"])
    non_resource_rule = V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"])
    role_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    rbac_api.create_cluster_role(
        V1ClusterRole(
            metadata=role_metadata,
            rules=[resource_rule, non_resource_rule],
        )
    )
async def create_role(
    self,
    name: str,
    labels: Dict[str, str],
    api_groups: list,
    resources: list,
    verbs: list,
    namespace: str,
):
    """
    Create a namespaced role holding a single PolicyRule
    :param: name: Name of the namespaced Role
    :param: labels: Labels for namespaced Role metadata
    :param: api_groups: List with api-groups allowed in the policy rule
    :param: resources: List with resources allowed in the policy rule
    :param: verbs: List with verbs allowed in the policy rule
    :param: namespace: Kubernetes namespace for Role metadata
    :return: None
    :raises: Exception if a role with the same name already exists in the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_selector = "metadata.name={}".format(name)
    existing_roles = rbac_api.list_namespaced_role(
        namespace, field_selector=name_selector
    ).items
    if len(existing_roles) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    policy_rule = V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs)
    new_role = V1Role(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[policy_rule],
    )
    rbac_api.create_namespaced_role(namespace, new_role)
def delete_cluster_role(self, name: str):
    """
    Remove a cluster role
    :param: name: Name of the cluster role to be deleted
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
def _get_kubectl_version(self):
    """
    Get the version returned by VersionApi.get_code()
    :return: String with the format "<major>.<minor>"
    """
    version_info = VersionApi().get_code()
    major, minor = version_info.major, version_info.minor
    return "{}.{}".format(major, minor)
def _need_to_create_new_secret(self):
    """
    Tell whether the service account secret has to be created explicitly
    :return: True if the version returned by _get_kubectl_version is 1.24
             or newer, False otherwise
    """
    min_k8s_version = (1, 24)

    def leading_int(component: str) -> int:
        # Versions may carry suffixes (e.g. a minor of "24+"): keep the
        # leading digits only, and treat a component without digits as 0
        digits = ""
        for char in component:
            if not char.isdigit():
                break
            digits += char
        return int(digits) if digits else 0

    major, _, minor = str(self._get_kubectl_version()).partition(".")
    # Plain integer tuples are used instead of distutils' LooseVersion:
    # distutils was removed from the standard library in Python 3.12
    return min_k8s_version <= (leading_int(major), leading_int(minor))
def _get_secret_name(self, service_account_name: str):
    """
    Build a secret name for a service account
    :param: service_account_name: Name of the service account
    :return: "<service_account_name>-token-<suffix>", where the suffix is
             the first 5 characters of a random UUID4
    """
    # The first hyphen of a UUID string comes after 8 hex digits, so the
    # first 5 characters of the hex form and of the string form are the same
    suffix = uuid.uuid4().hex[:5]
    return "{}-token-{}".format(service_account_name, suffix)
def _create_service_account_secret(
    self,
    service_account_name: str,
    namespace: str,
    secret_name: str,
):
    """
    Create the token secret of a service account. K8s version >= 1.24
    :param: service_account_name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :raises: Exception if a secret with the same name already exists
    """
    core_api = self.clients[CORE_CLIENT]
    name_selector = "metadata.name={}".format(secret_name)
    existing_secrets = core_api.list_namespaced_secret(
        namespace, field_selector=name_selector
    ).items
    if len(existing_secrets) > 0:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    # The annotation ties the secret to the service account it belongs to
    secret_metadata = V1ObjectMeta(
        name=secret_name,
        namespace=namespace,
        annotations={"kubernetes.io/service-account.name": service_account_name},
    )
    secret_type = "kubernetes.io/service-account-token"
    core_api.create_namespaced_secret(
        namespace, V1Secret(metadata=secret_metadata, type=secret_type)
    )
def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Build a list holding a single secret reference.
    K8s version >= 1.24
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    reference = V1SecretReference(name=secret_name, namespace=namespace)
    return [reference]
def create_service_account(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account and, when _need_to_create_new_secret says so,
    the secret referenced by it
    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system
    :raises: Exception if a service account with the same name already exists
    """
    core_api = self.clients[CORE_CLIENT]
    existing_accounts = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(existing_accounts) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    sa_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    with_token_secret = self._need_to_create_new_secret()
    if with_token_secret:
        secret_name = self._get_secret_name(name)
        service_account = V1ServiceAccount(
            metadata=sa_metadata,
            secrets=self._get_secret_reference_list(namespace, secret_name),
        )
    else:
        service_account = V1ServiceAccount(metadata=sa_metadata)

    core_api.create_namespaced_service_account(namespace, service_account)
    if with_token_secret:
        # The secret is created only after the service account it refers to
        self._create_service_account_secret(name, namespace, secret_name)
def delete_secret(self, name: str, namespace: str = "kube-system"):
    """
    Remove a secret
    :param: name: Name of the secret to be deleted
    :param: namespace: Kubernetes namespace where the secret is stored
                       Default: kube-system
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_secret(name, namespace)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Remove a service account
    :param: name: Name of the service account to be deleted
    :param: namespace: Kubernetes namespace of the service account
                       Default: kube-system
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, namespace)
def create_cluster_role_binding(
    self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
):
    """
    Bind the cluster role and the service account that share the given name
    :param: name: Name used for the binding, the cluster role and the service account
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the bound service account
                       Default: kube-system
    :raises: Exception if a cluster role binding with the same name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing_bindings = rbac_api.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    ).items
    if len(existing_bindings) > 0:
        raise Exception("Generated rbac id already exists")

    binding_metadata = V1ObjectMeta(name=name, labels=labels)
    cluster_role_ref = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    service_account_subject = RbacV1Subject(
        kind="ServiceAccount", name=name, namespace=namespace
    )
    rbac_api.create_cluster_role_binding(
        V1ClusterRoleBinding(
            metadata=binding_metadata,
            role_ref=cluster_role_ref,
            subjects=[service_account_subject],
        )
    )
async def create_role_binding(
    self,
    name: str,
    role_name: str,
    sa_name: str,
    labels: Dict[str, str],
    namespace: str,
):
    """
    Create a namespaced role binding between a Role and a Service Account
    :param: name: Name of the namespaced Role Binding
    :param: role_name: Name of the namespaced Role to be bound
    :param: sa_name: Name of the Service Account to be bound
    :param: labels: Labels for Role Binding metadata
    :param: namespace: Kubernetes namespace of the Role Binding and the Service Account
    :return: None
    :raises: Exception if a role binding with the same name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing_bindings = rbac_api.list_namespaced_role_binding(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(existing_bindings) > 0:
        raise Exception(
            "Role Binding with metadata.name={} already exists".format(name)
        )

    binding_metadata = V1ObjectMeta(name=name, labels=labels)
    bound_role = V1RoleRef(kind="Role", name=role_name, api_group="")
    bound_account = RbacV1Subject(
        kind="ServiceAccount", name=sa_name, namespace=namespace
    )
    new_binding = V1RoleBinding(
        metadata=binding_metadata,
        role_ref=bound_role,
        subjects=[bound_account],
    )
    rbac_api.create_namespaced_role_binding(namespace, new_binding)
def delete_cluster_role_binding(self, name: str):
    """
    Remove a cluster role binding
    :param: name: Name of the cluster role binding to be deleted
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting the secret from service account"),
    callback=retry_callback,
)
async def get_secret_data(
    self, name: str, namespace: str = "kube-system"
) -> (str, str):
    """
    Get the token and the "ca.crt" content of a service account secret
    :param: name: Name of the service account whose first secret is read
    :param: namespace: Name of the namespace where the service account
                       and its secret are stored
    :return: Tuple with the decoded token and the decoded "ca.crt" content
    :raises: Exception if the service account is not found or has no secret
    """
    core_api = self.clients[CORE_CLIENT]
    matching_accounts = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(matching_accounts) == 0:
        raise Exception(
            "Service account not found with metadata.name={}".format(name)
        )

    account_secrets = matching_accounts[0].secrets
    secret_name = account_secrets[0].name if account_secrets else None
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )

    # TODO: refactor to use get_secret_content
    matching_secrets = core_api.list_namespaced_secret(
        namespace, field_selector="metadata.name={}".format(secret_name)
    ).items
    secret_payload = matching_secrets[0].data
    encoded_token = secret_payload[SERVICE_ACCOUNT_TOKEN_KEY]
    encoded_root_ca = secret_payload[SERVICE_ACCOUNT_ROOT_CA_KEY]
    # Secret data is base64 encoded: return it as plain text
    return tuple(
        base64.b64decode(encoded).decode("utf-8")
        for encoded in (encoded_token, encoded_root_ca)
    )
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting data from the secret"),
)
async def get_secret_content(
    self,
    name: str,
    namespace: str,
) -> dict:
    """
    Read the data of a secret
    :param: name: Name of the secret
    :param: namespace: Name of the namespace where the secret is stored
    :return: The "data" attribute of the secret, as returned by the API client
    """
    core_api = self.clients[CORE_CLIENT]
    return core_api.read_namespaced_secret(name, namespace).data
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret(
    self, name: str, data: dict, namespace: str, secret_type: str
):
    """
    Create secret with data
    :param: name: Name of the secret
    :param: data: Dict with data content. Values must be already base64 encoded
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
    :return: None
    """
    self.logger.info("Enter create_secret function")
    v1_core = self.clients[CORE_CLIENT]
    metadata = V1ObjectMeta(name=name, namespace=namespace)
    secret = V1Secret(metadata=metadata, data=data, type=secret_type)
    # Log identifying fields only. The V1Secret object must never be logged:
    # it carries the secret values, which are only base64 encoded
    self.logger.debug(
        "Creating secret {} of type {} in namespace {}".format(
            name, secret_type, namespace
        )
    )
    v1_core.create_namespaced_secret(namespace, secret)
    self.logger.info("Namespaced secret was created")
async def create_certificate(
    self,
    namespace: str,
    name: str,
    dns_prefix: str,
    secret_name: str,
    usages: list,
    issuer_name: str,
):
    """
    Creates cert-manager certificate object
    If the certificate already exists, a warning is logged and no error is raised
    :param: namespace: Name of the namespace where the certificate and secret is stored
    :param: name: Name of the certificate object
    :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
    :param: secret_name: Name of the secret created by cert-manager
    :param: usages: List of X.509 key usages
    :param: issuer_name: Name of the cert-manager's issuer object. It is always
                         referenced with kind "ClusterIssuer"
    :raises: ApiException for any API error other than "AlreadyExists"
    """
    certificate_body = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "secretName": secret_name,
            "privateKey": {
                "rotationPolicy": "Always",
                "algorithm": "ECDSA",
                "size": 256,
            },
            "duration": "8760h",  # 365 days
            "renewBefore": "2208h",  # 92 days before expiry, i.e. after ~9 months
            "subject": {"organizations": ["osm"]},
            "commonName": "osm",
            "isCA": False,
            "usages": usages,
            "dnsNames": [
                "{}.{}".format(dns_prefix, namespace),
                "{}.{}.svc".format(dns_prefix, namespace),
                "{}.{}.svc.cluster".format(dns_prefix, namespace),
                "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
            ],
            "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
        },
    }
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.create_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            body=certificate_body,
            namespace=namespace,
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "alreadyexists":
            raise e
        self.logger.warning("Certificate already exists: {}".format(e))
async def delete_certificate(self, namespace, object_name):
    """
    Deletes cert-manager certificate object
    If the certificate does not exist, a warning is logged and no error is raised
    :param: namespace: Name of the namespace where the certificate is stored
    :param: object_name: Name of the certificate object
    :raises: ApiException for any API error other than "NotFound"
    """
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.delete_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            name=object_name,
            namespace=namespace,
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "notfound":
            raise e
        self.logger.warning("Certificate already deleted: {}".format(e))
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the namespace"),
)
async def create_namespace(self, name: str, labels: dict = None):
    """
    Create a namespace
    If the namespace already exists, a warning is logged and no error is raised
    :param: name: Name of the namespace to be created
    :param: labels: Dictionary with labels for the new namespace
    :raises: ApiException for any API error other than "AlreadyExists"
    """
    v1_core = self.clients[CORE_CLIENT]
    metadata = V1ObjectMeta(name=name, labels=labels)
    namespace = V1Namespace(
        metadata=metadata,
    )
    try:
        v1_core.create_namespace(namespace)
        self.logger.debug("Namespace created: {}".format(name))
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "alreadyexists":
            raise e
        self.logger.warning("Namespace already exists: {}".format(e))
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the namespace"),
)
async def delete_namespace(self, name: str):
    """
    Delete a namespace
    If the namespace does not exist, a warning is logged and no error is raised
    :param: name: Name of the namespace to be deleted
    :raises: ApiException for any API error other than "Not Found"
    """
    try:
        self.clients[CORE_CLIENT].delete_namespace(name)
    except ApiException as e:
        if e.reason == "Not Found":
            self.logger.warning("Namespace already deleted: {}".format(e))
        else:
            # Any other failure must be propagated: swallowing it would
            # report a successful deletion and bypass the retry decorator
            raise
def get_secrets(
    self,
    namespace: str,
    field_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    List the secrets of a namespace
    :param: namespace: Kubernetes namespace
    :param: field_selector: Kubernetes field selector
    :return: The items returned by list_namespaced_secret for the
             namespace and selector specified
    :raises: ApiException if the call to the Kubernetes API fails
    """
    core_api = self.clients[CORE_CLIENT]
    try:
        secret_list = core_api.list_namespaced_secret(
            namespace=namespace,
            field_selector=field_selector,
        )
        return secret_list.items
    except ApiException as e:
        self.logger.error("Error calling get secrets: {}".format(e))
        raise e
def create_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    manifest_dict: dict,
):
    """
    Creates generic object
    If the object already exists, a warning is logged and no error is raised
    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace
    :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
    :raises: ApiException for any API error other than "AlreadyExists"
    """
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.create_namespaced_custom_object(
            group=api_group,
            plural=api_plural,
            version=api_version,
            body=manifest_dict,
            namespace=namespace,
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "alreadyexists":
            raise e
        self.logger.warning("Object already exists: {}".format(e))
def delete_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Deletes generic object
    If the object does not exist, a warning is logged and no error is raised
    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace
    :param: name: Name of the object
    :raises: ApiException for any API error other than "NotFound"
    """
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.delete_namespaced_custom_object(
            group=api_group,
            plural=api_plural,
            version=api_version,
            name=name,
            namespace=namespace,
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "notfound":
            raise e
        self.logger.warning("Object already deleted: {}".format(e))
async def get_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Gets generic object
    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace
    :param: name: Name of the object
    :return: Dictionary with the first object whose metadata.name matches, or
             None if there is no match or the API answers "NotFound"
    :raises: ApiException for any API error other than "NotFound"
    """
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        object_dict = custom_objects_api.list_namespaced_custom_object(
            group=api_group,
            plural=api_plural,
            version=api_version,
            namespace=namespace,
            field_selector=f"metadata.name={name}",
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "notfound":
            raise e
        self.logger.warning("Cannot get custom object: {}".format(e))
        return None
    # A response without "items" is handled like an empty list
    items = object_dict.get("items") or []
    return items[0] if items else None
async def list_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
):
    """
    Lists all generic objects of the requested API group
    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace
    :return: Content of the "items" key of the API response, or None if the
             API answers "NotFound"
    :raises: ApiException for any API error other than "NotFound"
    """
    # Named so that it does not shadow the imported kubernetes "client" module
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        object_dict = custom_objects_api.list_namespaced_custom_object(
            group=api_group,
            plural=api_plural,
            version=api_version,
            namespace=namespace,
        )
    except ApiException as e:
        # The body can be empty or not JSON: parsing it must never hide
        # the original API error
        try:
            reason = str(json.loads(e.body).get("reason", "")).lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason != "notfound":
            raise e
        self.logger.warning(
            "Cannot retrieve list of custom objects: {}".format(e)
        )
        return None
    self.logger.debug(f"Object-list: {object_dict.get('items')}")
    return object_dict.get("items")
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret_string(
    self, name: str, string_data: str, namespace: str, secret_type: str
):
    """
    Create secret from non-encoded content
    :param: name: Name of the secret
    :param: string_data: Content passed as the string_data field of the secret
                         NOTE(review): the Kubernetes API defines stringData
                         as a mapping - confirm what callers actually pass
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
    :return: None
    """
    core_api = self.clients[CORE_CLIENT]
    new_secret = V1Secret(
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        string_data=string_data,
        type=secret_type,
    )
    core_api.create_namespaced_secret(namespace, new_secret)