Coverity-CWE 330: Use of Insufficiently Random Values
[osm/N2VC.git] / n2vc / kubectl.py
index 31b6f55..c16c95a 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import base64
+import logging
+from typing import Dict
+import typing
+import uuid
+import json
+
+from distutils.version import LooseVersion
+
 from kubernetes import client, config
 from kubernetes import client, config
+from kubernetes.client.api import VersionApi
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1Role,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleBinding,
+    V1RoleRef,
+    V1Subject,
+    V1Secret,
+    V1SecretReference,
+    V1Namespace,
+)
 from kubernetes.client.rest import ApiException
 from kubernetes.client.rest import ApiException
-import logging
+from n2vc.libjuju import retry_callback
+from retrying_async import retry
+
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
+CORE_CLIENT = "core_v1"
+RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
+        self._clients = {
+            CORE_CLIENT: client.CoreV1Api(),
+            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+            STORAGE_CLIENT: client.StorageV1Api(),
+            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
+        }
+        self._configuration = config.kube_config.Configuration.get_default_copy()
         self.logger = logging.getLogger("Kubectl")
 
         self.logger = logging.getLogger("Kubectl")
 
-    def get_configuration(self):
-        return config.kube_config.Configuration()
+    @property
+    def configuration(self):
+        return self._configuration
+
+    @property
+    def clients(self):
+        return self._clients
 
 
-    def get_services(self, field_selector=None, label_selector=None):
+    def get_services(
+        self,
+        field_selector: str = None,
+        label_selector: str = None,
+    ) -> typing.List[typing.Dict]:
+        """
+        Get Service list from a namespace
+
+        :param: field_selector:     Kubernetes field selector for the namespace
+        :param: label_selector:     Kubernetes label selector for the namespace
+
+        :return: List of the services matching the selectors specified
+        """
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
-
         try:
         try:
-            v1 = client.CoreV1Api()
-            result = v1.list_service_for_all_namespaces(**kwargs)
+            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
             return [
                 {
                     "name": i.metadata.name,
             return [
                 {
                     "name": i.metadata.name,
@@ -70,9 +126,7 @@ class Kubectl:
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
-
-        storagev1 = client.StorageV1Api()
-        storage_classes = storagev1.list_storage_class()
+        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
@@ -83,7 +137,7 @@ class Kubectl:
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
-            annotations = sc.metadata.annotations
+            annotations = sc.metadata.annotations or {}
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
@@ -92,3 +146,472 @@ class Kubectl:
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
+
+    def create_cluster_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a cluster role
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role metadata
+        :param: namespace:  Kubernetes namespace for cluster role metadata
+                            Default: kube-system
+        """
+        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception("Role with metadata.name={} already exists".format(name))
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    async def create_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        api_groups: list,
+        resources: list,
+        verbs: list,
+        namespace: str,
+    ):
+        """
+        Create a role with one PolicyRule
+
+        :param: name:       Name of the namespaced Role
+        :param: labels:     Labels for namespaced Role metadata
+        :param: api_groups: List with api-groups allowed in the policy rule
+        :param: resources:  List with resources allowed in the policy rule
+        :param: verbs:      List with verbs allowed in the policy rule
+        :param: namespace:  Kubernetes namespace for Role metadata
+
+        :return: None
+        """
+
+        roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+
+        if len(roles.items) > 0:
+            raise Exception("Role with metadata.name={} already exists".format(name))
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+        role = V1Role(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
+    def delete_cluster_role(self, name: str):
+        """
+        Delete a cluster role
+
+        :param: name:       Name of the cluster role
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def _get_kubectl_version(self):
+        version = VersionApi().get_code()
+        return "{}.{}".format(version.major, version.minor)
+
+    def _need_to_create_new_secret(self):
+        min_k8s_version = "1.24"
+        current_k8s_version = self._get_kubectl_version()
+        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
+
+    def _get_secret_name(self, service_account_name: str):
+        random_alphanum = str(uuid.uuid4())[:5]
+        return "{}-token-{}".format(service_account_name, random_alphanum)
+
+    def _create_service_account_secret(
+        self, service_account_name: str, namespace: str, secret_name: str
+    ):
+        """
+        Create a secret for the service account. K8s version >= 1.24
+
+        :param: service_account_name: Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+        :param: secret_name: Name of the secret
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        secrets = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items
+
+        if len(secrets) > 0:
+            raise Exception(
+                "Secret with metadata.name={} already exists".format(secret_name)
+            )
+
+        annotations = {"kubernetes.io/service-account.name": service_account_name}
+        metadata = V1ObjectMeta(
+            name=secret_name, namespace=namespace, annotations=annotations
+        )
+        type = "kubernetes.io/service-account-token"
+        secret = V1Secret(metadata=metadata, type=type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
+    def _get_secret_reference_list(self, namespace: str, secret_name: str):
+        """
+        Return a secret reference list with one secret.
+        K8s version >= 1.24
+
+        :param: namespace:  Kubernetes namespace for service account metadata
+        :param: secret_name: Name of the secret
+        :rtype: list[V1SecretReference]
+        """
+        return [V1SecretReference(name=secret_name, namespace=namespace)]
+
+    def create_service_account(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a service account
+
+        :param: name:       Name of the service account
+        :param: labels:     Labels for service account metadata
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+        if self._need_to_create_new_secret():
+            secret_name = self._get_secret_name(name)
+            secrets = self._get_secret_reference_list(namespace, secret_name)
+            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
+            v1_core.create_namespaced_service_account(namespace, service_account)
+            self._create_service_account_secret(name, namespace, secret_name)
+        else:
+            service_account = V1ServiceAccount(metadata=metadata)
+            v1_core.create_namespaced_service_account(namespace, service_account)
+
+    def delete_service_account(self, name: str, namespace: str = "kube-system"):
+        """
+        Delete a service account
+
+        :param: name:       Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+    def create_cluster_role_binding(
+        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role binding metadata
+        :param: namespace:  Kubernetes namespace for cluster role binding metadata
+                            Default: kube-system
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+        )
+        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    async def create_role_binding(
+        self,
+        name: str,
+        role_name: str,
+        sa_name: str,
+        labels: Dict[str, str],
+        namespace: str,
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the namespaced Role Binding
+        :param: role_name:  Name of the namespaced Role to be bound
+        :param: sa_name:    Name of the Service Account to be bound
+        :param: labels:     Labels for Role Binding metadata
+        :param: namespace:  Kubernetes namespace for Role Binding metadata
+
+        :return: None
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception(
+                "Role Binding with metadata.name={} already exists".format(name)
+            )
+
+        role_binding = V1RoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+            subjects=[
+                V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+            ],
+        )
+        self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+            namespace, role_binding
+        )
+
+    def delete_cluster_role_binding(self, name: str):
+        """
+        Delete a cluster role binding
+
+        :param: name:       Name of the cluster role binding
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting the secret from service account"),
+        callback=retry_callback,
+    )
+    async def get_secret_data(
+        self, name: str, namespace: str = "kube-system"
+    ) -> (str, str):
+        """
+        Get secret data
+
+        :param: name:       Name of the secret data
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Tuple with the token and client certificate
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret_name = None
+
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) == 0:
+            raise Exception(
+                "Service account not found with metadata.name={}".format(name)
+            )
+        service_account = service_accounts.items[0]
+        if service_account.secrets and len(service_account.secrets) > 0:
+            secret_name = service_account.secrets[0].name
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        # TODO: refactor to use get_secret_content
+        secret = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting data from the secret"),
+    )
+    async def get_secret_content(
+        self,
+        name: str,
+        namespace: str,
+    ) -> dict:
+        """
+        Get secret data
+
+        :param: name:       Name of the secret
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Dictionary with secret's data
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret = v1_core.read_namespaced_secret(name, namespace)
+
+        return secret.data
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the secret"),
+    )
+    async def create_secret(
+        self, name: str, data: dict, namespace: str, secret_type: str
+    ):
+        """
+        Get secret data
+
+        :param: name:        Name of the secret
+        :param: data:        Dict with data content. Values must be already base64 encoded
+        :param: namespace:   Name of the namespace where the secret will be stored
+        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+        :return: None
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name, namespace=namespace)
+        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
+    async def create_certificate(
+        self,
+        namespace: str,
+        name: str,
+        dns_prefix: str,
+        secret_name: str,
+        usages: list,
+        issuer_name: str,
+    ):
+        """
+        Creates cert-manager certificate object
+
+        :param: namespace:       Name of the namespace where the certificate and secret is stored
+        :param: name:            Name of the certificate object
+        :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+        :param: secret_name:     Name of the secret created by cert-manager
+        :param: usages:          List of X.509 key usages
+        :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
+
+        """
+        certificate_body = {
+            "apiVersion": "cert-manager.io/v1",
+            "kind": "Certificate",
+            "metadata": {"name": name, "namespace": namespace},
+            "spec": {
+                "secretName": secret_name,
+                "privateKey": {
+                    "rotationPolicy": "Always",
+                    "algorithm": "ECDSA",
+                    "size": 256,
+                },
+                "duration": "8760h",  # 1 Year
+                "renewBefore": "2208h",  # 9 months
+                "subject": {"organizations": ["osm"]},
+                "commonName": "osm",
+                "isCA": False,
+                "usages": usages,
+                "dnsNames": [
+                    "{}.{}".format(dns_prefix, namespace),
+                    "{}.{}.svc".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+                ],
+                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+            },
+        }
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.create_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                body=certificate_body,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Certificate already exists: {}".format(e))
+            else:
+                raise e
+
+    async def delete_certificate(self, namespace, object_name):
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.delete_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                name=object_name,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning("Certificate already deleted: {}".format(e))
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the namespace"),
+    )
+    async def create_namespace(self, name: str, labels: dict = None):
+        """
+        Create a namespace
+
+        :param: name:       Name of the namespace to be created
+        :param: labels:     Dictionary with labels for the new namespace
+
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name, labels=labels)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+
+        try:
+            v1_core.create_namespace(namespace)
+            self.logger.debug("Namespace created: {}".format(name))
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Namespace already exists: {}".format(e))
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed deleting the namespace"),
+    )
+    async def delete_namespace(self, name: str):
+        """
+        Delete a namespace
+
+        :param: name:       Name of the namespace to be deleted
+
+        """
+        try:
+            self.clients[CORE_CLIENT].delete_namespace(name)
+        except ApiException as e:
+            if e.reason == "Not Found":
+                self.logger.warning("Namespace already deleted: {}".format(e))