Revert "Remove vendored libjuju"
[osm/N2VC.git] / modules / libjuju / juju / model.py
diff --git a/modules/libjuju/juju/model.py b/modules/libjuju/juju/model.py
new file mode 100644 (file)
index 0000000..9a14add
--- /dev/null
@@ -0,0 +1,2328 @@
+import asyncio
+import base64
+import collections
+import hashlib
+import json
+import logging
+import os
+import re
+import stat
+import tempfile
+import weakref
+import zipfile
+from concurrent.futures import CancelledError
+from functools import partial
+from pathlib import Path
+
+import theblues.charmstore
+import theblues.errors
+import websockets
+import yaml
+
+from . import tag, utils
+from .client import client, connector
+from .client.client import ConfigValue
+from .client.client import Value
+from .constraints import parse as parse_constraints
+from .constraints import normalize_key
+from .delta import get_entity_class, get_entity_delta
+from .errors import JujuAPIError, JujuError
+from .exceptions import DeadEntityException
+from .placement import parse as parse_placement
+from . import provisioner
+
+
+log = logging.getLogger(__name__)
+
+
+class _Observer:
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id, predicate):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        self.predicate = predicate
+        if self.entity_id:
+            self.entity_id = str(self.entity_id)
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, delta):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a this delta.
+
+        """
+        if (self.entity_id and delta.get_id() and
+                not re.match(self.entity_id, str(delta.get_id()))):
+            return False
+
+        if self.entity_type and self.entity_type != delta.entity:
+            return False
+
+        if self.action and self.action != delta.type:
+            return False
+
+        if self.predicate and not self.predicate(delta):
+            return False
+
+        return True
+
+
+class ModelObserver:
+    """
+    Base class for creating observers that react to changes in a model.
+    """
+    async def __call__(self, delta, old, new, model):
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
+        method = getattr(self, handler_name, self.on_change)
+        await method(delta, old, new, model)
+
+    async def on_change(self, delta, old, new, model):
+        """Generic model-change handler.
+
+        This should be overridden in a subclass.
+
+        :param delta: :class:`juju.client.overrides.Delta`
+        :param old: :class:`juju.model.ModelEntity`
+        :param new: :class:`juju.model.ModelEntity`
+        :param model: :class:`juju.model.Model`
+
+        """
+        pass
+
+
+class ModelState:
+    """Holds the state of the model, including the delta history of all
+    entities in the model.
+
+    """
+    def __init__(self, model):
+        self.model = model
+        self.state = dict()
+
+    def _live_entity_map(self, entity_type):
+        """Return an id:Entity map of all the living entities of
+        type ``entity_type``.
+
+        """
+        return {
+            entity_id: self.get_entity(entity_type, entity_id)
+            for entity_id, history in self.state.get(entity_type, {}).items()
+            if history[-1] is not None
+        }
+
+    @property
+    def applications(self):
+        """Return a map of application-name:Application for all applications
+        currently in the model.
+
+        """
+        return self._live_entity_map('application')
+
+    @property
+    def machines(self):
+        """Return a map of machine-id:Machine for all machines currently in
+        the model.
+
+        """
+        return self._live_entity_map('machine')
+
+    @property
+    def units(self):
+        """Return a map of unit-id:Unit for all units currently in
+        the model.
+
+        """
+        return self._live_entity_map('unit')
+
+    @property
+    def relations(self):
+        """Return a map of relation-id:Relation for all relations currently in
+        the model.
+
+        """
+        return self._live_entity_map('relation')
+
+    def entity_history(self, entity_type, entity_id):
+        """Return the history deque for an entity.
+
+        """
+        return self.state[entity_type][entity_id]
+
+    def entity_data(self, entity_type, entity_id, history_index):
+        """Return the data dict for an entity at a specific index of its
+        history.
+
+        """
+        return self.entity_history(entity_type, entity_id)[history_index]
+
+    def apply_delta(self, delta):
+        """Apply delta to our state and return a copy of the
+        affected object as it was before and after the update, e.g.:
+
+            old_obj, new_obj = self.apply_delta(delta)
+
+        old_obj may be None if the delta is for the creation of a new object,
+        e.g. a new application or unit is deployed.
+
+        new_obj will never be None, but may be dead (new_obj.dead == True)
+        if the object was deleted as a result of the delta being applied.
+
+        """
+        history = (
+            self.state
+            .setdefault(delta.entity, {})
+            .setdefault(delta.get_id(), collections.deque())
+        )
+
+        history.append(delta.data)
+        if delta.type == 'remove':
+            history.append(None)
+
+        entity = self.get_entity(delta.entity, delta.get_id())
+        return entity.previous(), entity
+
+    def get_entity(
+            self, entity_type, entity_id, history_index=-1, connected=True):
+        """Return an object instance for the given entity_type and id.
+
+        By default the object state matches the most recent state from
+        Juju. To get an instance of the object in an older state, pass
+        history_index, an index into the history deque for the entity.
+
+        """
+
+        if history_index < 0 and history_index != -1:
+            history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
+
+        try:
+            self.entity_data(entity_type, entity_id, history_index)
+        except IndexError:
+            return None
+
+        entity_class = get_entity_class(entity_type)
+        return entity_class(
+            entity_id, self.model, history_index=history_index,
+            connected=connected)
+
+
+class ModelEntity:
+    """An object in the Model tree"""
+
+    def __init__(self, entity_id, model, history_index=-1, connected=True):
+        """Initialize a new entity
+
+        :param entity_id str: The unique id of the object in the model
+        :param model: The model instance in whose object tree this
+            entity resides
+        :history_index int: The index of this object's state in the model's
+            history deque for this entity
+        :connected bool: Flag indicating whether this object gets live updates
+            from the model.
+
+        """
+        self.entity_id = entity_id
+        self.model = model
+        self._history_index = history_index
+        self.connected = connected
+        self.connection = model.connection()
+
+    def __repr__(self):
+        return '<{} entity_id="{}">'.format(type(self).__name__,
+                                            self.entity_id)
+
+    def __getattr__(self, name):
+        """Fetch object attributes from the underlying data dict held in the
+        model.
+
+        """
+        try:
+            return self.safe_data[name]
+        except KeyError:
+            name = name.replace('_', '-')
+            if name in self.safe_data:
+                return self.safe_data[name]
+            else:
+                raise
+
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
+
+    @property
+    def entity_type(self):
+        """A string identifying the entity type of this object, e.g.
+        'application' or 'unit', etc.
+
+        """
+        return self.__class__.__name__.lower()
+
+    @property
+    def current(self):
+        """Return True if this object represents the current state of the
+        entity in the underlying model.
+
+        This will be True except when the object represents an entity at a
+        non-latest state in history, e.g. if the object was obtained by calling
+        .previous() on another object.
+
+        """
+        return self._history_index == -1
+
+    @property
+    def dead(self):
+        """Returns True if this entity no longer exists in the underlying
+        model.
+
+        """
+        return (
+            self.data is None or
+            self.model.state.entity_data(
+                self.entity_type, self.entity_id, -1) is None
+        )
+
+    @property
+    def alive(self):
+        """Returns True if this entity still exists in the underlying
+        model.
+
+        """
+        return not self.dead
+
+    @property
+    def data(self):
+        """The data dictionary for this entity.
+
+        """
+        return self.model.state.entity_data(
+            self.entity_type, self.entity_id, self._history_index)
+
+    @property
+    def safe_data(self):
+        """The data dictionary for this entity.
+
+        If this `ModelEntity` points to the dead state, it will
+        raise `DeadEntityException`.
+
+        """
+        if self.data is None:
+            raise DeadEntityException(
+                "Entity {}:{} is dead - its attributes can no longer be "
+                "accessed. Use the .previous() method on this object to get "
+                "a copy of the object at its previous state.".format(
+                    self.entity_type, self.entity_id))
+        return self.data
+
+    def previous(self):
+        """Return a copy of this object as was at its previous state in
+        history.
+
+        Returns None if this object is new (and therefore has no history).
+
+        The returned object is always "disconnected", i.e. does not receive
+        live updates.
+
+        """
+        return self.model.state.get_entity(
+            self.entity_type, self.entity_id, self._history_index - 1,
+            connected=False)
+
+    def next(self):
+        """Return a copy of this object at its next state in
+        history.
+
+        Returns None if this object is already the latest.
+
+        The returned object is "disconnected", i.e. does not receive
+        live updates, unless it is current (latest).
+
+        """
+        if self._history_index == -1:
+            return None
+
+        new_index = self._history_index + 1
+        connected = (
+            new_index == len(self.model.state.entity_history(
+                self.entity_type, self.entity_id)) - 1
+        )
+        return self.model.state.get_entity(
+            self.entity_type, self.entity_id, self._history_index - 1,
+            connected=connected)
+
+    def latest(self):
+        """Return a copy of this object at its current state in the model.
+
+        Returns self if this object is already the latest.
+
+        The returned object is always "connected", i.e. receives
+        live updates from the model.
+
+        """
+        if self._history_index == -1:
+            return self
+
+        return self.model.state.get_entity(self.entity_type, self.entity_id)
+
+
+class Model:
+    """
+    The main API for interacting with a Juju model.
+    """
+    def __init__(
+        self,
+        loop=None,
+        max_frame_size=None,
+        bakery_client=None,
+        jujudata=None,
+    ):
+        """Instantiate a new Model.
+
+        The connect method will need to be called before this
+        object can be used for anything interesting.
+
+        If jujudata is None, jujudata.FileJujuData will be used.
+
+        :param loop: an asyncio event loop
+        :param max_frame_size: See
+            `juju.client.connection.Connection.MAX_FRAME_SIZE`
+        :param bakery_client httpbakery.Client: The bakery client to use
+            for macaroon authorization.
+        :param jujudata JujuData: The source for current controller information
+        """
+        self._connector = connector.Connector(
+            loop=loop,
+            max_frame_size=max_frame_size,
+            bakery_client=bakery_client,
+            jujudata=jujudata,
+        )
+        self._observers = weakref.WeakValueDictionary()
+        self.state = ModelState(self)
+        self._info = None
+        self._watch_stopping = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped = asyncio.Event(loop=self._connector.loop)
+        self._watch_received = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped.set()
+        self._charmstore = CharmStore(self._connector.loop)
+
+    def is_connected(self):
+        """Reports whether the Model is currently connected."""
+        return self._connector.is_connected()
+
+    @property
+    def loop(self):
+        return self._connector.loop
+
+    def connection(self):
+        """Return the current Connection object. It raises an exception
+        if the Model is disconnected"""
+        return self._connector.connection()
+
+    async def get_controller(self):
+        """Return a Controller instance for the currently connected model.
+        :return Controller:
+        """
+        from juju.controller import Controller
+        controller = Controller(jujudata=self._connector.jujudata)
+        kwargs = self.connection().connect_params()
+        kwargs.pop('uuid')
+        await controller._connect_direct(**kwargs)
+        return controller
+
+    async def __aenter__(self):
+        await self.connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.disconnect()
+
+    async def connect(self, *args, **kwargs):
+        """Connect to a juju model.
+
+        This supports two calling conventions:
+
+        The model and (optionally) authentication information can be taken
+        from the data files created by the Juju CLI.  This convention will
+        be used if a ``model_name`` is specified, or if the ``endpoint``
+        and ``uuid`` are not.
+
+        Otherwise, all of the ``endpoint``, ``uuid``, and authentication
+        information (``username`` and ``password``, or ``bakery_client`` and/or
+        ``macaroons``) are required.
+
+        If a single positional argument is given, it will be assumed to be
+        the ``model_name``.  Otherwise, the first positional argument, if any,
+        must be the ``endpoint``.
+
+        Available parameters are:
+
+        :param model_name:  Format [controller:][user/]model
+        :param str endpoint: The hostname:port of the controller to connect to.
+        :param str uuid: The model UUID to connect to.
+        :param str username: The username for controller-local users (or None
+            to use macaroon-based login.)
+        :param str password: The password for controller-local users.
+        :param str cacert: The CA certificate of the controller
+            (PEM formatted).
+        :param httpbakery.Client bakery_client: The macaroon bakery client to
+            to use when performing macaroon-based login. Macaroon tokens
+            acquired when logging will be saved to bakery_client.cookies.
+            If this is None, a default bakery_client will be used.
+        :param list macaroons: List of macaroons to load into the
+            ``bakery_client``.
+        :param asyncio.BaseEventLoop loop: The event loop to use for async
+            operations.
+        :param int max_frame_size: The maximum websocket frame size to allow.
+        """
+        await self.disconnect()
+        if 'endpoint' not in kwargs and len(args) < 2:
+            if args and 'model_name' in kwargs:
+                raise TypeError('connect() got multiple values for model_name')
+            elif args:
+                model_name = args[0]
+            else:
+                model_name = kwargs.pop('model_name', None)
+            await self._connector.connect_model(model_name, **kwargs)
+        else:
+            if 'model_name' in kwargs:
+                raise TypeError('connect() got values for both '
+                                'model_name and endpoint')
+            if args and 'endpoint' in kwargs:
+                raise TypeError('connect() got multiple values for endpoint')
+            if len(args) < 2 and 'uuid' not in kwargs:
+                raise TypeError('connect() missing value for uuid')
+            has_userpass = (len(args) >= 4 or
+                            {'username', 'password'}.issubset(kwargs))
+            has_macaroons = (len(args) >= 6 or not
+                             {'bakery_client', 'macaroons'}.isdisjoint(kwargs))
+            if not (has_userpass or has_macaroons):
+                raise TypeError('connect() missing auth params')
+            arg_names = [
+                'endpoint',
+                'uuid',
+                'username',
+                'password',
+                'cacert',
+                'bakery_client',
+                'macaroons',
+                'loop',
+                'max_frame_size',
+            ]
+            for i, arg in enumerate(args):
+                kwargs[arg_names[i]] = arg
+            if not {'endpoint', 'uuid'}.issubset(kwargs):
+                raise ValueError('endpoint and uuid are required '
+                                 'if model_name not given')
+            if not ({'username', 'password'}.issubset(kwargs) or
+                    {'bakery_client', 'macaroons'}.intersection(kwargs)):
+                raise ValueError('Authentication parameters are required '
+                                 'if model_name not given')
+            await self._connector.connect(**kwargs)
+        await self._after_connect()
+
+    async def connect_model(self, model_name):
+        """
+        .. deprecated:: 0.6.2
+           Use ``connect(model_name=model_name)`` instead.
+        """
+        return await self.connect(model_name=model_name)
+
+    async def connect_current(self):
+        """
+        .. deprecated:: 0.6.2
+           Use ``connect()`` instead.
+        """
+        return await self.connect()
+
+    async def _connect_direct(self, **kwargs):
+        await self.disconnect()
+        await self._connector.connect(**kwargs)
+        await self._after_connect()
+
+    async def _after_connect(self):
+        self._watch()
+
+        # Wait for the first packet of data from the AllWatcher,
+        # which contains all information on the model.
+        # TODO this means that we can't do anything until
+        # we've received all the model data, which might be
+        # a whole load of unneeded data if all the client wants
+        # to do is make one RPC call.
+        await self._watch_received.wait()
+
+        await self.get_info()
+
+    async def disconnect(self):
+        """Shut down the watcher task and close websockets.
+
+        """
+        if not self._watch_stopped.is_set():
+            log.debug('Stopping watcher task')
+            self._watch_stopping.set()
+            await self._watch_stopped.wait()
+            self._watch_stopping.clear()
+
+        if self.is_connected():
+            log.debug('Closing model connection')
+            await self._connector.disconnect()
+            self._info = None
+
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self._connector.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection().https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
+    def all_units_idle(self):
+        """Return True if all units are idle.
+
+        """
+        for unit in self.units.values():
+            unit_status = unit.data['agent-status']['current']
+            if unit_status != 'idle':
+                return False
+        return True
+
+    async def reset(self, force=False):
+        """Reset the model to a clean state.
+
+        :param bool force: Force-terminate machines.
+
+        This returns only after the model has reached a clean state. "Clean"
+        means no applications or machines exist in the model.
+
+        """
+        log.debug('Resetting model')
+        for app in self.applications.values():
+            await app.destroy()
+        for machine in self.machines.values():
+            await machine.destroy(force=force)
+        await self.block_until(
+            lambda: len(self.machines) == 0
+        )
+
+    async def block_until(self, *conditions, timeout=None, wait_period=0.5):
+        """Return only after all conditions are true.
+
+        Raises `websockets.ConnectionClosed` if disconnected.
+        """
+        def _disconnected():
+            return not (self.is_connected() and self.connection().is_open)
+
+        def done():
+            return _disconnected() or all(c() for c in conditions)
+
+        await utils.block_until(done,
+                                timeout=timeout,
+                                wait_period=wait_period,
+                                loop=self.loop)
+        if _disconnected():
+            raise websockets.ConnectionClosed(1006, 'no reason')
+
+    @property
+    def applications(self):
+        """Return a map of application-name:Application for all applications
+        currently in the model.
+
+        """
+        return self.state.applications
+
+    @property
+    def machines(self):
+        """Return a map of machine-id:Machine for all machines currently in
+        the model.
+
+        """
+        return self.state.machines
+
+    @property
+    def units(self):
+        """Return a map of unit-id:Unit for all units currently in
+        the model.
+
+        """
+        return self.state.units
+
+    @property
+    def relations(self):
+        """Return a list of all Relations currently in the model.
+
+        """
+        return list(self.state.relations.values())
+
+    async def get_info(self):
+        """Return a client.ModelInfo object for this Model.
+
+        Retrieves latest info for this Model from the api server. The
+        return value is cached on the Model.info attribute so that the
+        valued may be accessed again without another api call, if
+        desired.
+
+        This method is called automatically when the Model is connected,
+        resulting in Model.info being initialized without requiring an
+        explicit call to this method.
+
+        """
+        facade = client.ClientFacade.from_connection(self.connection())
+
+        self._info = await facade.ModelInfo()
+        log.debug('Got ModelInfo: %s', vars(self.info))
+
+        return self.info
+
+    @property
+    def info(self):
+        """Return the cached client.ModelInfo object for this Model.
+
+        If Model.get_info() has not been called, this will return None.
+        """
+        return self._info
+
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None,
+            predicate=None):
+        """Register an "on-model-change" callback
+
+        Once the model is connected, ``callable_``
+        will be called each time the model changes. ``callable_`` should
+        be Awaitable and accept the following positional arguments:
+
+            delta - An instance of :class:`juju.delta.EntityDelta`
+                containing the raw delta data recv'd from the Juju
+                websocket.
+
+            old_obj - If the delta modifies an existing object in the model,
+                old_obj will be a copy of that object, as it was before the
+                delta was applied. Will be None if the delta creates a new
+                entity in the model.
+
+            new_obj - A copy of the new or updated object, after the delta
+                is applied. Will be None if the delta removes an entity
+                from the model.
+
+            model - The :class:`Model` itself.
+
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or entitiy_id filter criteria, e.g.::
+
+            add_observer(
+                myfunc,
+                entity_type='application', action='add', entity_id='ubuntu')
+
+        For more complex filtering conditions, pass a predicate function. It
+        will be called with a delta as its only argument. If the predicate
+        function returns True, the ``callable_`` will be called.
+
+        """
+        observer = _Observer(
+            callable_, entity_type, action, entity_id, predicate)
+        self._observers[observer] = callable_
+
+    def _watch(self):
+        """Start an asynchronous watch against this model.
+
+        See :meth:`add_observer` to register an onchange callback.
+
+        """
+        async def _all_watcher():
+            try:
+                allwatcher = client.AllWatcherFacade.from_connection(
+                    self.connection())
+                while not self._watch_stopping.is_set():
+                    try:
+                        results = await utils.run_with_interrupt(
+                            allwatcher.Next(),
+                            self._watch_stopping,
+                            loop=self._connector.loop)
+                    except JujuAPIError as e:
+                        if 'watcher was stopped' not in str(e):
+                            raise
+                        if self._watch_stopping.is_set():
+                            # this shouldn't ever actually happen, because
+                            # the event should trigger before the controller
+                            # has a chance to tell us the watcher is stopped
+                            # but handle it gracefully, just in case
+                            break
+                        # controller stopped our watcher for some reason
+                        # but we're not actually stopping, so just restart it
+                        log.warning(
+                            'Watcher: watcher stopped, restarting')
+                        del allwatcher.Id
+                        continue
+                    except websockets.ConnectionClosed:
+                        monitor = self.connection().monitor
+                        if monitor.status == monitor.ERROR:
+                            # closed unexpectedly, try to reopen
+                            log.warning(
+                                'Watcher: connection closed, reopening')
+                            await self.connection().reconnect()
+                            if monitor.status != monitor.CONNECTED:
+                                # reconnect failed; abort and shutdown
+                                log.error('Watcher: automatic reconnect '
+                                          'failed; stopping watcher')
+                                break
+                            del allwatcher.Id
+                            continue
+                        else:
+                            # closed on request, go ahead and shutdown
+                            break
+                    if self._watch_stopping.is_set():
+                        try:
+                            await allwatcher.Stop()
+                        except websockets.ConnectionClosed:
+                            pass  # can't stop on a closed conn
+                        break
+                    for delta in results.deltas:
+                        try:
+                            delta = get_entity_delta(delta)
+                            old_obj, new_obj = self.state.apply_delta(delta)
+                            await self._notify_observers(delta, old_obj, new_obj)
+                        except KeyError as e:
+                            log.debug("unknown delta type: %s", e.args[0])
+                    self._watch_received.set()
+            except CancelledError:
+                pass
+            except Exception:
+                log.exception('Error in watcher')
+                raise
+            finally:
+                self._watch_stopped.set()
+
+        log.debug('Starting watcher task')
+        self._watch_received.clear()
+        self._watch_stopping.clear()
+        self._watch_stopped.clear()
+        self._connector.loop.create_task(_all_watcher())
+
+    async def _notify_observers(self, delta, old_obj, new_obj):
+        """Call observing callbacks, notifying them of a change in model state
+
+        :param delta: The raw change from the watcher
+            (:class:`juju.client.overrides.Delta`)
+        :param old_obj: The object in the model that this delta updates.
+            May be None.
+        :param new_obj: The object in the model that is created or updated
+            by applying this delta.
+
+        """
+        if new_obj and not old_obj:
+            delta.type = 'add'
+
+        log.debug(
+            'Model changed: %s %s %s',
+            delta.entity, delta.type, delta.get_id())
+
+        for o in self._observers:
+            if o.cares_about(delta):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                      loop=self._connector.loop)
+
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self._connector.loop)
+
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
+
+    async def _wait_for_new(self, entity_type, entity_id):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``
+        to appear in the model.  This is similar to watching for the
+        object using ``block_until``, but uses the watcher rather than
+        polling.
+
+        """
+        # if the entity is already in the model, just return it
+        if entity_id in self.state._live_entity_map(entity_type):
+            return self.state._live_entity_map(entity_type)[entity_id]
+        return await self._wait(entity_type, entity_id, None)
+
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
+
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'failed')
+
+        return await self._wait('action', action_id, None, predicate)
+
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
+        """Start a new, empty machine and optionally a container, or add a
+        container to a machine.
+
+        :param str spec: Machine specification
+            Examples::
+
+                (None) - starts a new machine
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
+                'ssh:user@10.10.0.3:/path/to/private/key' - manually provision
+                a machine with ssh and the private key used for authentication
+                'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
+                'maas2.name' - acquire machine maas2.name on MAAS
+
+        :param dict constraints: Machine constraints, which can contain the
+            the following keys::
+
+                arch : str
+                container : str
+                cores : int
+                cpu_power : int
+                instance_type : str
+                mem : int
+                root_disk : int
+                spaces : list(str)
+                tags : list(str)
+                virt_type : str
+
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                    'tags': ['virtual'],
+                }
+
+        :param list disks: List of disk constraint dictionaries, which can
+            contain the following keys::
+
+                count : int
+                pool : str
+                size : int
+
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
+
+        When deploying a container to an existing machine, constraints cannot
+        be used.
+
+        """
+        params = client.AddMachineParams()
+
+        if spec:
+            if spec.startswith("ssh:"):
+                placement, target, private_key_path = spec.split(":")
+                user, host = target.split("@")
+
+                sshProvisioner = provisioner.SSHProvisioner(
+                    host=host,
+                    user=user,
+                    private_key_path=private_key_path,
+                )
+
+                params = sshProvisioner.provision_machine()
+            else:
+                placement = parse_placement(spec)
+                if placement:
+                    params.placement = placement[0]
+
+        params.jobs = ['JobHostUnits']
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade.from_connection(self.connection())
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s" % error.message)
+        machine_id = results.machines[0].machine
+
+        if spec:
+            if spec.startswith("ssh:"):
+                # Need to run this after AddMachines has been called,
+                # as we need the machine_id
+                await sshProvisioner.install_agent(
+                    self.connection(),
+                    params.nonce,
+                    machine_id,
+                )
+
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
+
+    async def add_relation(self, relation1, relation2):
+        """Add a relation between two applications.
+
+        :param str relation1: '<application>[:<relation_name>]'
+        :param str relation2: '<application>[:<relation_name>]'
+
+        """
+        connection = self.connection()
+        app_facade = client.ApplicationFacade.from_connection(connection)
+
+        log.debug(
+            'Adding relation %s <-> %s', relation1, relation2)
+
+        def _find_relation(*specs):
+            for rel in self.relations:
+                if rel.matches(*specs):
+                    return rel
+            return None
+
+        try:
+            result = await app_facade.AddRelation([relation1, relation2])
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            rel = _find_relation(relation1, relation2)
+            if rel:
+                return rel
+            raise JujuError('Relation {} {} exists but not in model'.format(
+                relation1, relation2))
+
+        specs = ['{}:{}'.format(app, data['name'])
+                 for app, data in result.endpoints.items()]
+
+        await self.block_until(lambda: _find_relation(*specs) is not None)
+        return _find_relation(*specs)
+
+    def add_space(self, name, *cidrs):
+        """Add a new network space.
+
+        Adds a new space with the given name and associates the given
+        (optional) list of existing subnet CIDRs with it.
+
+        :param str name: Name of the space
+        :param *cidrs: Optional list of existing subnet CIDRs
+
+        """
+        raise NotImplementedError()
+
+    async def add_ssh_key(self, user, key):
+        """Add a public SSH key to this model.
+
+        :param str user: The username of the user
+        :param str key: The public ssh key
+
+        """
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
+        return await key_facade.AddKeys([key], user)
+    add_ssh_keys = add_ssh_key
+
+    def add_subnet(self, cidr_or_id, space, *zones):
+        """Add an existing subnet to this model.
+
+        :param str cidr_or_id: CIDR or provider ID of the existing subnet
+        :param str space: Network space with which to associate
+        :param str *zones: Zone(s) in which the subnet resides
+
+        """
+        raise NotImplementedError()
+
+    def get_backups(self):
+        """Retrieve metadata for backups in this model.
+
+        """
+        raise NotImplementedError()
+
+    def block(self, *commands):
+        """Add a new block to this model.
+
+        :param str *commands: The commands to block. Valid values are
+            'all-changes', 'destroy-model', 'remove-object'
+
+        """
+        raise NotImplementedError()
+
+    def get_blocks(self):
+        """List blocks for this model.
+
+        """
+        raise NotImplementedError()
+
+    def get_cached_images(self, arch=None, kind=None, series=None):
+        """Return a list of cached OS images.
+
+        :param str arch: Filter by image architecture
+        :param str kind: Filter by image kind, e.g. 'lxd'
+        :param str series: Filter by image series, e.g. 'xenial'
+
+        """
+        raise NotImplementedError()
+
+    def create_backup(self, note=None, no_download=False):
+        """Create a backup of this model.
+
+        :param str note: A note to store with the backup
+        :param bool no_download: Do not download the backup archive
+        :return str: Path to downloaded archive
+
+        """
+        raise NotImplementedError()
+
+    def create_storage_pool(self, name, provider_type, **pool_config):
+        """Create or define a storage pool.
+
+        :param str name: Name to give the storage pool
+        :param str provider_type: Pool provider type
+        :param **pool_config: key/value pool configuration pairs
+
+        """
+        raise NotImplementedError()
+
+    def debug_log(
+            self, no_tail=False, exclude_module=None, include_module=None,
+            include=None, level=None, limit=0, lines=10, replay=False,
+            exclude=None):
+        """Get log messages for this model.
+
+        :param bool no_tail: Stop after returning existing log messages
+        :param list exclude_module: Do not show log messages for these logging
+            modules
+        :param list include_module: Only show log messages for these logging
+            modules
+        :param list include: Only show log messages for these entities
+        :param str level: Log level to show, valid options are 'TRACE',
+            'DEBUG', 'INFO', 'WARNING', 'ERROR,
+        :param int limit: Return this many of the most recent (possibly
+            filtered) lines are shown
+        :param int lines: Yield this many of the most recent lines, and keep
+            yielding
+        :param bool replay: Yield the entire log, and keep yielding
+        :param list exclude: Do not show log messages for these entities
+
+        """
+        raise NotImplementedError()
+
+    def _get_series(self, entity_url, entity):
+        # try to get the series from the provided charm URL
+        if entity_url.startswith('cs:'):
+            parts = entity_url[3:].split('/')
+        else:
+            parts = entity_url.split('/')
+        if parts[0].startswith('~'):
+            parts.pop(0)
+        if len(parts) > 1:
+            # series was specified in the URL
+            return parts[0]
+        # series was not supplied at all, so use the newest
+        # supported series according to the charm store
+        ss = entity['Meta']['supported-series']
+        return ss['SupportedSeries'][0]
+
+    async def deploy(
+            self, entity_url, application_name=None, bind=None, budget=None,
+            channel=None, config=None, constraints=None, force=False,
+            num_units=1, plan=None, resources=None, series=None, storage=None,
+            to=None, devices=None):
+        """Deploy a new service or bundle.
+
+        :param str entity_url: Charm or bundle url
+        :param str application_name: Name to give the service
+        :param dict bind: <charm endpoint>:<network space> pairs
+        :param dict budget: <budget name>:<limit> pairs
+        :param str channel: Charm store channel from which to retrieve
+            the charm or bundle, e.g. 'edge'
+        :param dict config: Charm configuration dictionary
+        :param constraints: Service constraints
+        :type constraints: :class:`juju.Constraints`
+        :param bool force: Allow charm to be deployed to a machine running
+            an unsupported series
+        :param int num_units: Number of units to deploy
+        :param str plan: Plan under which to deploy charm
+        :param dict resources: <resource name>:<file path> pairs
+        :param str series: Series on which to deploy
+        :param dict storage: Storage constraints TODO how do these look?
+        :param to: Placement directive as a string. For example:
+
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
+
+            If None, a new machine is provisioned.
+
+
+        TODO::
+
+            - support local resources
+
+        """
+        if storage:
+            storage = {
+                k: client.Constraints(**v)
+                for k, v in storage.items()
+            }
+
+        entity_path = Path(entity_url.replace('local:', ''))
+        bundle_path = entity_path / 'bundle.yaml'
+        metadata_path = entity_path / 'metadata.yaml'
+
+        is_local = (
+            entity_url.startswith('local:') or
+            entity_path.is_dir() or
+            entity_path.is_file()
+        )
+        if is_local:
+            entity_id = entity_url.replace('local:', '')
+        else:
+            entity = await self.charmstore.entity(entity_url, channel=channel,
+                                                  include_stats=False)
+            entity_id = entity['Id']
+
+        client_facade = client.ClientFacade.from_connection(self.connection())
+
+        is_bundle = ((is_local and
+                      (entity_id.endswith('.yaml') and entity_path.exists()) or
+                      bundle_path.exists()) or
+                     (not is_local and 'bundle/' in entity_id))
+
+        if is_bundle:
+            handler = BundleHandler(self)
+            await handler.fetch_plan(entity_id)
+            await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self._wait_for_new('application', app_name),
+                        loop=self._connector.loop)
+                    for app_name in pending_apps
+                ], loop=self._connector.loop)
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
+        else:
+            if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series:
+                    series = self._get_series(entity_url, entity)
+                await client_facade.AddCharm(channel, entity_id)
+                # XXX: we're dropping local resources here, but we don't
+                # actually support them yet anyway
+                resources = await self._add_store_resources(application_name,
+                                                            entity_id,
+                                                            entity=entity)
+            else:
+                if not application_name:
+                    metadata = yaml.load(metadata_path.read_text())
+                    application_name = metadata['name']
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+            return await self._deploy(
+                charm_url=entity_id,
+                application=application_name,
+                series=series,
+                config=config or {},
+                constraints=constraints,
+                endpoint_bindings=bind,
+                resources=resources,
+                storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to),
+                devices=devices,
+            )
+
+    async def _add_store_resources(self, application, entity_url,
+                                   overrides=None, entity=None):
+        if not entity:
+            # avoid extra charm store call if one was already made
+            entity = await self.charmstore.entity(entity_url,
+                                                  include_stats=False)
+        resources = [
+            {
+                'description': resource['Description'],
+                'fingerprint': resource['Fingerprint'],
+                'name': resource['Name'],
+                'path': resource['Path'],
+                'revision': resource['Revision'],
+                'size': resource['Size'],
+                'type_': resource['Type'],
+                'origin': 'store',
+            } for resource in entity['Meta']['resources']
+        ]
+
+        if overrides:
+            names = {r['name'] for r in resources}
+            unknown = overrides.keys() - names
+            if unknown:
+                raise JujuError('Unrecognized resource{}: {}'.format(
+                    's' if len(unknown) > 1 else '',
+                    ', '.join(unknown)))
+            for resource in resources:
+                if resource['name'] in overrides:
+                    resource['revision'] = overrides[resource['name']]
+
+        if not resources:
+            return None
+
+        resources_facade = client.ResourcesFacade.from_connection(
+            self.connection())
+        response = await resources_facade.AddPendingResources(
+            tag.application(application),
+            entity_url,
+            [client.CharmResource(**resource) for resource in resources])
+        resource_map = {resource['name']: pid
+                        for resource, pid
+                        in zip(resources, response.pending_ids)}
+        return resource_map
+
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None,
+                      devices=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+        """
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade.from_connection(
+            self.connection())
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement,
+            devices=devices,
+        )
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
+
+    async def destroy(self):
+        """Terminate all machines and resources for this model.
+            Is already implemented in controller.py.
+        """
+        raise NotImplementedError()
+
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        connection = self.connection()
+        app_facade = client.ApplicationFacade.from_connection(connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.DestroyUnits(list(unit_names))
+    destroy_units = destroy_unit
+
+    def get_backup(self, archive_id):
+        """Download a backup archive file.
+
+        :param str archive_id: The id of the archive to download
+        :return str: Path to the archive file
+
+        """
+        raise NotImplementedError()
+
+    def enable_ha(
+            self, num_controllers=0, constraints=None, series=None, to=None):
+        """Ensure sufficient controllers exist to provide redundancy.
+
+        :param int num_controllers: Number of controllers to make available
+        :param constraints: Constraints to apply to the controller machines
+        :type constraints: :class:`juju.Constraints`
+        :param str series: Series of the controller machines
+        :param list to: Placement directives for controller machines, e.g.::
+
+            '23' - machine 23
+            'lxc:7' - new lxc container on machine 7
+            '24/lxc/3' - lxc container 3 or machine 24
+
+            If None, a new machine is provisioned.
+
+        """
+        raise NotImplementedError()
+
+    async def get_config(self):
+        """Return the configuration settings for this model.
+
+        :returns: A ``dict`` mapping keys to `ConfigValue` instances,
+            which have `source` and `value` attributes.
+        """
+        config_facade = client.ModelConfigFacade.from_connection(
+            self.connection()
+        )
+        result = await config_facade.ModelGet()
+        config = result.config
+        for key, value in config.items():
+            config[key] = ConfigValue.from_json(value)
+        return config
+
+    async def get_constraints(self):
+        """Return the machine constraints for this model.
+
+        :returns: A ``dict`` of constraints.
+        """
+        constraints = {}
+        client_facade = client.ClientFacade.from_connection(self.connection())
+        result = await client_facade.GetModelConstraints()
+
+        # GetModelConstraints returns GetConstraintsResults which has a
+        # 'constraints' attribute. If no constraints have been set
+        # GetConstraintsResults.constraints is None. Otherwise
+        # GetConstraintsResults.constraints has an attribute for each possible
+        # constraint, each of these in turn will be None if they have not been
+        # set.
+        if result.constraints:
+            constraint_types = [a for a in dir(result.constraints)
+                                if a in Value._toSchema.keys()]
+            for constraint in constraint_types:
+                value = getattr(result.constraints, constraint)
+                if value is not None:
+                    constraints[constraint] = getattr(result.constraints,
+                                                      constraint)
+        return constraints
+
+    def import_ssh_key(self, identity):
+        """Add a public SSH key from a trusted indentity source to this model.
+
+        :param str identity: User identity in the form <lp|gh>:<username>
+
+        """
+        raise NotImplementedError()
+    import_ssh_keys = import_ssh_key
+
+    async def get_machines(self):
+        """Return list of machines in this model.
+
+        """
+        return list(self.state.machines.keys())
+
+    def get_shares(self):
+        """Return list of all users with access to this model.
+
+        """
+        raise NotImplementedError()
+
+    def get_spaces(self):
+        """Return list of all known spaces, including associated subnets.
+
+        """
+        raise NotImplementedError()
+
+    async def get_ssh_key(self, raw_ssh=False):
+        """Return known SSH keys for this model.
+        :param bool raw_ssh: if True, returns the raw ssh key,
+            else it's fingerprint
+
+        """
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
+        entity = {'tag': tag.model(self.info.uuid)}
+        entities = client.Entities([entity])
+        return await key_facade.ListKeys(entities, raw_ssh)
+    get_ssh_keys = get_ssh_key
+
+    def get_storage(self, filesystem=False, volume=False):
+        """Return details of storage instances.
+
+        :param bool filesystem: Include filesystem storage
+        :param bool volume: Include volume storage
+
+        """
+        raise NotImplementedError()
+
+    def get_storage_pools(self, names=None, providers=None):
+        """Return list of storage pools.
+
+        :param list names: Only include pools with these names
+        :param list providers: Only include pools for these providers
+
+        """
+        raise NotImplementedError()
+
+    def get_subnets(self, space=None, zone=None):
+        """Return list of known subnets.
+
+        :param str space: Only include subnets in this space
+        :param str zone: Only include subnets in this zone
+
+        """
+        raise NotImplementedError()
+
+    def remove_blocks(self):
+        """Remove all blocks from this model.
+
+        """
+        raise NotImplementedError()
+
+    def remove_backup(self, backup_id):
+        """Delete a backup.
+
+        :param str backup_id: The id of the backup to remove
+
+        """
+        raise NotImplementedError()
+
+    def remove_cached_images(self, arch=None, kind=None, series=None):
+        """Remove cached OS images.
+
+        :param str arch: Architecture of the images to remove
+        :param str kind: Image kind to remove, e.g. 'lxd'
+        :param str series: Image series to remove, e.g. 'xenial'
+
+        """
+        raise NotImplementedError()
+
+    def remove_machine(self, *machine_ids):
+        """Remove a machine from this model.
+
+        :param str *machine_ids: Ids of the machines to remove
+
+        """
+        raise NotImplementedError()
+    remove_machines = remove_machine
+
+    async def remove_ssh_key(self, user, key):
+        """Remove a public SSH key(s) from this model.
+
+        :param str key: Full ssh key
+        :param str user: Juju user to which the key is registered
+
+        """
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
+        key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
+        key = hashlib.md5(key).hexdigest()
+        key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
+        await key_facade.DeleteKeys([key], user)
+    remove_ssh_keys = remove_ssh_key
+
+    def restore_backup(
+            self, bootstrap=False, constraints=None, archive=None,
+            backup_id=None, upload_tools=False):
+        """Restore a backup archive to a new controller.
+
+        :param bool bootstrap: Bootstrap a new state machine
+        :param constraints: Model constraints
+        :type constraints: :class:`juju.Constraints`
+        :param str archive: Path to backup archive to restore
+        :param str backup_id: Id of backup to restore
+        :param bool upload_tools: Upload tools if bootstrapping a new machine
+
+        """
+        raise NotImplementedError()
+
+    def retry_provisioning(self):
+        """Retry provisioning for failed machines.
+
+        """
+        raise NotImplementedError()
+
+    def run(self, command, timeout=None):
+        """Run command on all machines in this model.
+
+        :param str command: The command to run
+        :param int timeout: Time to wait before command is considered failed
+
+        """
+        raise NotImplementedError()
+
+    async def set_config(self, config):
+        """Set configuration keys on this model.
+
+        :param dict config: Mapping of config keys to either string values or
+            `ConfigValue` instances, as returned by `get_config`.
+        """
+        config_facade = client.ModelConfigFacade.from_connection(
+            self.connection()
+        )
+        for key, value in config.items():
+            if isinstance(value, ConfigValue):
+                config[key] = value.value
+        await config_facade.ModelSet(config)
+
+    async def set_constraints(self, constraints):
+        """Set machine constraints on this model.
+
+        :param dict config: Mapping of model constraints
+        """
+        client_facade = client.ClientFacade.from_connection(self.connection())
+        await client_facade.SetModelConstraints(
+            application='',
+            constraints=constraints)
+
+    async def get_action_output(self, action_uuid, wait=None):
+        """Get the results of an action by ID.
+
+        :param str action_uuid: Id of the action
+        :param int wait: Time in seconds to wait for action to complete.
+        :return dict: Output from action
+        :raises: :class:`JujuError` if invalid action_uuid
+        """
+        action_facade = client.ActionFacade.from_connection(
+            self.connection()
+        )
+        entity = [{'tag': tag.action(action_uuid)}]
+        # Cannot use self.wait_for_action as the action event has probably
+        # already happened and self.wait_for_action works by processing
+        # model deltas and checking if they match our type. If the action
+        # has already occured then the delta has gone.
+
+        async def _wait_for_action_status():
+            while True:
+                action_output = await action_facade.Actions(entity)
+                if action_output.results[0].status in ('completed', 'failed'):
+                    return
+                else:
+                    await asyncio.sleep(1)
+        await asyncio.wait_for(
+            _wait_for_action_status(),
+            timeout=wait)
+        action_output = await action_facade.Actions(entity)
+        # ActionResult.output is None if the action produced no output
+        if action_output.results[0].output is None:
+            output = {}
+        else:
+            output = action_output.results[0].output
+        return output
+
+    async def get_action_status(self, uuid_or_prefix=None, name=None):
+        """Get the status of all actions, filtered by ID, ID prefix, or name.
+
+        :param str uuid_or_prefix: Filter by action uuid or prefix
+        :param str name: Filter by action name
+
+        """
+        results = {}
+        action_results = []
+        action_facade = client.ActionFacade.from_connection(
+            self.connection()
+        )
+        if name:
+            name_results = await action_facade.FindActionsByNames([name])
+            action_results.extend(name_results.actions[0].actions)
+        if uuid_or_prefix:
+            # Collect list of actions matching uuid or prefix
+            matching_actions = await action_facade.FindActionTagsByPrefix(
+                [uuid_or_prefix])
+            entities = []
+            for actions in matching_actions.matches.values():
+                entities = [{'tag': a.tag} for a in actions]
+            # Get action results matching action tags
+            uuid_results = await action_facade.Actions(entities)
+            action_results.extend(uuid_results.results)
+        for a in action_results:
+            results[tag.untag('action-', a.action.tag)] = a.status
+        return results
+
+    def get_budget(self, budget_name):
+        """Get budget usage info.
+
+        :param str budget_name: Name of budget
+
+        """
+        raise NotImplementedError()
+
+    async def get_status(self, filters=None, utc=False):
+        """Return the status of the model.
+
+        :param str filters: Optional list of applications, units, or machines
+            to include, which can use wildcards ('*').
+        :param bool utc: Display time as UTC in RFC3339 format
+
+        """
+        client_facade = client.ClientFacade.from_connection(self.connection())
+        return await client_facade.FullStatus(filters)
+
+    def sync_tools(
+            self, all_=False, destination=None, dry_run=False, public=False,
+            source=None, stream=None, version=None):
+        """Copy Juju tools into this model.
+
+        :param bool all_: Copy all versions, not just the latest
+        :param str destination: Path to local destination directory
+        :param bool dry_run: Don't do the actual copy
+        :param bool public: Tools are for a public cloud, so generate mirrors
+            information
+        :param str source: Path to local source directory
+        :param str stream: Simplestreams stream for which to sync metadata
+        :param str version: Copy a specific major.minor version
+
+        """
+        raise NotImplementedError()
+
+    def unblock(self, *commands):
+        """Unblock an operation that would alter this model.
+
+        :param str *commands: The commands to unblock. Valid values are
+            'all-changes', 'destroy-model', 'remove-object'
+
+        """
+        raise NotImplementedError()
+
+    def unset_config(self, *keys):
+        """Unset configuration on this model.
+
+        :param str *keys: The keys to unset
+
+        """
+        raise NotImplementedError()
+
+    def upgrade_gui(self):
+        """Upgrade the Juju GUI for this model.
+
+        """
+        raise NotImplementedError()
+
+    def upgrade_juju(
+            self, dry_run=False, reset_previous_upgrade=False,
+            upload_tools=False, version=None):
+        """Upgrade Juju on all machines in a model.
+
+        :param bool dry_run: Don't do the actual upgrade
+        :param bool reset_previous_upgrade: Clear the previous (incomplete)
+            upgrade status
+        :param bool upload_tools: Upload local version of tools
+        :param str version: Upgrade to a specific version
+
+        """
+        raise NotImplementedError()
+
+    def upload_backup(self, archive_path):
+        """Store a backup archive remotely in Juju.
+
+        :param str archive_path: Path to local archive
+
+        """
+        raise NotImplementedError()
+
+    @property
+    def charmstore(self):
+        return self._charmstore
+
+    async def get_metrics(self, *tags):
+        """Retrieve metrics.
+
+        :param str *tags: Tags of entities from which to retrieve metrics.
+            No tags retrieves the metrics of all units in the model.
+        :return: Dictionary of unit_name:metrics
+
+        """
+        log.debug("Retrieving metrics for %s",
+                  ', '.join(tags) if tags else "all units")
+
+        metrics_facade = client.MetricsDebugFacade.from_connection(
+            self.connection())
+
+        entities = [client.Entity(tag) for tag in tags]
+        metrics_result = await metrics_facade.GetMetrics(entities)
+
+        metrics = collections.defaultdict(list)
+
+        for entity_metrics in metrics_result.results:
+            error = entity_metrics.error
+            if error:
+                if "is not a valid tag" in error:
+                    raise ValueError(error.message)
+                else:
+                    raise Exception(error.message)
+
+            for metric in entity_metrics.metrics:
+                metrics[metric.unit].append(vars(metric))
+
+        return metrics
+
+
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
+
+class BundleHandler:
+    """
+    Handle bundles by using the API to translate bundle YAML into a plan of
+    steps and then dispatching each of those using the API.
+    """
+    def __init__(self, model):
+        self.model = model
+        self.charmstore = model.charmstore
+        self.plan = []
+        self.references = {}
+        self._units_by_app = {}
+        for unit_name, unit in model.units.items():
+            app_units = self._units_by_app.setdefault(unit.application, [])
+            app_units.append(unit_name)
+        self.bundle_facade = client.BundleFacade.from_connection(
+            model.connection())
+        self.client_facade = client.ClientFacade.from_connection(
+            model.connection())
+        self.app_facade = client.ApplicationFacade.from_connection(
+            model.connection())
+        self.ann_facade = client.AnnotationsFacade.from_connection(
+            model.connection())
+
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        apps_dict = bundle.get('applications', bundle.get('services', {}))
+        for app_name in self.applications:
+            app_dict = apps_dict[app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ], loop=self.model.loop)
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                apps_dict[app_name]['charm'] = charm_url
+
+        return bundle
+
+    async def fetch_plan(self, entity_id):
+        is_store_url = entity_id.startswith('cs:')
+
+        if not is_store_url and os.path.isfile(entity_id):
+            bundle_yaml = Path(entity_id).read_text()
+        elif not is_store_url and os.path.isdir(entity_id):
+            bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
+        else:
+            bundle_yaml = await self.charmstore.files(entity_id,
+                                                      filename='bundle.yaml',
+                                                      read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.bundle_facade.GetChanges(
+            yaml.dump(self.bundle))
+
+        if self.plan.errors:
+            raise JujuError(self.plan.errors)
+
+    async def execute_plan(self):
+        for step in self.plan.changes:
+            method = getattr(self, step.method)
+            result = await method(*step.args)
+            self.references[step.id_] = result
+
+    @property
+    def applications(self):
+        apps_dict = self.bundle.get('applications',
+                                    self.bundle.get('services', {}))
+        return list(apps_dict.keys())
+
+    def resolve(self, reference):
+        if reference and reference.startswith('$'):
+            reference = self.references[reference[1:]]
+        return reference
+
+    async def addCharm(self, charm, series):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be added.
+
+        :param series string:
+            Series holds the series of the charm to be added
+            if the charm default is not sufficient.
+        """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
+        entity_id = await self.charmstore.entityId(charm)
+        log.debug('Adding %s', entity_id)
+        await self.client_facade.AddCharm(None, entity_id)
+        return entity_id
+
+    async def addMachines(self, params=None):
+        """
+        :param params dict:
+            Dictionary specifying the machine to add. All keys are optional.
+            Keys include:
+
+            series: string specifying the machine OS series.
+
+            constraints: string holding machine constraints, if any. We'll
+                parse this into the json friendly dict that the juju api
+                expects.
+
+            container_type: string holding the type of the container (for
+                instance ""lxd" or kvm"). It is not specified for top level
+                machines.
+
+            parent_id: string holding a placeholder pointing to another
+                machine change or to a unit change. This value is only
+                specified in the case this machine is a container, in
+                which case also ContainerType is set.
+
+        """
+        params = params or {}
+
+        # Normalize keys
+        params = {normalize_key(k): params[k] for k in params.keys()}
+
+        # Fix up values, as necessary.
+        if 'parent_id' in params:
+            if params['parent_id'].startswith('$addUnit'):
+                unit = self.resolve(params['parent_id'])[0]
+                params['parent_id'] = unit.machine.entity_id
+            else:
+                params['parent_id'] = self.resolve(params['parent_id'])
+
+        params['constraints'] = parse_constraints(
+            params.get('constraints'))
+        params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+        if params.get('container_type') == 'lxc':
+            log.warning('Juju 2.0 does not support lxc containers. '
+                        'Converting containers to lxd.')
+            params['container_type'] = 'lxd'
+
+        # Submit the request.
+        params = client.AddMachineParams(**params)
+        results = await self.client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s" % error.message)
+        machine = results.machines[0].machine
+        log.debug('Added new machine %s', machine)
+        return machine
+
+    async def addRelation(self, endpoint1, endpoint2):
+        """
+        :param endpoint1 string:
+        :param endpoint2 string:
+            Endpoint1 and Endpoint2 hold relation endpoints in the
+            "application:interface" form, where the application is always a
+            placeholder pointing to an application change, and the interface is
+            optional. Examples are "$deploy-42:web" or just "$deploy-42".
+        """
+        endpoints = [endpoint1, endpoint2]
+        # resolve indirect references
+        for i in range(len(endpoints)):
+            parts = endpoints[i].split(':')
+            parts[0] = self.resolve(parts[0])
+            endpoints[i] = ':'.join(parts)
+
+        log.info('Relating %s <-> %s', *endpoints)
+        return await self.model.add_relation(*endpoints)
+
+    async def deploy(self, charm, series, application, options, constraints,
+                     storage, endpoint_bindings, *args):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be used to deploy this
+            application.
+
+        :param series string:
+            Series holds the series of the application to be deployed
+            if the charm default is not sufficient.
+
+        :param application string:
+            Application holds the application name.
+
+        :param options map[string]interface{}:
+            Options holds application options.
+
+        :param constraints string:
+            Constraints holds the optional application constraints.
+
+        :param storage map[string]string:
+            Storage holds the optional storage constraints.
+
+        :param endpoint_bindings map[string]string:
+            EndpointBindings holds the optional endpoint bindings
+
+        :param devices map[string]string:
+            Devices holds the optional devices constraints.
+            (Only given on Juju 2.5+)
+
+        :param resources map[string]int:
+            Resources identifies the revision to use for each resource
+            of the application's charm.
+
+        :param num_units int:
+            NumUnits holds the number of units required.  For IAAS models, this
+            will be 0 and separate AddUnitChanges will be used.  For Kubernetes
+            models, this will be used to scale the application.
+            (Only given on Juju 2.5+)
+        """
+        # resolve indirect references
+        charm = self.resolve(charm)
+
+        if len(args) == 1:
+            # Juju 2.4 and below only sends the resources
+            resources = args[0]
+            devices, num_units = None, None
+        else:
+            # Juju 2.5+ sends devices before resources, as well as num_units
+            # There might be placement but we need to ignore that.
+            devices, resources, num_units = args[:3]
+
+        if not charm.startswith('local:'):
+            resources = await self.model._add_store_resources(
+                application, charm, overrides=resources)
+        await self.model._deploy(
+            charm_url=charm,
+            application=application,
+            series=series,
+            config=options,
+            constraints=constraints,
+            endpoint_bindings=endpoint_bindings,
+            resources=resources,
+            storage=storage,
+            devices=devices,
+            num_units=num_units,
+        )
+        return application
+
+    async def addUnit(self, application, to):
+        """
+        :param application string:
+            Application holds the application placeholder name for which a unit
+            is added.
+
+        :param to string:
+            To holds the optional location where to add the unit, as a
+            placeholder pointing to another unit change or to a machine change.
+        """
+        application = self.resolve(application)
+        placement = self.resolve(to)
+        if self._units_by_app.get(application):
+            # enough units for this application already exist;
+            # claim one, and carry on
+            # NB: this should probably honor placement, but the juju client
+            # doesn't, so we're not bothering, either
+            unit_name = self._units_by_app[application].pop()
+            log.debug('Reusing unit %s for %s', unit_name, application)
+            return self.model.units[unit_name]
+
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
+        return await self.model.applications[application].add_unit(
+            count=1,
+            to=placement,
+        )
+
+    async def scale(self, application, scale):
+        """
+        Handle a change of scale to a k8s application.
+
+        :param string application:
+            Application holds the application placeholder name for which a unit
+            is added.
+
+        :param int scale:
+            New scale value to use.
+        """
+        application = self.resolve(application)
+        return await self.model.applications[application].scale(scale=scale)
+
+    async def expose(self, application):
+        """
+        :param application string:
+            Application holds the placeholder name of the application that must
+            be exposed.
+        """
+        application = self.resolve(application)
+        log.info('Exposing %s', application)
+        return await self.model.applications[application].expose()
+
+    async def setAnnotations(self, id_, entity_type, annotations):
+        """
+        :param id_ string:
+            Id is the placeholder for the application or machine change
+            corresponding to the entity to be annotated.
+
+        :param entity_type EntityType:
+            EntityType holds the type of the entity, "application" or
+            "machine".
+
+        :param annotations map[string]string:
+            Annotations holds the annotations as key/value pairs.
+        """
+        entity_id = self.resolve(id_)
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
+
+
+class CharmStore:
+    """
+    Async wrapper around theblues.charmstore.CharmStore
+    """
+    def __init__(self, loop, cs_timeout=20):
+        self.loop = loop
+        self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout)
+
+    def __getattr__(self, name):
+        """
+        Wrap method calls in coroutines that use run_in_executor to make them
+        async.
+        """
+        attr = getattr(self._cs, name)
+        if not callable(attr):
+            wrapper = partial(getattr, self._cs, name)
+            setattr(self, name, wrapper)
+        else:
+            async def coro(*args, **kwargs):
+                method = partial(attr, *args, **kwargs)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
+            setattr(self, name, coro)
+            wrapper = coro
+        return wrapper
+
+
+class CharmArchiveGenerator:
+    """
+    Create a Zip archive of a local charm directory for upload to a controller.
+
+    This is used automatically by
+    `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
+    """
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/* - This is used for packing the charm itself and any
+                          similar tasks.
+            * */.*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True