Remove unsecure and unused function
[osm/N2VC.git] / n2vc / kubectl.py
index 31b6f55..a56b6cd 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import base64
+import logging
+from typing import Dict
+import typing
+
+
 from kubernetes import client, config
 from kubernetes import client, config
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleRef,
+    V1Subject,
+)
 from kubernetes.client.rest import ApiException
 from kubernetes.client.rest import ApiException
-import logging
+from retrying_async import retry
+
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
+CORE_CLIENT = "core_v1"
+RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
+        self._clients = {
+            CORE_CLIENT: client.CoreV1Api(),
+            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+            STORAGE_CLIENT: client.StorageV1Api(),
+        }
+        self._configuration = config.kube_config.Configuration.get_default_copy()
         self.logger = logging.getLogger("Kubectl")
 
         self.logger = logging.getLogger("Kubectl")
 
-    def get_configuration(self):
-        return config.kube_config.Configuration()
+    @property
+    def configuration(self):
+        return self._configuration
 
 
-    def get_services(self, field_selector=None, label_selector=None):
+    @property
+    def clients(self):
+        return self._clients
+
+    def get_services(
+        self,
+        field_selector: str = None,
+        label_selector: str = None,
+    ) -> typing.List[typing.Dict]:
+        """
+        Get Service list from a namespace
+
+        :param: field_selector:     Kubernetes field selector for the namespace
+        :param: label_selector:     Kubernetes label selector for the namespace
+
+        :return: List of the services matching the selectors specified
+        """
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
-
         try:
         try:
-            v1 = client.CoreV1Api()
-            result = v1.list_service_for_all_namespaces(**kwargs)
+            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
             return [
                 {
                     "name": i.metadata.name,
             return [
                 {
                     "name": i.metadata.name,
@@ -70,9 +114,7 @@ class Kubectl:
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
-
-        storagev1 = client.StorageV1Api()
-        storage_classes = storagev1.list_storage_class()
+        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
@@ -83,7 +125,7 @@ class Kubectl:
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
             if not selected_sc:
                 # Select the first storage class in case there is no a default-class
                 selected_sc = sc.metadata.name
-            annotations = sc.metadata.annotations
+            annotations = sc.metadata.annotations or {}
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
             if any(
                 k in annotations and annotations[k] == v
                 for k, v in default_sc_annotations.items()
@@ -92,3 +134,163 @@ class Kubectl:
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
+
+    def create_cluster_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a cluster role
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role metadata
+        :param: namespace:  Kubernetes namespace for cluster role metadata
+                            Default: kube-system
+        """
+        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception(
+                "Cluster role with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    def delete_cluster_role(self, name: str):
+        """
+        Delete a cluster role
+
+        :param: name:       Name of the cluster role
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def create_service_account(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a service account
+
+        :param: name:       Name of the service account
+        :param: labels:     Labels for service account metadata
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        service_account = V1ServiceAccount(metadata=metadata)
+
+        self.clients[CORE_CLIENT].create_namespaced_service_account(
+            namespace, service_account
+        )
+
+    def delete_service_account(self, name: str, namespace: str = "kube-system"):
+        """
+        Delete a service account
+
+        :param: name:       Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+    def create_cluster_role_binding(
+        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role binding metadata
+        :param: namespace:  Kubernetes namespace for cluster role binding metadata
+                            Default: kube-system
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+        )
+        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    def delete_cluster_role_binding(self, name: str):
+        """
+        Delete a cluster role binding
+
+        :param: name:       Name of the cluster role binding
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting the secret from service account"),
+    )
+    async def get_secret_data(
+        self, name: str, namespace: str = "kube-system"
+    ) -> (str, str):
+        """
+        Get secret data
+
+        :param: name:       Name of the secret data
+        :param: namespace:  Name of the namespace where the secret is stored
+
+        :return: Tuple with the token and client certificate
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret_name = None
+
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) == 0:
+            raise Exception(
+                "Service account not found with metadata.name={}".format(name)
+            )
+        service_account = service_accounts.items[0]
+        if service_account.secrets and len(service_account.secrets) > 0:
+            secret_name = service_account.secrets[0].name
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        secret = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )