New N2VC interface + updated libjuju
[osm/N2VC.git] / modules / libjuju / juju / model.py
index fc8d5e9..ac22599 100644 (file)
@@ -14,26 +14,25 @@ from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
 
 from functools import partial
 from pathlib import Path
 
-import websockets
-import yaml
 import theblues.charmstore
 import theblues.errors
 import theblues.charmstore
 import theblues.errors
+import websockets
+import yaml
 
 from . import tag, utils
 
 from . import tag, utils
-from .client import client
-from .client import connection
+from .client import client, connector
 from .client.client import ConfigValue
 from .client.client import ConfigValue
-from .constraints import parse as parse_constraints, normalize_key
-from .delta import get_entity_delta
-from .delta import get_entity_class
+from .constraints import parse as parse_constraints
+from .constraints import normalize_key
+from .delta import get_entity_class, get_entity_delta
+from .errors import JujuAPIError, JujuError
 from .exceptions import DeadEntityException
 from .exceptions import DeadEntityException
-from .errors import JujuError, JujuAPIError
 from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
 
 from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
 
-class _Observer(object):
+class _Observer:
     """Wrapper around an observer callable.
 
     This wrapper allows filter criteria to be associated with the
     """Wrapper around an observer callable.
 
     This wrapper allows filter criteria to be associated with the
@@ -77,7 +76,7 @@ class _Observer(object):
         return True
 
 
         return True
 
 
-class ModelObserver(object):
+class ModelObserver:
     """
     Base class for creating observers that react to changes in a model.
     """
     """
     Base class for creating observers that react to changes in a model.
     """
@@ -100,7 +99,7 @@ class ModelObserver(object):
         pass
 
 
         pass
 
 
-class ModelState(object):
+class ModelState:
     """Holds the state of the model, including the delta history of all
     entities in the model.
 
     """Holds the state of the model, including the delta history of all
     entities in the model.
 
@@ -144,6 +143,14 @@ class ModelState(object):
         """
         return self._live_entity_map('unit')
 
         """
         return self._live_entity_map('unit')
 
+    @property
+    def relations(self):
+        """Return a map of relation-id:Relation for all relations currently in
+        the model.
+
+        """
+        return self._live_entity_map('relation')
+
     def entity_history(self, entity_type, entity_id):
         """Return the history deque for an entity.
 
     def entity_history(self, entity_type, entity_id):
         """Return the history deque for an entity.
 
@@ -209,7 +216,7 @@ class ModelState(object):
             connected=connected)
 
 
             connected=connected)
 
 
-class ModelEntity(object):
+class ModelEntity:
     """An object in the Model tree"""
 
     def __init__(self, entity_id, model, history_index=-1, connected=True):
     """An object in the Model tree"""
 
     def __init__(self, entity_id, model, history_index=-1, connected=True):
@@ -228,7 +235,7 @@ class ModelEntity(object):
         self.model = model
         self._history_index = history_index
         self.connected = connected
         self.model = model
         self._history_index = history_index
         self.connected = connected
-        self.connection = model.connection
+        self.connection = model.connection()
 
     def __repr__(self):
         return '<{} entity_id="{}">'.format(type(self).__name__,
 
     def __repr__(self):
         return '<{} entity_id="{}">'.format(type(self).__name__,
@@ -380,90 +387,148 @@ class ModelEntity(object):
         return self.model.state.get_entity(self.entity_type, self.entity_id)
 
 
         return self.model.state.get_entity(self.entity_type, self.entity_id)
 
 
-class Model(object):
+class Model:
     """
     The main API for interacting with a Juju model.
     """
     """
     The main API for interacting with a Juju model.
     """
-    def __init__(self, loop=None,
-                 max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
-        """Instantiate a new connected Model.
+    def __init__(
+        self,
+        loop=None,
+        max_frame_size=None,
+        bakery_client=None,
+        jujudata=None,
+    ):
+        """Instantiate a new Model.
+
+        The connect method will need to be called before this
+        object can be used for anything interesting.
+
+        If jujudata is None, jujudata.FileJujuData will be used.
 
         :param loop: an asyncio event loop
         :param max_frame_size: See
             `juju.client.connection.Connection.MAX_FRAME_SIZE`
 
         :param loop: an asyncio event loop
         :param max_frame_size: See
             `juju.client.connection.Connection.MAX_FRAME_SIZE`
+        :param bakery_client httpbakery.Client: The bakery client to use
+            for macaroon authorization.
+        :param jujudata JujuData: The source for current controller information.
+        """
+        self._connector = connector.Connector(
+            loop=loop,
+            max_frame_size=max_frame_size,
+            bakery_client=bakery_client,
+            jujudata=jujudata,
+        )
+        self._observers = weakref.WeakValueDictionary()
+        self.state = ModelState(self)
+        self._info = None
+        self._watch_stopping = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped = asyncio.Event(loop=self._connector.loop)
+        self._watch_received = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped.set()
+        self._charmstore = CharmStore(self._connector.loop)
+
+    def is_connected(self):
+        """Reports whether the Model is currently connected."""
+        return self._connector.is_connected()
+
+    @property
+    def loop(self):
+        return self._connector.loop
 
 
+    def connection(self):
+        """Return the current Connection object. It raises an exception
+        if the Model is disconnected"""
+        return self._connector.connection()
+
+    async def get_controller(self):
+        """Return a Controller instance for the currently connected model.
+        :return Controller:
         """
         """
-        self.loop = loop or asyncio.get_event_loop()
-        self.max_frame_size = max_frame_size
-        self.connection = None
-        self.observers = weakref.WeakValueDictionary()
-        self.state = ModelState(self)
-        self.info = None
-        self._watch_stopping = asyncio.Event(loop=self.loop)
-        self._watch_stopped = asyncio.Event(loop=self.loop)
-        self._watch_received = asyncio.Event(loop=self.loop)
-        self._charmstore = CharmStore(self.loop)
+        from juju.controller import Controller
+        controller = Controller(jujudata=self._connector.jujudata)
+        kwargs = self.connection().connect_params()
+        kwargs.pop('uuid')
+        await controller._connect_direct(**kwargs)
+        return controller
 
     async def __aenter__(self):
 
     async def __aenter__(self):
-        await self.connect_current()
+        await self.connect()
         return self
 
     async def __aexit__(self, exc_type, exc, tb):
         await self.disconnect()
 
         return self
 
     async def __aexit__(self, exc_type, exc, tb):
         await self.disconnect()
 
-        if exc_type is not None:
-            return False
+    async def connect(self, model_name=None, **kwargs):
+        """Connect to a juju model.
 
 
-    async def connect(self, *args, **kw):
-        """Connect to an arbitrary Juju model.
+        If any arguments are specified other than model_name, then
+        model_name must be None and an explicit connection will be made
+        using Connection.connect using those parameters (the 'uuid'
+        parameter must be specified).
 
 
-        args and kw are passed through to Connection.connect()
+        Otherwise, if model_name is None, connect to the current model.
 
 
-        """
-        if 'loop' not in kw:
-            kw['loop'] = self.loop
-        if 'max_frame_size' not in kw:
-            kw['max_frame_size'] = self.max_frame_size
-        self.connection = await connection.Connection.connect(*args, **kw)
-        await self._after_connect()
+        Otherwise, model_name must specify the name of a known
+        model.
 
 
-    async def connect_current(self):
-        """Connect to the current Juju model.
+        :param model_name:  Format [controller:][user/]model
 
         """
 
         """
-        self.connection = await connection.Connection.connect_current(
-            self.loop, max_frame_size=self.max_frame_size)
+        await self.disconnect()
+        if not kwargs:
+            await self._connector.connect_model(model_name)
+        else:
+            if kwargs.get('uuid') is None:
+                raise ValueError('no UUID specified when connecting to model')
+            await self._connector.connect(**kwargs)
         await self._after_connect()
 
     async def connect_model(self, model_name):
         await self._after_connect()
 
     async def connect_model(self, model_name):
-        """Connect to a specific Juju model by name.
-
-        :param model_name:  Format [controller:][user/]model
+        """
+        .. deprecated:: 0.6.2
+           Use connect(model_name=model_name) instead.
+        """
+        return await self.connect(model_name=model_name)
 
 
+    async def connect_current(self):
+        """
+        .. deprecated:: 0.6.2
+           Use connect instead.
         """
         """
-        self.connection = await connection.Connection.connect_model(
-            model_name, self.loop, self.max_frame_size)
+        return await self.connect()
+
+    async def _connect_direct(self, **kwargs):
+        await self.disconnect()
+        await self._connector.connect(**kwargs)
         await self._after_connect()
 
     async def _after_connect(self):
         await self._after_connect()
 
     async def _after_connect(self):
-        """Run initialization steps after connecting to websocket.
-
-        """
         self._watch()
         self._watch()
+
+        # Wait for the first packet of data from the AllWatcher,
+        # which contains all information on the model.
+        # TODO this means that we can't do anything until
+        # we've received all the model data, which might be
+        # a whole load of unneeded data if all the client wants
+        # to do is make one RPC call.
         await self._watch_received.wait()
         await self._watch_received.wait()
+
         await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
 
         """
         await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
 
         """
-        if self.connection and self.connection.is_open:
+        if not self._watch_stopped.is_set():
             log.debug('Stopping watcher task')
             self._watch_stopping.set()
             await self._watch_stopped.wait()
             log.debug('Stopping watcher task')
             self._watch_stopping.set()
             await self._watch_stopped.wait()
+            self._watch_stopping.clear()
+
+        if self.is_connected():
             log.debug('Closing model connection')
             log.debug('Closing model connection')
-            await self.connection.close()
-            self.connection = None
+            await self._connector.disconnect()
+            self.info = None
 
     async def add_local_charm_dir(self, charm_dir, series):
         """Upload a local charm to the model.
 
     async def add_local_charm_dir(self, charm_dir, series):
         """Upload a local charm to the model.
@@ -480,7 +545,7 @@ class Model(object):
         with fh:
             func = partial(
                 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
         with fh:
             func = partial(
                 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
-            charm_url = await self.loop.run_in_executor(None, func)
+            charm_url = await self._connector.loop.run_in_executor(None, func)
 
         log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
         return charm_url
 
         log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
         return charm_url
@@ -505,7 +570,7 @@ class Model(object):
            instead.
 
         """
            instead.
 
         """
-        conn, headers, path_prefix = self.connection.https_connection()
+        conn, headers, path_prefix = self.connection().https_connection()
         path = "%s/charms?series=%s" % (path_prefix, series)
         headers['Content-Type'] = 'application/zip'
         if size:
         path = "%s/charms?series=%s" % (path_prefix, series)
         headers['Content-Type'] = 'application/zip'
         if size:
@@ -549,13 +614,20 @@ class Model(object):
     async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
     async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
+        Raises `websockets.ConnectionClosed` if disconnected.
         """
         """
-        async def _block():
-            while not all(c() for c in conditions):
-                if not (self.connection and self.connection.is_open):
-                    raise websockets.ConnectionClosed(1006, 'no reason')
-                await asyncio.sleep(wait_period, loop=self.loop)
-        await asyncio.wait_for(_block(), timeout, loop=self.loop)
+        def _disconnected():
+            return not (self.is_connected() and self.connection().is_open)
+
+        def done():
+            return _disconnected() or all(c() for c in conditions)
+
+        await utils.block_until(done,
+                                timeout=timeout,
+                                wait_period=wait_period,
+                                loop=self.loop)
+        if _disconnected():
+            raise websockets.ConnectionClosed(1006, 'no reason')
 
     @property
     def applications(self):
 
     @property
     def applications(self):
@@ -581,6 +653,13 @@ class Model(object):
         """
         return self.state.units
 
         """
         return self.state.units
 
+    @property
+    def relations(self):
+        """Return a list of all Relations currently in the model.
+
+        """
+        return list(self.state.relations.values())
+
     async def get_info(self):
         """Return a client.ModelInfo object for this Model.
 
     async def get_info(self):
         """Return a client.ModelInfo object for this Model.
 
@@ -594,7 +673,7 @@ class Model(object):
         explicit call to this method.
 
         """
         explicit call to this method.
 
         """
-        facade = client.ClientFacade.from_connection(self.connection)
+        facade = client.ClientFacade.from_connection(self.connection())
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
@@ -639,7 +718,7 @@ class Model(object):
         """
         observer = _Observer(
             callable_, entity_type, action, entity_id, predicate)
         """
         observer = _Observer(
             callable_, entity_type, action, entity_id, predicate)
-        self.observers[observer] = callable_
+        self._observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -650,13 +729,13 @@ class Model(object):
         async def _all_watcher():
             try:
                 allwatcher = client.AllWatcherFacade.from_connection(
         async def _all_watcher():
             try:
                 allwatcher = client.AllWatcherFacade.from_connection(
-                    self.connection)
+                    self.connection())
                 while not self._watch_stopping.is_set():
                     try:
                         results = await utils.run_with_interrupt(
                             allwatcher.Next(),
                             self._watch_stopping,
                 while not self._watch_stopping.is_set():
                     try:
                         results = await utils.run_with_interrupt(
                             allwatcher.Next(),
                             self._watch_stopping,
-                            self.loop)
+                            self._connector.loop)
                     except JujuAPIError as e:
                         if 'watcher was stopped' not in str(e):
                             raise
                     except JujuAPIError as e:
                         if 'watcher was stopped' not in str(e):
                             raise
@@ -673,19 +752,27 @@ class Model(object):
                         del allwatcher.Id
                         continue
                     except websockets.ConnectionClosed:
                         del allwatcher.Id
                         continue
                     except websockets.ConnectionClosed:
-                        monitor = self.connection.monitor
+                        monitor = self.connection().monitor
                         if monitor.status == monitor.ERROR:
                             # closed unexpectedly, try to reopen
                             log.warning(
                                 'Watcher: connection closed, reopening')
                         if monitor.status == monitor.ERROR:
                             # closed unexpectedly, try to reopen
                             log.warning(
                                 'Watcher: connection closed, reopening')
-                            await self.connection.reconnect()
+                            await self.connection().reconnect()
+                            if monitor.status != monitor.CONNECTED:
+                                # reconnect failed; abort and shutdown
+                                log.error('Watcher: automatic reconnect '
+                                          'failed; stopping watcher')
+                                break
                             del allwatcher.Id
                             continue
                         else:
                             # closed on request, go ahead and shutdown
                             break
                     if self._watch_stopping.is_set():
                             del allwatcher.Id
                             continue
                         else:
                             # closed on request, go ahead and shutdown
                             break
                     if self._watch_stopping.is_set():
-                        await allwatcher.Stop()
+                        try:
+                            await allwatcher.Stop()
+                        except websockets.ConnectionClosed:
+                            pass  # can't stop on a closed conn
                         break
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
                         break
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
@@ -704,7 +791,7 @@ class Model(object):
         self._watch_received.clear()
         self._watch_stopping.clear()
         self._watch_stopped.clear()
         self._watch_received.clear()
         self._watch_stopping.clear()
         self._watch_stopped.clear()
-        self.loop.create_task(_all_watcher())
+        self._connector.loop.create_task(_all_watcher())
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
@@ -724,10 +811,10 @@ class Model(object):
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
 
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
 
-        for o in self.observers:
+        for o in self._observers:
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
-                                      loop=self.loop)
+                                      loop=self._connector.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -744,7 +831,7 @@ class Model(object):
             has a 'completed' status. See the _Observer class for details.
 
         """
             has a 'completed' status. See the _Observer class for details.
 
         """
-        q = asyncio.Queue(loop=self.loop)
+        q = asyncio.Queue(loop=self._connector.loop)
 
         async def callback(delta, old, new, model):
             await q.put(delta.get_id())
 
         async def callback(delta, old, new, model):
             await q.put(delta.get_id())
@@ -755,24 +842,19 @@ class Model(object):
         # 'remove' action
         return self.state._live_entity_map(entity_type).get(entity_id)
 
         # 'remove' action
         return self.state._live_entity_map(entity_type).get(entity_id)
 
-    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+    async def _wait_for_new(self, entity_type, entity_id):
         """Wait for a new object to appear in the Model and return it.
 
         """Wait for a new object to appear in the Model and return it.
 
-        Waits for an object of type ``entity_type`` with id ``entity_id``.
-        If ``entity_id`` is ``None``, it will wait for the first new entity
-        of the correct type.
-
-        This coroutine blocks until the new object appears in the model.
+        Waits for an object of type ``entity_type`` with id ``entity_id``
+        to appear in the model.  This is similar to watching for the
+        object using ``block_until``, but uses the watcher rather than
+        polling.
 
         """
         # if the entity is already in the model, just return it
         if entity_id in self.state._live_entity_map(entity_type):
             return self.state._live_entity_map(entity_type)[entity_id]
 
         """
         # if the entity is already in the model, just return it
         if entity_id in self.state._live_entity_map(entity_type):
             return self.state._live_entity_map(entity_type)[entity_id]
-        # if we know the entity_id, we can trigger on any action that puts
-        # the enitty into the model; otherwise, we have to watch for the
-        # next "add" action on that entity_type
-        action = 'add' if entity_id is None else None
-        return await self._wait(entity_type, entity_id, action, predicate)
+        return await self._wait(entity_type, entity_id, None)
 
     async def wait_for_action(self, action_id):
         """Given an action, wait for it to complete."""
 
     async def wait_for_action(self, action_id):
         """Given an action, wait for it to complete."""
@@ -785,7 +867,7 @@ class Model(object):
         def predicate(delta):
             return delta.data['status'] in ('completed', 'failed')
 
         def predicate(delta):
             return delta.data['status'] in ('completed', 'failed')
 
-        return await self._wait('action', action_id, 'change', predicate)
+        return await self._wait('action', action_id, None, predicate)
 
     async def add_machine(
             self, spec=None, constraints=None, disks=None, series=None):
 
     async def add_machine(
             self, spec=None, constraints=None, disks=None, series=None):
@@ -865,7 +947,7 @@ class Model(object):
             params.series = series
 
         # Submit the request.
             params.series = series
 
         # Submit the request.
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
         results = await client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
         results = await client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
@@ -881,29 +963,33 @@ class Model(object):
         :param str relation2: '<application>[:<relation_name>]'
 
         """
         :param str relation2: '<application>[:<relation_name>]'
 
         """
-        app_facade = client.ApplicationFacade.from_connection(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection())
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
+        def _find_relation(*specs):
+            for rel in self.relations:
+                if rel.matches(*specs):
+                    return rel
+            return None
+
         try:
             result = await app_facade.AddRelation([relation1, relation2])
         except JujuAPIError as e:
             if 'relation already exists' not in e.message:
                 raise
         try:
             result = await app_facade.AddRelation([relation1, relation2])
         except JujuAPIError as e:
             if 'relation already exists' not in e.message:
                 raise
-            log.debug(
-                'Relation %s <-> %s already exists', relation1, relation2)
-            # TODO: if relation already exists we should return the
-            # Relation ModelEntity here
-            return None
+            rel = _find_relation(relation1, relation2)
+            if rel:
+                return rel
+            raise JujuError('Relation {} {} exists but not in model'.format(
+                relation1, relation2))
 
 
-        def predicate(delta):
-            endpoints = {}
-            for endpoint in delta.data['endpoints']:
-                endpoints[endpoint['application-name']] = endpoint['relation']
-            return endpoints == result.endpoints
+        specs = ['{}:{}'.format(app, data['name'])
+                 for app, data in result.endpoints.items()]
 
 
-        return await self._wait_for_new('relation', None, predicate)
+        await self.block_until(lambda: _find_relation(*specs) is not None)
+        return _find_relation(*specs)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -924,7 +1010,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
         :param str key: The public ssh key
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         return await key_facade.AddKeys([key], user)
     add_ssh_keys = add_ssh_key
 
         return await key_facade.AddKeys([key], user)
     add_ssh_keys = add_ssh_key
 
@@ -1072,9 +1158,14 @@ class Model(object):
                 for k, v in storage.items()
             }
 
                 for k, v in storage.items()
             }
 
+        entity_path = Path(entity_url.replace('local:', ''))
+        bundle_path = entity_path / 'bundle.yaml'
+        metadata_path = entity_path / 'metadata.yaml'
+
         is_local = (
             entity_url.startswith('local:') or
         is_local = (
             entity_url.startswith('local:') or
-            os.path.isdir(entity_url)
+            entity_path.is_dir() or
+            entity_path.is_file()
         )
         if is_local:
             entity_id = entity_url.replace('local:', '')
         )
         if is_local:
             entity_id = entity_url.replace('local:', '')
@@ -1082,10 +1173,11 @@ class Model(object):
             entity = await self.charmstore.entity(entity_url, channel=channel)
             entity_id = entity['Id']
 
             entity = await self.charmstore.entity(entity_url, channel=channel)
             entity_id = entity['Id']
 
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
 
         is_bundle = ((is_local and
 
         is_bundle = ((is_local and
-                      (Path(entity_id) / 'bundle.yaml').exists()) or
+                      (entity_id.endswith('.yaml') and entity_path.exists()) or
+                      bundle_path.exists()) or
                      (not is_local and 'bundle/' in entity_id))
 
         if is_bundle:
                      (not is_local and 'bundle/' in entity_id))
 
         if is_bundle:
@@ -1100,9 +1192,9 @@ class Model(object):
                 await asyncio.gather(*[
                     asyncio.ensure_future(
                         self._wait_for_new('application', app_name),
                 await asyncio.gather(*[
                     asyncio.ensure_future(
                         self._wait_for_new('application', app_name),
-                        loop=self.loop)
+                        loop=self._connector.loop)
                     for app_name in pending_apps
                     for app_name in pending_apps
-                ], loop=self.loop)
+                ], loop=self._connector.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
@@ -1118,6 +1210,9 @@ class Model(object):
                                                             entity_id,
                                                             entity)
             else:
                                                             entity_id,
                                                             entity)
             else:
+                if not application_name:
+                    metadata = yaml.load(metadata_path.read_text())
+                    application_name = metadata['name']
                 # We have a local charm dir that needs to be uploaded
                 charm_dir = os.path.abspath(
                     os.path.expanduser(entity_id))
                 # We have a local charm dir that needs to be uploaded
                 charm_dir = os.path.abspath(
                     os.path.expanduser(entity_id))
@@ -1163,7 +1258,7 @@ class Model(object):
             return None
 
         resources_facade = client.ResourcesFacade.from_connection(
             return None
 
         resources_facade = client.ResourcesFacade.from_connection(
-            self.connection)
+            self.connection())
         response = await resources_facade.AddPendingResources(
             tag.application(application),
             entity_url,
         response = await resources_facade.AddPendingResources(
             tag.application(application),
             entity_url,
@@ -1186,7 +1281,7 @@ class Model(object):
                            default_flow_style=False)
 
         app_facade = client.ApplicationFacade.from_connection(
                            default_flow_style=False)
 
         app_facade = client.ApplicationFacade.from_connection(
-            self.connection)
+            self.connection())
 
         app = client.ApplicationDeploy(
             charm_url=charm_url,
 
         app = client.ApplicationDeploy(
             charm_url=charm_url,
@@ -1201,7 +1296,6 @@ class Model(object):
             storage=storage,
             placement=placement
         )
             storage=storage,
             placement=placement
         )
-
         result = await app_facade.Deploy([app])
         errors = [r.error.message for r in result.results if r.error]
         if errors:
         result = await app_facade.Deploy([app])
         errors = [r.error.message for r in result.results if r.error]
         if errors:
@@ -1218,7 +1312,7 @@ class Model(object):
         """Destroy units by name.
 
         """
         """Destroy units by name.
 
         """
-        app_facade = client.ApplicationFacade.from_connection(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection())
 
         log.debug(
             'Destroying unit%s %s',
 
         log.debug(
             'Destroying unit%s %s',
@@ -1263,7 +1357,7 @@ class Model(object):
             which have `source` and `value` attributes.
         """
         config_facade = client.ModelConfigFacade.from_connection(
             which have `source` and `value` attributes.
         """
         config_facade = client.ModelConfigFacade.from_connection(
-            self.connection
+            self.connection()
         )
         result = await config_facade.ModelGet()
         config = result.config
         )
         result = await config_facade.ModelGet()
         config = result.config
@@ -1277,22 +1371,6 @@ class Model(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    async def grant(self, username, acl='read'):
-        """Grant a user access to this model.
-
-        :param str username: Username
-        :param str acl: Access control ('read' or 'write')
-
-        """
-        controller_conn = await self.connection.controller()
-        model_facade = client.ModelManagerFacade.from_connection(
-            controller_conn)
-        user = tag.user(username)
-        model = tag.model(self.info.uuid)
-        changes = client.ModifyModelAccess(acl, 'grant', model, user)
-        await self.revoke(username)
-        return await model_facade.ModifyModelAccess([changes])
-
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
 
@@ -1326,7 +1404,7 @@ class Model(object):
             else it's fingerprint
 
         """
             else it's fingerprint
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         entity = {'tag': tag.model(self.info.uuid)}
         entities = client.Entities([entity])
         return await key_facade.ListKeys(entities, raw_ssh)
         entity = {'tag': tag.model(self.info.uuid)}
         entities = client.Entities([entity])
         return await key_facade.ListKeys(entities, raw_ssh)
@@ -1399,7 +1477,7 @@ class Model(object):
         :param str user: Juju user to which the key is registered
 
         """
         :param str user: Juju user to which the key is registered
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
         key = hashlib.md5(key).hexdigest()
         key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
         key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
         key = hashlib.md5(key).hexdigest()
         key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
@@ -1427,20 +1505,6 @@ class Model(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    async def revoke(self, username):
-        """Revoke a user's access to this model.
-
-        :param str username: Username to revoke
-
-        """
-        controller_conn = await self.connection.controller()
-        model_facade = client.ModelManagerFacade.from_connection(
-            controller_conn)
-        user = tag.user(username)
-        model = tag.model(self.info.uuid)
-        changes = client.ModifyModelAccess('read', 'revoke', model, user)
-        return await model_facade.ModifyModelAccess([changes])
-
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
 
@@ -1457,7 +1521,7 @@ class Model(object):
             `ConfigValue` instances, as returned by `get_config`.
         """
         config_facade = client.ModelConfigFacade.from_connection(
             `ConfigValue` instances, as returned by `get_config`.
         """
         config_facade = client.ModelConfigFacade.from_connection(
-            self.connection
+            self.connection()
         )
         for key, value in config.items():
             if isinstance(value, ConfigValue):
         )
         for key, value in config.items():
             if isinstance(value, ConfigValue):
@@ -1506,7 +1570,7 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
         return await client_facade.FullStatus(filters)
 
     def sync_tools(
         return await client_facade.FullStatus(filters)
 
     def sync_tools(
@@ -1587,7 +1651,7 @@ class Model(object):
                   ', '.join(tags) if tags else "all units")
 
         metrics_facade = client.MetricsDebugFacade.from_connection(
                   ', '.join(tags) if tags else "all units")
 
         metrics_facade = client.MetricsDebugFacade.from_connection(
-            self.connection)
+            self.connection())
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
@@ -1623,7 +1687,7 @@ def get_charm_series(path):
     return series[0] if series else None
 
 
     return series[0] if series else None
 
 
-class BundleHandler(object):
+class BundleHandler:
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
     steps and then dispatching each of those using the API.
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
     steps and then dispatching each of those using the API.
@@ -1638,11 +1702,11 @@ class BundleHandler(object):
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
         self.client_facade = client.ClientFacade.from_connection(
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
         self.client_facade = client.ClientFacade.from_connection(
-            model.connection)
+            model.connection())
         self.app_facade = client.ApplicationFacade.from_connection(
         self.app_facade = client.ApplicationFacade.from_connection(
-            model.connection)
+            model.connection())
         self.ann_facade = client.AnnotationsFacade.from_connection(
         self.ann_facade = client.AnnotationsFacade.from_connection(
-            model.connection)
+            model.connection())
 
     async def _handle_local_charms(self, bundle):
         """Search for references to local charms (i.e. filesystem paths)
 
     async def _handle_local_charms(self, bundle):
         """Search for references to local charms (i.e. filesystem paths)
@@ -1694,8 +1758,11 @@ class BundleHandler(object):
         return bundle
 
     async def fetch_plan(self, entity_id):
         return bundle
 
     async def fetch_plan(self, entity_id):
-        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
-        if is_local:
+        is_local = not entity_id.startswith('cs:')
+
+        if is_local and os.path.isfile(entity_id):
+            bundle_yaml = Path(entity_id).read_text()
+        elif is_local and os.path.isdir(entity_id):
             bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
         else:
             bundle_yaml = await self.charmstore.files(entity_id,
             bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
         else:
             bundle_yaml = await self.charmstore.files(entity_id,
@@ -1920,7 +1987,7 @@ class BundleHandler(object):
         return await entity.set_annotations(annotations)
 
 
         return await entity.set_annotations(annotations)
 
 
-class CharmStore(object):
+class CharmStore:
     """
     Async wrapper around theblues.charmstore.CharmStore
     """
     """
     Async wrapper around theblues.charmstore.CharmStore
     """
@@ -1952,7 +2019,7 @@ class CharmStore(object):
         return wrapper
 
 
         return wrapper
 
 
-class CharmArchiveGenerator(object):
+class CharmArchiveGenerator:
     """
     Create a Zip archive of a local charm directory for upload to a controller.
 
     """
     Create a Zip archive of a local charm directory for upload to a controller.