Update from master
[osm/N2VC.git] / n2vc / kubectl.py
index 31b6f55..3fe6b53 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import base64
+import logging
+from typing import Dict
+import typing
+import uuid
+import json
+
+from distutils.version import LooseVersion
+
 from kubernetes import client, config
 from kubernetes import client, config
+from kubernetes.client.api import VersionApi
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleRef,
+    V1Subject,
+    V1Secret,
+    V1SecretReference,
+)
 from kubernetes.client.rest import ApiException
 from kubernetes.client.rest import ApiException
-import logging
+from n2vc.libjuju import retry_callback
+from retrying_async import retry
+
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
+CORE_CLIENT = "core_v1"
+RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
+        self._clients = {
+            CORE_CLIENT: client.CoreV1Api(),
+            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+            STORAGE_CLIENT: client.StorageV1Api(),
+            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
+        }
+        self._configuration = config.kube_config.Configuration.get_default_copy()
         self.logger = logging.getLogger("Kubectl")
 
         self.logger = logging.getLogger("Kubectl")
 
-    def get_configuration(self):
-        return config.kube_config.Configuration()
+    @property
+    def configuration(self):
+        return self._configuration
+
+    @property
+    def clients(self):
+        return self._clients
+
+    def get_services(
+        self,
+        field_selector: str = None,
+        label_selector: str = None,
+    ) -> typing.List[typing.Dict]:
+        """
+        Get Service list from a namespace
+
+        :param: field_selector:     Kubernetes field selector for the namespace
+        :param: label_selector:     Kubernetes label selector for the namespace
 
 
-    def get_services(self, field_selector=None, label_selector=None):
+        :return: List of the services matching the selectors specified
+        """
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
-
         try:
         try:
-            v1 = client.CoreV1Api()
-            result = v1.list_service_for_all_namespaces(**kwargs)
+            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
             return [
                 {
                     "name": i.metadata.name,
             return [
                 {
                     "name": i.metadata.name,
@@ -70,9 +123,7 @@ class Kubectl:
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
-
-        storagev1 = client.StorageV1Api()
-        storage_classes = storagev1.list_storage_class()
+        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
@@ -83,7 +134,7 @@ class Kubectl:
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
-            annotations = sc.metadata.annotations
+            annotations = sc.metadata.annotations or {}
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
@@ -92,3 +143,301 @@ class Kubectl:
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
+
+    def create_cluster_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a cluster role
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role metadata
+        :param: namespace:  Kubernetes namespace for cluster role metadata
+                            Default: kube-system
+        """
+        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception(
+                "Cluster role with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    def delete_cluster_role(self, name: str):
+        """
+        Delete a cluster role
+
+        :param: name:       Name of the cluster role
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def _get_kubectl_version(self):
+        version = VersionApi().get_code()
+        return "{}.{}".format(version.major, version.minor)
+
+    def _need_to_create_new_secret(self):
+        min_k8s_version = "1.24"
+        current_k8s_version = self._get_kubectl_version()
+        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
+
+    def _get_secret_name(self, service_account_name: str):
+        random_alphanum = str(uuid.uuid4())[:5]
+        return "{}-token-{}".format(service_account_name, random_alphanum)
+
+    def _create_service_account_secret(
+        self, service_account_name: str, namespace: str, secret_name: str
+    ):
+        """
+        Create a secret for the service account. K8s version >= 1.24
+
+        :param: service_account_name: Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+        :param: secret_name: Name of the secret
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        secrets = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items
+
+        if len(secrets) > 0:
+            raise Exception(
+                "Secret with metadata.name={} already exists".format(secret_name)
+            )
+
+        annotations = {"kubernetes.io/service-account.name": service_account_name}
+        metadata = V1ObjectMeta(
+            name=secret_name, namespace=namespace, annotations=annotations
+        )
+        type = "kubernetes.io/service-account-token"
+        secret = V1Secret(metadata=metadata, type=type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
+    def _get_secret_reference_list(self, namespace: str, secret_name: str):
+        """
+        Return a secret reference list with one secret.
+        K8s version >= 1.24
+
+        :param: namespace:  Kubernetes namespace for service account metadata
+        :param: secret_name: Name of the secret
+        :rtype: list[V1SecretReference]
+        """
+        return [V1SecretReference(name=secret_name, namespace=namespace)]
+
+    def create_service_account(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a service account
+
+        :param: name:       Name of the service account
+        :param: labels:     Labels for service account metadata
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+        if self._need_to_create_new_secret():
+            secret_name = self._get_secret_name(name)
+            secrets = self._get_secret_reference_list(namespace, secret_name)
+            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
+            v1_core.create_namespaced_service_account(namespace, service_account)
+            self._create_service_account_secret(name, namespace, secret_name)
+        else:
+            service_account = V1ServiceAccount(metadata=metadata)
+            v1_core.create_namespaced_service_account(namespace, service_account)
+
+    def delete_service_account(self, name: str, namespace: str = "kube-system"):
+        """
+        Delete a service account
+
+        :param: name:       Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+    def create_cluster_role_binding(
+        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role binding metadata
+        :param: namespace:  Kubernetes namespace for cluster role binding metadata
+                            Default: kube-system
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+        )
+        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    def delete_cluster_role_binding(self, name: str):
+        """
+        Delete a cluster role binding
+
+        :param: name:       Name of the cluster role binding
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting the secret from service account"),
+        callback=retry_callback,
+    )
+    async def get_secret_data(
+        self, name: str, namespace: str = "kube-system"
+    ) -> (str, str):
+        """
+        Get secret data
+
+        :param: name:       Name of the secret data
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Tuple with the token and client certificate
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret_name = None
+
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) == 0:
+            raise Exception(
+                "Service account not found with metadata.name={}".format(name)
+            )
+        service_account = service_accounts.items[0]
+        if service_account.secrets and len(service_account.secrets) > 0:
+            secret_name = service_account.secrets[0].name
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        secret = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )
+
+    async def create_certificate(
+        self,
+        namespace: str,
+        name: str,
+        dns_prefix: str,
+        secret_name: str,
+        usages: list,
+        issuer_name: str,
+    ):
+        """
+        Creates cert-manager certificate object
+
+        :param: namespace:       Name of the namespace where the certificate and secret is stored
+        :param: name:            Name of the certificate object
+        :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+        :param: secret_name:     Name of the secret created by cert-manager
+        :param: usages:          List of X.509 key usages
+        :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
+
+        """
+        certificate_body = {
+            "apiVersion": "cert-manager.io/v1",
+            "kind": "Certificate",
+            "metadata": {"name": name, "namespace": namespace},
+            "spec": {
+                "secretName": secret_name,
+                "privateKey": {
+                    "rotationPolicy": "Always",
+                    "algorithm": "ECDSA",
+                    "size": 256,
+                },
+                "duration": "8760h",  # 1 Year
+                "renewBefore": "2208h",  # 9 months
+                "subject": {"organizations": ["osm"]},
+                "commonName": "osm",
+                "isCA": False,
+                "usages": usages,
+                "dnsNames": [
+                    "{}.{}".format(dns_prefix, namespace),
+                    "{}.{}.svc".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+                ],
+                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+            },
+        }
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.create_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                body=certificate_body,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Certificate already exists: {}".format(e))
+            else:
+                raise e
+
+    async def delete_certificate(self, namespace, object_name):
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.delete_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                name=object_name,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning("Certificate already deleted: {}".format(e))
+            else:
+                raise e