Improved Primitive support and better testing
[osm/N2VC.git] / modules / libjuju / tests / base.py
index 96ed9c7..97eea53 100644 (file)
-import mock
+import inspect
 import subprocess
 import uuid
+from contextlib import contextmanager
+from pathlib import Path
 
-import pytest
-
+import mock
+from juju.client.jujudata import FileJujuData
 from juju.controller import Controller
-from juju.client.connection import JujuData
+
+import pytest
 
 
 def is_bootstrapped():
-    result = subprocess.run(['juju', 'switch'], stdout=subprocess.PIPE)
-    return (
-        result.returncode == 0 and
-        len(result.stdout.decode().strip()) > 0)
+    try:
+        result = subprocess.run(['juju', 'switch'], stdout=subprocess.PIPE)
+        return (
+            result.returncode == 0 and
+            len(result.stdout.decode().strip()) > 0)
+    except FileNotFoundError:
+        return False
+
+
 
 bootstrapped = pytest.mark.skipif(
     not is_bootstrapped(),
     reason='bootstrapped Juju environment required')
 
+test_run_nonce = uuid.uuid4().hex[-4:]
+
 
 class CleanController():
+    """
+    Context manager that automatically connects and disconnects from
+    the currently active controller.
+
+    Note: Unlike CleanModel, this will not create a new controller for you,
+    and an active controller must already be available.
+    """
     def __init__(self):
-        self.controller = None
+        self._controller = None
 
     async def __aenter__(self):
-        self.controller = Controller()
-        await self.controller.connect_current()
-        return self.controller
+        self._controller = Controller()
+        await self._controller.connect()
+        return self._controller
 
     async def __aexit__(self, exc_type, exc, tb):
-        await self.controller.disconnect()
+        await self._controller.disconnect()
 
 
 class CleanModel():
-    def __init__(self):
-        self.user_name = None
-        self.controller = None
-        self.controller_name = None
-        self.model = None
-        self.model_name = None
-        self.model_uuid = None
+    """
+    Context manager that automatically connects to the currently active
+    controller, adds a fresh model, returns the connection to that model,
+    and automatically disconnects and cleans up the model.
+
+    The new model is also set as the current default for the controller
+    connection.
+    """
+    def __init__(self, bakery_client=None):
+        self._controller = None
+        self._model = None
+        self._model_uuid = None
+        self._bakery_client = bakery_client
 
     async def __aenter__(self):
-        self.controller = Controller()
-        juju_data = JujuData()
-        self.controller_name = juju_data.current_controller()
-        self.user_name = juju_data.accounts()[self.controller_name]['user']
-        await self.controller.connect_controller(self.controller_name)
-
-        self.model_name = 'test-{}'.format(uuid.uuid4())
-        self.model = await self.controller.add_model(self.model_name)
+        model_nonce = uuid.uuid4().hex[-4:]
+        frame = inspect.stack()[1]
+        test_name = frame.function.replace('_', '-')
+        jujudata = TestJujuData()
+        self._controller = Controller(
+            jujudata=jujudata,
+            bakery_client=self._bakery_client,
+        )
+        controller_name = jujudata.current_controller()
+        user_name = jujudata.accounts()[controller_name]['user']
+        await self._controller.connect(controller_name)
+
+        model_name = 'test-{}-{}-{}'.format(
+            test_run_nonce,
+            test_name,
+            model_nonce,
+        )
+        self._model = await self._controller.add_model(model_name)
+
+        # Change the JujuData instance so that it will return the new
+        # model as the current model name, so that we'll connect
+        # to it by default.
+        jujudata.set_model(
+            controller_name,
+            user_name + "/" + model_name,
+            self._model.info.uuid,
+        )
 
         # save the model UUID in case test closes model
-        self.model_uuid = self.model.info.uuid
-
-        # Ensure that we connect to the new model by default.  This also
-        # prevents failures if test was started with no current model.
-        self._patch_cm = mock.patch.object(JujuData, 'current_model',
-                                           return_value=self.model_name)
-        self._patch_cm.start()
-
-        # Ensure that the models data includes this model, since it doesn't
-        # get added to the client store by Controller.add_model().
-        self._orig_models = JujuData().models
-        self._patch_models = mock.patch.object(JujuData, 'models',
-                                               side_effect=self._models)
-        self._patch_models.start()
-
-        return self.model
-
-    def _models(self):
-        result = self._orig_models()
-        models = result[self.controller_name]['models']
-        full_model_name = '{}/{}'.format(self.user_name, self.model_name)
-        if full_model_name not in models:
-            models[full_model_name] = {'uuid': self.model_uuid}
-        return result
+        self._model_uuid = self._model.info.uuid
+
+        return self._model
 
     async def __aexit__(self, exc_type, exc, tb):
-        self._patch_models.stop()
-        self._patch_cm.stop()
-        await self.model.disconnect()
-        await self.controller.destroy_model(self.model_uuid)
-        await self.controller.disconnect()
+        await self._model.disconnect()
+        await self._controller.destroy_model(self._model_uuid)
+        await self._controller.disconnect()
+
+
+class TestJujuData(FileJujuData):
+    def __init__(self):
+        self.__controller_name = None
+        self.__model_name = None
+        self.__model_uuid = None
+        super().__init__()
+
+    def set_model(self, controller_name, model_name, model_uuid):
+        self.__controller_name = controller_name
+        self.__model_name = model_name
+        self.__model_uuid = model_uuid
+
+    def current_model(self, *args, **kwargs):
+        return self.__model_name or super().current_model(*args, **kwargs)
+
+    def models(self):
+        all_models = super().models()
+        if self.__model_name is None:
+            return all_models
+        all_models.setdefault(self.__controller_name, {})
+        all_models[self.__controller_name].setdefault('models', {})
+        cmodels = all_models[self.__controller_name]['models']
+        cmodels[self.__model_name] = {'uuid': self.__model_uuid}
+        return all_models
 
 
 class AsyncMock(mock.MagicMock):
     async def __call__(self, *args, **kwargs):
         return super().__call__(*args, **kwargs)
+
+
+@contextmanager
+def patch_file(filename):
+    """
+    "Patch" a file so that its current contents are automatically restored
+    when the context is exited.
+    """
+    filepath = Path(filename).expanduser()
+    data = filepath.read_bytes()
+    try:
+        yield
+    finally:
+        filepath.write_bytes(data)