Upgrade to libjuju 2.9.2 40/11040/8 master
authorDavid Garcia <david.garcia@canonical.com>
Thu, 24 Jun 2021 16:47:22 +0000 (18:47 +0200)
committerDavid Garcia <david.garcia@canonical.com>
Wed, 30 Jun 2021 14:55:30 +0000 (16:55 +0200)
- The upgrade to libjuju enables Pebble support
- Libjuju requires a newer version of Kubernetes so I updated it to the
latest one
- Additionally, a group of functions were moved from k8s_juju_conn.p to
kubectl.py, because they fit better there.

Related feature number: 10884

Change-Id: I354a2f49e7fc7b87a204bf60131a8d52a4f74cac
Signed-off-by: David Garcia <david.garcia@canonical.com>
n2vc/k8s_juju_conn.py
n2vc/kubectl.py
n2vc/tests/unit/test_k8s_juju_conn.py
n2vc/tests/unit/test_kubectl.py
requirements.in
requirements.txt

index 24b3142..149947d 100644 (file)
@@ -18,35 +18,20 @@ import uuid
 import yaml
 import tempfile
 import binascii
-import base64
 
 from n2vc.config import EnvironConfig
 from n2vc.exceptions import K8sException
 from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
+from n2vc.kubectl import Kubectl
 from .exceptions import MethodNotImplemented
 from n2vc.libjuju import Libjuju
 from n2vc.utils import obj_to_dict, obj_to_yaml
 from n2vc.store import MotorStore
 from n2vc.vca.cloud import Cloud
 from n2vc.vca.connection import get_connection
-from kubernetes.client.models import (
-    V1ClusterRole,
-    V1ObjectMeta,
-    V1PolicyRule,
-    V1ServiceAccount,
-    V1ClusterRoleBinding,
-    V1RoleRef,
-    V1Subject,
-)
-
-from typing import Dict
-
-SERVICE_ACCOUNT_TOKEN_KEY = "token"
-SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
-RBAC_LABEL_KEY_NAME = "rbac-id"
 
-ADMIN_NAMESPACE = "kube-system"
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
 RBAC_STACK_PREFIX = "juju-credential"
 
 
@@ -125,11 +110,7 @@ class K8sJujuConnector(K8sConnector):
         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
 
         cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
-
-        kubecfg = tempfile.NamedTemporaryFile()
-        with open(kubecfg.name, "w") as kubecfg_file:
-            kubecfg_file.write(k8s_creds)
-        kubectl = Kubectl(config_file=kubecfg.name)
+        kubectl = self._get_kubectl(k8s_creds)
 
         # CREATING RESOURCES IN K8S
         rbac_id = generate_rbac_id()
@@ -140,43 +121,39 @@ class K8sJujuConnector(K8sConnector):
         # if it fails in the middle of the process
         cleanup_data = []
         try:
-            self._create_cluster_role(
-                kubectl,
+            kubectl.create_cluster_role(
                 name=metadata_name,
                 labels=labels,
             )
             cleanup_data.append(
                 {
-                    "delete": self._delete_cluster_role,
-                    "args": (kubectl, metadata_name),
+                    "delete": kubectl.delete_cluster_role,
+                    "args": (metadata_name),
                 }
             )
 
-            self._create_service_account(
-                kubectl,
+            kubectl.create_service_account(
                 name=metadata_name,
                 labels=labels,
             )
             cleanup_data.append(
                 {
-                    "delete": self._delete_service_account,
-                    "args": (kubectl, metadata_name),
+                    "delete": kubectl.delete_service_account,
+                    "args": (metadata_name),
                 }
             )
 
-            self._create_cluster_role_binding(
-                kubectl,
+            kubectl.create_cluster_role_binding(
                 name=metadata_name,
                 labels=labels,
             )
             cleanup_data.append(
                 {
-                    "delete": self._delete_service_account,
-                    "args": (kubectl, metadata_name),
+                    "delete": kubectl.delete_service_account,
+                    "args": (metadata_name),
                 }
             )
-            token, client_cert_data = await self._get_secret_data(
-                kubectl,
+            token, client_cert_data = await kubectl.get_secret_data(
                 metadata_name,
             )
 
@@ -259,27 +236,23 @@ class K8sJujuConnector(K8sConnector):
 
             await libjuju.remove_cloud(cluster_uuid)
 
-            kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+            credentials = self.get_credentials(cluster_uuid=cluster_uuid)
 
-            kubecfg_file = tempfile.NamedTemporaryFile()
-            with open(kubecfg_file.name, "w") as f:
-                f.write(kubecfg)
-            kubectl = Kubectl(config_file=kubecfg_file.name)
+            kubectl = self._get_kubectl(credentials)
 
             delete_functions = [
-                self._delete_cluster_role_binding,
-                self._delete_service_account,
-                self._delete_cluster_role,
+                kubectl.delete_cluster_role_binding,
+                kubectl.delete_service_account,
+                kubectl.delete_cluster_role,
             ]
 
             credential_attrs = cloud_creds[0].result["attrs"]
             if RBAC_LABEL_KEY_NAME in credential_attrs:
                 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
                 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
-                delete_args = (kubectl, metadata_name)
                 for delete_func in delete_functions:
                     try:
-                        delete_func(*delete_args)
+                        delete_func(metadata_name)
                     except Exception as e:
                         self.log.warning("Cannot remove resource in K8s {}".format(e))
 
@@ -738,12 +711,7 @@ class K8sJujuConnector(K8sConnector):
         """Return a list of services of a kdu_instance"""
 
         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
-        kubecfg = tempfile.NamedTemporaryFile()
-        with open(kubecfg.name, "w") as kubecfg_file:
-            kubecfg_file.write(credentials)
-        kubectl = Kubectl(config_file=kubecfg.name)
-
+        kubectl = self._get_kubectl(credentials)
         return kubectl.get_services(
             field_selector="metadata.namespace={}".format(kdu_instance)
         )
@@ -754,12 +722,7 @@ class K8sJujuConnector(K8sConnector):
         """Return data for a specific service inside a namespace"""
 
         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
-        kubecfg = tempfile.NamedTemporaryFile()
-        with open(kubecfg.name, "w") as kubecfg_file:
-            kubecfg_file.write(credentials)
-        kubectl = Kubectl(config_file=kubecfg.name)
-
+        kubectl = self._get_kubectl(credentials)
         return kubectl.get_services(
             field_selector="metadata.name={},metadata.namespace={}".format(
                 service_name, namespace
@@ -811,122 +774,6 @@ class K8sJujuConnector(K8sConnector):
         """
         pass
 
-    def _create_cluster_role(
-        self,
-        kubectl: Kubectl,
-        name: str,
-        labels: Dict[str, str],
-    ):
-        cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
-            field_selector="metadata.name={}".format(name)
-        )
-
-        if len(cluster_roles.items) > 0:
-            raise Exception(
-                "Cluster role with metadata.name={} already exists".format(name)
-            )
-
-        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
-        # Cluster role
-        cluster_role = V1ClusterRole(
-            metadata=metadata,
-            rules=[
-                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
-                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
-            ],
-        )
-
-        kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
-    def _delete_cluster_role(self, kubectl: Kubectl, name: str):
-        kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
-    def _create_service_account(
-        self,
-        kubectl: Kubectl,
-        name: str,
-        labels: Dict[str, str],
-    ):
-        service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
-            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
-        )
-        if len(service_accounts.items) > 0:
-            raise Exception(
-                "Service account with metadata.name={} already exists".format(name)
-            )
-
-        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
-        service_account = V1ServiceAccount(metadata=metadata)
-
-        kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
-            ADMIN_NAMESPACE, service_account
-        )
-
-    def _delete_service_account(self, kubectl: Kubectl, name: str):
-        kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
-            name, ADMIN_NAMESPACE
-        )
-
-    def _create_cluster_role_binding(
-        self,
-        kubectl: Kubectl,
-        name: str,
-        labels: Dict[str, str],
-    ):
-        role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
-            field_selector="metadata.name={}".format(name)
-        )
-        if len(role_bindings.items) > 0:
-            raise Exception("Generated rbac id already exists")
-
-        role_binding = V1ClusterRoleBinding(
-            metadata=V1ObjectMeta(name=name, labels=labels),
-            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
-            subjects=[
-                V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
-            ],
-        )
-        kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
-    def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
-        kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
-    async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
-        v1_core = kubectl.clients[CORE_CLIENT]
-
-        retries_limit = 10
-        secret_name = None
-        while True:
-            retries_limit -= 1
-            service_accounts = v1_core.list_namespaced_service_account(
-                ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
-            )
-            if len(service_accounts.items) == 0:
-                raise Exception(
-                    "Service account not found with metadata.name={}".format(name)
-                )
-            service_account = service_accounts.items[0]
-            if service_account.secrets and len(service_account.secrets) > 0:
-                secret_name = service_account.secrets[0].name
-            if secret_name is not None or not retries_limit:
-                break
-        if not secret_name:
-            raise Exception(
-                "Failed getting the secret from service account {}".format(name)
-            )
-        secret = v1_core.list_namespaced_secret(
-            ADMIN_NAMESPACE,
-            field_selector="metadata.name={}".format(secret_name),
-        ).items[0]
-
-        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
-        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
-        return (
-            base64.b64decode(token).decode("utf-8"),
-            base64.b64decode(client_certificate_data).decode("utf-8"),
-        )
-
     @staticmethod
     def generate_kdu_instance_name(**kwargs):
         db_dict = kwargs.get("db_dict")
@@ -961,3 +808,14 @@ class K8sJujuConnector(K8sConnector):
                 log=self.log,
                 n2vc=self,
             )
+
+    def _get_kubectl(self, credentials: str) -> Kubectl:
+        """
+        Get Kubectl object
+
+        :param: kubeconfig_credentials: Kubeconfig credentials
+        """
+        kubecfg = tempfile.NamedTemporaryFile()
+        with open(kubecfg.name, "w") as kubecfg_file:
+            kubecfg_file.write(credentials)
+        return Kubectl(config_file=kubecfg.name)
index d67f4db..ff48eda 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import base64
 import logging
+from typing import Dict
+import typing
+
 
 from kubernetes import client, config
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleRef,
+    V1Subject,
+)
 from kubernetes.client.rest import ApiException
+from retrying_async import retry
 
 
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
 CORE_CLIENT = "core_v1"
-STORAGE_CLIENT = "storage_v1"
 RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
         self._clients = {
-            "core_v1": client.CoreV1Api(),
-            "storage_v1": client.StorageV1Api(),
-            "rbac_v1": client.RbacAuthorizationV1Api(),
+            CORE_CLIENT: client.CoreV1Api(),
+            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+            STORAGE_CLIENT: client.StorageV1Api(),
         }
-        self._configuration = config.kube_config.Configuration()
+        self._configuration = config.kube_config.Configuration.get_default_copy()
         self.logger = logging.getLogger("Kubectl")
 
     @property
@@ -42,7 +59,19 @@ class Kubectl:
     def clients(self):
         return self._clients
 
-    def get_services(self, field_selector=None, label_selector=None):
+    def get_services(
+        self,
+        field_selector: str = None,
+        label_selector: str = None,
+    ) -> typing.List[typing.Dict]:
+        """
+        Get Service list from a namespace
+
+        :param: field_selector:     Kubernetes field selector for the namespace
+        :param: label_selector:     Kubernetes label selector for the namespace
+
+        :return: List of the services matching the selectors specified
+        """
         kwargs = {}
         if field_selector:
             kwargs["field_selector"] = field_selector
@@ -105,3 +134,158 @@ class Kubectl:
                 selected_sc = sc.metadata.name
                 break
         return selected_sc
+
+    def create_cluster_role(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a cluster role
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role metadata
+        :param: namespace:  Kubernetes namespace for cluster role metadata
+                            Default: kube-system
+        """
+        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception(
+                "Cluster role with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    def delete_cluster_role(self, name: str):
+        """
+        Delete a cluster role
+
+        :param: name:       Name of the cluster role
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def create_service_account(
+        self,
+        name: str,
+        labels: Dict[str, str],
+        namespace: str = "kube-system",
+    ):
+        """
+        Create a service account
+
+        :param: name:       Name of the service account
+        :param: labels:     Labels for service account metadata
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+        service_account = V1ServiceAccount(metadata=metadata)
+
+        self.clients[CORE_CLIENT].create_namespaced_service_account(
+            namespace, service_account
+        )
+
+    def delete_service_account(self, name: str, namespace: str = "kube-system"):
+        """
+        Delete a service account
+
+        :param: name:       Name of the service account
+        :param: namespace:  Kubernetes namespace for service account metadata
+                            Default: kube-system
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+    def create_cluster_role_binding(
+        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+    ):
+        """
+        Create a cluster role binding
+
+        :param: name:       Name of the cluster role
+        :param: labels:     Labels for cluster role binding metadata
+        :param: namespace:  Kubernetes namespace for cluster role binding metadata
+                            Default: kube-system
+        """
+        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+        )
+        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    def delete_cluster_role_binding(self, name: str):
+        """
+        Delete a cluster role binding
+
+        :param: name:       Name of the cluster role binding
+        """
+        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed getting the secret from service account"),
+    )
+    async def get_secret_data(self, name: str, namespace: str = "kube-system") -> (str, str):
+        """
+        Get secret data
+
+        :return: Tuple with the token and client certificate
+        """
+        v1_core = self.clients[CORE_CLIENT]
+
+        secret_name = None
+
+        service_accounts = v1_core.list_namespaced_service_account(
+            namespace, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) == 0:
+            raise Exception(
+                "Service account not found with metadata.name={}".format(name)
+            )
+        service_account = service_accounts.items[0]
+        if service_account.secrets and len(service_account.secrets) > 0:
+            secret_name = service_account.secrets[0].name
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        secret = v1_core.list_namespaced_secret(
+            namespace, field_selector="metadata.name={}".format(secret_name)
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )
index d7f55be..1423c61 100644 (file)
@@ -75,25 +75,22 @@ class K8sJujuConnTestCase(asynctest.TestCase):
         )
         self.k8s_juju_conn._store.get_vca_id.return_value = None
         self.k8s_juju_conn.libjuju = Mock()
+        # Mock Kubectl
+        self.kubectl = Mock()
+        self.kubectl.get_secret_data = AsyncMock()
+        self.kubectl.get_secret_data.return_value = ("token", "cacert")
+        self.kubectl.get_services.return_value = [{}]
+        self.k8s_juju_conn._get_kubectl = Mock()
+        self.k8s_juju_conn._get_kubectl.return_value = self.kubectl
 
 
-@asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
 class InitEnvTest(K8sJujuConnTestCase):
     def setUp(self):
         super(InitEnvTest, self).setUp()
         self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
-        self.k8s_juju_conn._create_cluster_role = Mock()
-        self.k8s_juju_conn._create_service_account = Mock()
-        self.k8s_juju_conn._create_cluster_role_binding = Mock()
-        self.k8s_juju_conn._delete_cluster_role = Mock()
-        self.k8s_juju_conn._delete_service_account = Mock()
-        self.k8s_juju_conn._delete_cluster_role_binding = Mock()
-        self.k8s_juju_conn._get_secret_data = AsyncMock()
-        self.k8s_juju_conn._get_secret_data.return_value = ("token", "cacert")
 
     def test_with_cluster_uuid(
         self,
-        mock_get_default_storage_class,
     ):
         reuse_cluster_uuid = "uuid"
         uuid, created = self.loop.run_until_complete(
@@ -104,12 +101,11 @@ class InitEnvTest(K8sJujuConnTestCase):
 
         self.assertTrue(created)
         self.assertEqual(uuid, reuse_cluster_uuid)
-        mock_get_default_storage_class.assert_called_once()
+        self.kubectl.get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
     def test_with_no_cluster_uuid(
         self,
-        mock_get_default_storage_class,
     ):
         uuid, created = self.loop.run_until_complete(
             self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
@@ -117,12 +113,11 @@ class InitEnvTest(K8sJujuConnTestCase):
 
         self.assertTrue(created)
         self.assertTrue(isinstance(uuid, str))
-        mock_get_default_storage_class.assert_called_once()
+        self.kubectl.get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
     def test_init_env_exception(
         self,
-        mock_get_default_storage_class,
     ):
         self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
         created = None
@@ -134,7 +129,6 @@ class InitEnvTest(K8sJujuConnTestCase):
 
         self.assertIsNone(created)
         self.assertIsNone(uuid)
-        mock_get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
 
@@ -183,9 +177,6 @@ class ResetTest(K8sJujuConnTestCase):
         cloud_creds = Mock()
         cloud_creds.result = {"attrs": {RBAC_LABEL_KEY_NAME: "asd"}}
         self.k8s_juju_conn.libjuju.get_cloud_credentials.return_value = [cloud_creds]
-        self.k8s_juju_conn._delete_cluster_role_binding = Mock()
-        self.k8s_juju_conn._delete_service_account = Mock()
-        self.k8s_juju_conn._delete_cluster_role = Mock()
         self.k8s_juju_conn.get_credentials = Mock()
         self.k8s_juju_conn.get_credentials.return_value = kubeconfig
 
@@ -614,26 +605,24 @@ class GetServicesTest(K8sJujuConnTestCase):
     def setUp(self):
         super(GetServicesTest, self).setUp()
 
-    @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
     @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
-    def test_success(self, mock_get_credentials, mock_get_services):
+    def test_success(self, mock_get_credentials):
         mock_get_credentials.return_value = kubeconfig
         self.loop.run_until_complete(self.k8s_juju_conn.get_services("", "", ""))
         mock_get_credentials.assert_called_once()
-        mock_get_services.assert_called_once()
+        self.kubectl.get_services.assert_called_once()
 
 
 class GetServiceTest(K8sJujuConnTestCase):
     def setUp(self):
         super(GetServiceTest, self).setUp()
 
-    @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
     @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
-    def test_success(self, mock_get_credentials, mock_get_services):
+    def test_success(self, mock_get_credentials):
         mock_get_credentials.return_value = kubeconfig
         self.loop.run_until_complete(self.k8s_juju_conn.get_service("", "", ""))
         mock_get_credentials.assert_called_once()
-        mock_get_services.assert_called_once()
+        self.kubectl.get_services.assert_called_once()
 
 
 class GetCredentialsTest(K8sJujuConnTestCase):
index 28d66c5..eb9b01d 100644 (file)
@@ -150,16 +150,26 @@ class GetServices(TestCase):
             self.kubectl.get_services()
 
 
-@mock.patch("kubernetes.config.kube_config.Configuration")
+@mock.patch("n2vc.kubectl.client")
+@mock.patch("n2vc.kubectl.config.kube_config.Configuration.get_default_copy")
 @mock.patch("n2vc.kubectl.config.load_kube_config")
 class GetConfiguration(KubectlTestCase):
     def setUp(self):
         super(GetConfiguration, self).setUp()
 
-    def test_get_configuration(self, mock_load_kube_config, mock_configuration):
+    def test_get_configuration(
+        self,
+        mock_load_kube_config,
+        mock_configuration,
+        mock_client,
+    ):
         kubectl = Kubectl()
         kubectl.configuration
         mock_configuration.assert_called_once()
+        mock_load_kube_config.assert_called_once()
+        mock_client.CoreV1Api.assert_called_once()
+        mock_client.RbacAuthorizationV1Api.assert_called_once()
+        mock_client.StorageV1Api.assert_called_once()
 
 
 @mock.patch("kubernetes.client.StorageV1Api.list_storage_class")
index 277ee19..91e8bf5 100644 (file)
@@ -12,8 +12,8 @@
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
-juju==2.8.6
-kubernetes==10.0.1
+juju
+kubernetes
 pyasn1
 motor==1.3.1
 retrying-async
index 5c30a05..9a5e5cf 100644 (file)
@@ -21,11 +21,11 @@ google-auth==1.28.0
     # via kubernetes
 idna==2.10
     # via requests
-juju==2.8.6
+juju==2.9.2
     # via -r requirements.in
 jujubundlelib==0.5.6
     # via theblues
-kubernetes==10.0.1
+kubernetes==17.17.0
     # via -r requirements.in
 macaroonbakery==1.3.1
     # via