Expand integration tests to use stable/edge versions of juju (#155)
[osm/N2VC.git] / juju / model.py
index 7622178..bd8709a 100644 (file)
@@ -1,24 +1,34 @@
 import asyncio
 import asyncio
+import base64
 import collections
 import collections
+import hashlib
+import json
 import logging
 import os
 import re
 import logging
 import os
 import re
+import stat
+import tempfile
 import weakref
 import weakref
+import zipfile
 from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
 
 from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
 
+import websockets
 import yaml
 import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
 
 
+from . import tag, utils
 from .client import client
 from .client import client
-from .client import watcher
 from .client import connection
 from .client import connection
+from .client.client import ConfigValue
 from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
 from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
-from .errors import JujuAPIError
+from .errors import JujuError, JujuAPIError
+from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
 
 log = logging.getLogger(__name__)
 
@@ -68,6 +78,9 @@ class _Observer(object):
 
 
 class ModelObserver(object):
 
 
 class ModelObserver(object):
+    """
+    Base class for creating observers that react to changes in a model.
+    """
     async def __call__(self, delta, old, new, model):
         handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
     async def __call__(self, delta, old, new, model):
         handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
@@ -76,6 +89,8 @@ class ModelObserver(object):
     async def on_change(self, delta, old, new, model):
         """Generic model-change handler.
 
     async def on_change(self, delta, old, new, model):
         """Generic model-change handler.
 
+        This should be overridden in a subclass.
+
         :param delta: :class:`juju.client.overrides.Delta`
         :param old: :class:`juju.model.ModelEntity`
         :param new: :class:`juju.model.ModelEntity`
         :param delta: :class:`juju.client.overrides.Delta`
         :param old: :class:`juju.model.ModelEntity`
         :param new: :class:`juju.model.ModelEntity`
@@ -224,7 +239,14 @@ class ModelEntity(object):
         model.
 
         """
         model.
 
         """
-        return self.safe_data[name]
+        try:
+            return self.safe_data[name]
+        except KeyError:
+            name = name.replace('_', '-')
+            if name in self.safe_data:
+                return self.safe_data[name]
+            else:
+                raise
 
     def __bool__(self):
         return bool(self.data)
 
     def __bool__(self):
         return bool(self.data)
@@ -359,28 +381,49 @@ class ModelEntity(object):
 
 
 class Model(object):
 
 
 class Model(object):
-    def __init__(self, loop=None):
+    """
+    The main API for interacting with a Juju model.
+    """
+    def __init__(self, loop=None,
+                 max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
         """Instantiate a new connected Model.
 
         :param loop: an asyncio event loop
         """Instantiate a new connected Model.
 
         :param loop: an asyncio event loop
+        :param max_frame_size: See
+            `juju.client.connection.Connection.MAX_FRAME_SIZE`
 
         """
         self.loop = loop or asyncio.get_event_loop()
 
         """
         self.loop = loop or asyncio.get_event_loop()
+        self.max_frame_size = max_frame_size
         self.connection = None
         self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self.info = None
         self.connection = None
         self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self.info = None
-        self._watcher_task = None
-        self._watch_shutdown = asyncio.Event(loop=loop)
-        self._watch_received = asyncio.Event(loop=loop)
+        self._watch_stopping = asyncio.Event(loop=self.loop)
+        self._watch_stopped = asyncio.Event(loop=self.loop)
+        self._watch_received = asyncio.Event(loop=self.loop)
         self._charmstore = CharmStore(self.loop)
 
         self._charmstore = CharmStore(self.loop)
 
+    async def __aenter__(self):
+        await self.connect_current()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.disconnect()
+
+        if exc_type is not None:
+            return False
+
     async def connect(self, *args, **kw):
         """Connect to an arbitrary Juju model.
 
         args and kw are passed through to Connection.connect()
 
         """
     async def connect(self, *args, **kw):
         """Connect to an arbitrary Juju model.
 
         args and kw are passed through to Connection.connect()
 
         """
+        if 'loop' not in kw:
+            kw['loop'] = self.loop
+        if 'max_frame_size' not in kw:
+            kw['max_frame_size'] = self.max_frame_size
         self.connection = await connection.Connection.connect(*args, **kw)
         await self._after_connect()
 
         self.connection = await connection.Connection.connect(*args, **kw)
         await self._after_connect()
 
@@ -388,7 +431,8 @@ class Model(object):
         """Connect to the current Juju model.
 
         """
         """Connect to the current Juju model.
 
         """
-        self.connection = await connection.Connection.connect_current()
+        self.connection = await connection.Connection.connect_current(
+            self.loop, max_frame_size=self.max_frame_size)
         await self._after_connect()
 
     async def connect_model(self, model_name):
         await self._after_connect()
 
     async def connect_model(self, model_name):
@@ -397,7 +441,8 @@ class Model(object):
         :param model_name:  Format [controller:][user/]model
 
         """
         :param model_name:  Format [controller:][user/]model
 
         """
-        self.connection = await connection.Connection.connect_model(model_name)
+        self.connection = await connection.Connection.connect_model(
+            model_name, self.loop, self.max_frame_size)
         await self._after_connect()
 
     async def _after_connect(self):
         await self._after_connect()
 
     async def _after_connect(self):
@@ -412,13 +457,67 @@ class Model(object):
         """Shut down the watcher task and close websockets.
 
         """
         """Shut down the watcher task and close websockets.
 
         """
-        self._stop_watching()
         if self.connection and self.connection.is_open:
         if self.connection and self.connection.is_open:
-            await self._watch_shutdown.wait()
+            log.debug('Stopping watcher task')
+            self._watch_stopping.set()
+            await self._watch_stopped.wait()
             log.debug('Closing model connection')
             await self.connection.close()
             self.connection = None
 
             log.debug('Closing model connection')
             await self.connection.close()
             self.connection = None
 
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection.https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
     def all_units_idle(self):
         """Return True if all units are idle.
 
     def all_units_idle(self):
         """Return True if all units are idle.
 
@@ -453,8 +552,10 @@ class Model(object):
         """
         async def _block():
             while not all(c() for c in conditions):
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(wait_period)
-        await asyncio.wait_for(_block(), timeout)
+                if not (self.connection and self.connection.is_open):
+                    raise websockets.ConnectionClosed(1006, 'no reason')
+                await asyncio.sleep(wait_period, loop=self.loop)
+        await asyncio.wait_for(_block(), timeout, loop=self.loop)
 
     @property
     def applications(self):
 
     @property
     def applications(self):
@@ -493,8 +594,7 @@ class Model(object):
         explicit call to this method.
 
         """
         explicit call to this method.
 
         """
-        facade = client.ClientFacade()
-        facade.connect(self.connection)
+        facade = client.ClientFacade.from_connection(self.connection)
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
@@ -547,42 +647,64 @@ class Model(object):
         See :meth:`add_observer` to register an onchange callback.
 
         """
         See :meth:`add_observer` to register an onchange callback.
 
         """
-        async def _start_watch():
-            self._watch_shutdown.clear()
+        async def _all_watcher():
             try:
             try:
-                allwatcher = watcher.AllWatcher()
-                self._watch_conn = await self.connection.clone()
-                allwatcher.connect(self._watch_conn)
-                while True:
-                    results = await allwatcher.Next()
+                allwatcher = client.AllWatcherFacade.from_connection(
+                    self.connection)
+                while not self._watch_stopping.is_set():
+                    try:
+                        results = await utils.run_with_interrupt(
+                            allwatcher.Next(),
+                            self._watch_stopping,
+                            self.loop)
+                    except JujuAPIError as e:
+                        if 'watcher was stopped' not in str(e):
+                            raise
+                        if self._watch_stopping.is_set():
+                            # this shouldn't ever actually happen, because
+                            # the event should trigger before the controller
+                            # has a chance to tell us the watcher is stopped
+                            # but handle it gracefully, just in case
+                            break
+                        # controller stopped our watcher for some reason
+                        # but we're not actually stopping, so just restart it
+                        log.warning(
+                            'Watcher: watcher stopped, restarting')
+                        del allwatcher.Id
+                        continue
+                    except websockets.ConnectionClosed:
+                        monitor = self.connection.monitor
+                        if monitor.status == monitor.ERROR:
+                            # closed unexpectedly, try to reopen
+                            log.warning(
+                                'Watcher: connection closed, reopening')
+                            await self.connection.reconnect()
+                            del allwatcher.Id
+                            continue
+                        else:
+                            # closed on request, go ahead and shutdown
+                            break
+                    if self._watch_stopping.is_set():
+                        await allwatcher.Stop()
+                        break
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
                         old_obj, new_obj = self.state.apply_delta(delta)
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
                         old_obj, new_obj = self.state.apply_delta(delta)
-                        # XXX: Might not want to shield at this level
-                        # We are shielding because when the watcher is
-                        # canceled (on disconnect()), we don't want all of
-                        # its children (every observer callback) to be
-                        # canceled with it. So we shield them. But this means
-                        # they can *never* be canceled.
-                        await asyncio.shield(
-                            self._notify_observers(delta, old_obj, new_obj))
+                        await self._notify_observers(delta, old_obj, new_obj)
                     self._watch_received.set()
             except CancelledError:
                     self._watch_received.set()
             except CancelledError:
-                log.debug('Closing watcher connection')
-                await self._watch_conn.close()
-                self._watch_shutdown.set()
-                self._watch_conn = None
+                pass
+            except Exception:
+                log.exception('Error in watcher')
+                raise
+            finally:
+                self._watch_stopped.set()
 
         log.debug('Starting watcher task')
 
         log.debug('Starting watcher task')
-        self._watcher_task = self.loop.create_task(_start_watch())
-
-    def _stop_watching(self):
-        """Stop the asynchronous watch against this model.
-
-        """
-        log.debug('Stopping watcher task')
-        if self._watcher_task:
-            self._watcher_task.cancel()
+        self._watch_received.clear()
+        self._watch_stopping.clear()
+        self._watch_stopped.clear()
+        self.loop.create_task(_all_watcher())
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
@@ -604,7 +726,8 @@ class Model(object):
 
         for o in self.observers:
             if o.cares_about(delta):
 
         for o in self.observers:
             if o.cares_about(delta):
-                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                      loop=self.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -613,7 +736,7 @@ class Model(object):
 
         :param entity_type: The entity's type.
         :param entity_id: The entity's id.
 
         :param entity_type: The entity's type.
         :param entity_id: The entity's id.
-        :param action: the type of action (e.g., 'add' or 'change')
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
         :param predicate: optional callable that must take as an
             argument a delta, and must return a boolean, indicating
             whether the delta contains the specific action we're looking
         :param predicate: optional callable that must take as an
             argument a delta, and must return a boolean, indicating
             whether the delta contains the specific action we're looking
@@ -628,7 +751,9 @@ class Model(object):
 
         self.add_observer(callback, entity_type, action, entity_id, predicate)
         entity_id = await q.get()
 
         self.add_observer(callback, entity_type, action, entity_id, predicate)
         entity_id = await q.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
 
     async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
 
     async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
@@ -662,9 +787,8 @@ class Model(object):
 
         return await self._wait('action', action_id, 'change', predicate)
 
 
         return await self._wait('action', action_id, 'change', predicate)
 
-    def add_machine(
-            self, spec=None, constraints=None, disks=None, series=None,
-            count=1):
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
@@ -672,25 +796,83 @@ class Model(object):
             Examples::
 
                 (None) - starts a new machine
             Examples::
 
                 (None) - starts a new machine
-                'lxc' - starts a new machine with on lxc container
-                'lxc:4' - starts a new lxc container on machine 4
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
-        :param constraints: Machine constraints
-        :type constraints: :class:`juju.Constraints`
-        :param list disks: List of disk :class:`constraints <juju.Constraints>`
-        :param str series: Series
-        :param int count: Number of machines to deploy
 
 
-        Supported container types are: lxc, lxd, kvm
+        :param dict constraints: Machine constraints, which can contain the
+            the following keys::
+
+                arch : str
+                container : str
+                cores : int
+                cpu_power : int
+                instance_type : str
+                mem : int
+                root_disk : int
+                spaces : list(str)
+                tags : list(str)
+                virt_type : str
+
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                    'tags': ['virtual'],
+                }
+
+        :param list disks: List of disk constraint dictionaries, which can
+            contain the following keys::
+
+                count : int
+                pool : str
+                size : int
+
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
-        pass
-    add_machines = add_machine
+        params = client.AddMachineParams()
+        params.jobs = ['JobHostUnits']
+
+        if spec:
+            placement = parse_placement(spec)
+            if placement:
+                params.placement = placement[0]
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade.from_connection(self.connection)
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s" % error.message)
+        machine_id = results.machines[0].machine
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
@@ -699,8 +881,7 @@ class Model(object):
         :param str relation2: '<application>[:<relation_name>]'
 
         """
         :param str relation2: '<application>[:<relation_name>]'
 
         """
-        app_facade = client.ApplicationFacade()
-        app_facade.connect(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection)
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
@@ -734,15 +915,17 @@ class Model(object):
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def add_ssh_key(self, key):
+    async def add_ssh_key(self, user, key):
         """Add a public SSH key to this model.
 
         """Add a public SSH key to this model.
 
+        :param str user: The username of the user
         :param str key: The public ssh key
 
         """
         :param str key: The public ssh key
 
         """
-        pass
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        return await key_facade.AddKeys([key], user)
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -753,13 +936,13 @@ class Model(object):
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def block(self, *commands):
         """Add a new block to this model.
 
     def block(self, *commands):
         """Add a new block to this model.
@@ -768,13 +951,13 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
@@ -784,7 +967,7 @@ class Model(object):
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
@@ -794,7 +977,7 @@ class Model(object):
         :return str: Path to downloaded archive
 
         """
         :return str: Path to downloaded archive
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
@@ -804,7 +987,7 @@ class Model(object):
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
-        pass
+        raise NotImplementedError()
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
@@ -828,7 +1011,23 @@ class Model(object):
         :param list exclude: Do not show log messages for these entities
 
         """
         :param list exclude: Do not show log messages for these entities
 
         """
-        pass
+        raise NotImplementedError()
+
+    def _get_series(self, entity_url, entity):
+        # try to get the series from the provided charm URL
+        if entity_url.startswith('cs:'):
+            parts = entity_url[3:].split('/')
+        else:
+            parts = entity_url.split('/')
+        if parts[0].startswith('~'):
+            parts.pop(0)
+        if len(parts) > 1:
+            # series was specified in the URL
+            return parts[0]
+        # series was not supplied at all, so use the newest
+        # supported series according to the charm store
+        ss = entity['Meta']['supported-series']
+        return ss['SupportedSeries'][0]
 
     async def deploy(
             self, entity_url, application_name=None, bind=None, budget=None,
 
     async def deploy(
             self, entity_url, application_name=None, bind=None, budget=None,
@@ -842,7 +1041,7 @@ class Model(object):
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
-            the charm or bundle, e.g. 'development'
+            the charm or bundle, e.g. 'edge'
         :param dict config: Charm configuration dictionary
         :param constraints: Service constraints
         :type constraints: :class:`juju.Constraints`
         :param dict config: Charm configuration dictionary
         :param constraints: Service constraints
         :type constraints: :class:`juju.Constraints`
@@ -853,44 +1052,37 @@ class Model(object):
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
-        :param str to: Placement directive, e.g.::
+        :param to: Placement directive as a string. For example:
 
 
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
 
             If None, a new machine is provisioned.
 
 
         TODO::
 
 
             If None, a new machine is provisioned.
 
 
         TODO::
 
-            - application_name is required; fill this in automatically if not
-              provided by caller
-            - series is required; how do we pick a default?
+            - support local resources
 
         """
 
         """
-        if to:
-            placement = [
-                client.Placement(**p) for p in to
-            ]
-        else:
-            placement = []
-
         if storage:
             storage = {
                 k: client.Constraints(**v)
                 for k, v in storage.items()
             }
 
         if storage:
             storage = {
                 k: client.Constraints(**v)
                 for k, v in storage.items()
             }
 
-        is_local = not entity_url.startswith('cs:') and \
+        is_local = (
+            entity_url.startswith('local:') or
             os.path.isdir(entity_url)
             os.path.isdir(entity_url)
-        entity_id = await self.charmstore.entityId(entity_url) \
-            if not is_local else entity_url
+        )
+        if is_local:
+            entity_id = entity_url.replace('local:', '')
+        else:
+            entity = await self.charmstore.entity(entity_url, channel=channel)
+            entity_id = entity['Id']
 
 
-        app_facade = client.ApplicationFacade()
-        client_facade = client.ClientFacade()
-        app_facade.connect(self.connection)
-        client_facade.connect(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection)
 
         is_bundle = ((is_local and
                       (Path(entity_id) / 'bundle.yaml').exists()) or
 
         is_bundle = ((is_local and
                       (Path(entity_id) / 'bundle.yaml').exists()) or
@@ -907,52 +1099,133 @@ class Model(object):
                 # haven't made it yet we'll need to wait on them to be added
                 await asyncio.gather(*[
                     asyncio.ensure_future(
                 # haven't made it yet we'll need to wait on them to be added
                 await asyncio.gather(*[
                     asyncio.ensure_future(
-                        self._wait_for_new('application', app_name))
+                        self._wait_for_new('application', app_name),
+                        loop=self.loop)
                     for app_name in pending_apps
                     for app_name in pending_apps
-                ])
+                ], loop=self.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
-            log.debug(
-                'Deploying %s', entity_id)
-
-            await client_facade.AddCharm(channel, entity_id)
-            app = client.ApplicationDeploy(
-                application=application_name,
-                channel=channel,
+            if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series:
+                    series = self._get_series(entity_url, entity)
+                await client_facade.AddCharm(channel, entity_id)
+                # XXX: we're dropping local resources here, but we don't
+                # actually support them yet anyway
+                resources = await self._add_store_resources(application_name,
+                                                            entity_id,
+                                                            entity)
+            else:
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+            return await self._deploy(
                 charm_url=entity_id,
                 charm_url=entity_id,
-                config=config,
-                constraints=parse_constraints(constraints),
+                application=application_name,
+                series=series,
+                config=config or {},
+                constraints=constraints,
                 endpoint_bindings=bind,
                 endpoint_bindings=bind,
-                num_units=num_units,
-                placement=placement,
                 resources=resources,
                 resources=resources,
-                series=series,
                 storage=storage,
                 storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to)
             )
 
             )
 
-            await app_facade.Deploy([app])
-            return await self._wait_for_new('application', application_name)
+    async def _add_store_resources(self, application, entity_url, entity=None):
+        if not entity:
+            # avoid extra charm store call if one was already made
+            entity = await self.charmstore.entity(entity_url)
+        resources = [
+            {
+                'description': resource['Description'],
+                'fingerprint': resource['Fingerprint'],
+                'name': resource['Name'],
+                'path': resource['Path'],
+                'revision': resource['Revision'],
+                'size': resource['Size'],
+                'type_': resource['Type'],
+                'origin': 'store',
+            } for resource in entity['Meta']['resources']
+        ]
+
+        if not resources:
+            return None
 
 
-    def destroy(self):
-        """Terminate all machines and resources for this model.
+        resources_facade = client.ResourcesFacade.from_connection(
+            self.connection)
+        response = await resources_facade.AddPendingResources(
+            tag.application(application),
+            entity_url,
+            [client.CharmResource(**resource) for resource in resources])
+        resource_map = {resource['name']: pid
+                        for resource, pid
+                        in zip(resources, response.pending_ids)}
+        return resource_map
 
 
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
         """
         """
-        pass
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade.from_connection(
+            self.connection)
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement
+        )
+
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
+
+    async def destroy(self):
+        """Terminate all machines and resources for this model.
+            Is already implemented in controller.py.
+        """
+        raise NotImplementedError()
 
     async def destroy_unit(self, *unit_names):
         """Destroy units by name.
 
         """
 
     async def destroy_unit(self, *unit_names):
         """Destroy units by name.
 
         """
-        app_facade = client.ApplicationFacade()
-        app_facade.connect(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection)
 
         log.debug(
             'Destroying unit%s %s',
             's' if len(unit_names) == 1 else '',
             ' '.join(unit_names))
 
 
         log.debug(
             'Destroying unit%s %s',
             's' if len(unit_names) == 1 else '',
             ' '.join(unit_names))
 
-        return await app_facade.Destroy(self.name)
+        return await app_facade.DestroyUnits(list(unit_names))
     destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
     destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
@@ -962,7 +1235,7 @@ class Model(object):
         :return str: Path to the archive file
 
         """
         :return str: Path to the archive file
 
         """
-        pass
+        raise NotImplementedError()
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
@@ -981,28 +1254,44 @@ class Model(object):
             If None, a new machine is provisioned.
 
         """
             If None, a new machine is provisioned.
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def get_config(self):
+    async def get_config(self):
         """Return the configuration settings for this model.
 
         """Return the configuration settings for this model.
 
+        :returns: A ``dict`` mapping keys to `ConfigValue` instances,
+            which have `source` and `value` attributes.
         """
         """
-        pass
+        config_facade = client.ModelConfigFacade.from_connection(
+            self.connection
+        )
+        result = await config_facade.ModelGet()
+        config = result.config
+        for key, value in config.items():
+            config[key] = ConfigValue.from_json(value)
+        return config
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def grant(self, username, acl='read'):
+    async def grant(self, username, acl='read'):
         """Grant a user access to this model.
 
         :param str username: Username
         :param str acl: Access control ('read' or 'write')
 
         """
         """Grant a user access to this model.
 
         :param str username: Username
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        controller_conn = await self.connection.controller()
+        model_facade = client.ModelManagerFacade.from_connection(
+            controller_conn)
+        user = tag.user(username)
+        model = tag.model(self.info.uuid)
+        changes = client.ModifyModelAccess(acl, 'grant', model, user)
+        await self.revoke(username)
+        return await model_facade.ModifyModelAccess([changes])
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -1010,35 +1299,37 @@ class Model(object):
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
-        pass
+        raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
     import_ssh_keys = import_ssh_key
 
-    def get_machines(self, machine, utc=False):
+    async def get_machines(self):
         """Return list of machines in this model.
 
         """Return list of machines in this model.
 
-        :param str machine: Machine id, e.g. '0'
-        :param bool utc: Display time as UTC in RFC3339 format
-
         """
         """
-        pass
+        return list(self.state.machines.keys())
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def get_ssh_key(self):
+    async def get_ssh_key(self, raw_ssh=False):
         """Return known SSH keys for this model.
         """Return known SSH keys for this model.
+        :param bool raw_ssh: if True, returns the raw ssh key,
+            else it's fingerprint
 
         """
 
         """
-        pass
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        entity = {'tag': tag.model(self.info.uuid)}
+        entities = client.Entities([entity])
+        return await key_facade.ListKeys(entities, raw_ssh)
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -1048,7 +1339,7 @@ class Model(object):
         :param bool volume: Include volume storage
 
         """
         :param bool volume: Include volume storage
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
@@ -1057,7 +1348,7 @@ class Model(object):
         :param list providers: Only include pools for these providers
 
         """
         :param list providers: Only include pools for these providers
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
@@ -1066,13 +1357,13 @@ class Model(object):
         :param str zone: Only include subnets in this zone
 
         """
         :param str zone: Only include subnets in this zone
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_backup(self, backup_id):
         """Delete a backup.
 
     def remove_backup(self, backup_id):
         """Delete a backup.
@@ -1080,7 +1371,7 @@ class Model(object):
         :param str backup_id: The id of the backup to remove
 
         """
         :param str backup_id: The id of the backup to remove
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
@@ -1090,7 +1381,7 @@ class Model(object):
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
@@ -1098,16 +1389,21 @@ class Model(object):
         :param str \*machine_ids: Ids of the machines to remove
 
         """
         :param str \*machine_ids: Ids of the machines to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_machines = remove_machine
 
     remove_machines = remove_machine
 
-    def remove_ssh_key(self, *keys):
+    async def remove_ssh_key(self, user, key):
         """Remove a public SSH key(s) from this model.
 
         """Remove a public SSH key(s) from this model.
 
-        :param str \*keys: Keys to remove
+        :param str key: Full ssh key
+        :param str user: Juju user to which the key is registered
 
         """
 
         """
-        pass
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
+        key = hashlib.md5(key).hexdigest()
+        key = ':'.join(a+b for a, b in zip(key[::2], key[1::2]))
+        await key_facade.DeleteKeys([key], user)
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -1123,22 +1419,27 @@ class Model(object):
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
-        pass
+        raise NotImplementedError()
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def revoke(self, username, acl='read'):
+    async def revoke(self, username):
         """Revoke a user's access to this model.
 
         :param str username: Username to revoke
         """Revoke a user's access to this model.
 
         :param str username: Username to revoke
-        :param str acl: Access control ('read' or 'write')
 
         """
 
         """
-        pass
+        controller_conn = await self.connection.controller()
+        model_facade = client.ModelManagerFacade.from_connection(
+            controller_conn)
+        user = tag.user(username)
+        model = tag.model(self.info.uuid)
+        changes = client.ModifyModelAccess('read', 'revoke', model, user)
+        return await model_facade.ModifyModelAccess([changes])
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -1147,15 +1448,21 @@ class Model(object):
         :param int timeout: Time to wait before command is considered failed
 
         """
         :param int timeout: Time to wait before command is considered failed
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def set_config(self, **config):
+    async def set_config(self, config):
         """Set configuration keys on this model.
 
         """Set configuration keys on this model.
 
-        :param \*\*config: Config key/values
-
+        :param dict config: Mapping of config keys to either string values or
+            `ConfigValue` instances, as returned by `get_config`.
         """
         """
-        pass
+        config_facade = client.ModelConfigFacade.from_connection(
+            self.connection
+        )
+        for key, value in config.items():
+            if isinstance(value, ConfigValue):
+                config[key] = value.value
+        await config_facade.ModelSet(config)
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
@@ -1163,7 +1470,7 @@ class Model(object):
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
@@ -1172,7 +1479,7 @@ class Model(object):
         :param int wait: Time in seconds to wait for action to complete
 
         """
         :param int wait: Time in seconds to wait for action to complete
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
@@ -1181,7 +1488,7 @@ class Model(object):
         :param str name: Filter by action name
 
         """
         :param str name: Filter by action name
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_budget(self, budget_name):
         """Get budget usage info.
 
     def get_budget(self, budget_name):
         """Get budget usage info.
@@ -1189,17 +1496,18 @@ class Model(object):
         :param str budget_name: Name of budget
 
         """
         :param str budget_name: Name of budget
 
         """
-        pass
+        raise NotImplementedError()
 
 
-    def get_status(self, filter_=None, utc=False):
+    async def get_status(self, filters=None, utc=False):
         """Return the status of the model.
 
         """Return the status of the model.
 
-        :param str filter_: Service or unit name or wildcard ('*')
+        :param str filters: Optional list of applications, units, or machines
+            to include, which can use wildcards ('*').
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
-    status = get_status
+        client_facade = client.ClientFacade.from_connection(self.connection)
+        return await client_facade.FullStatus(filters)
 
     def sync_tools(
             self, all_=False, destination=None, dry_run=False, public=False,
 
     def sync_tools(
             self, all_=False, destination=None, dry_run=False, public=False,
@@ -1216,7 +1524,7 @@ class Model(object):
         :param str version: Copy a specific major.minor version
 
         """
         :param str version: Copy a specific major.minor version
 
         """
-        pass
+        raise NotImplementedError()
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
@@ -1225,7 +1533,7 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
@@ -1233,13 +1541,13 @@ class Model(object):
         :param str \*keys: The keys to unset
 
         """
         :param str \*keys: The keys to unset
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
@@ -1253,7 +1561,7 @@ class Model(object):
         :param str version: Upgrade to a specific version
 
         """
         :param str version: Upgrade to a specific version
 
         """
-        pass
+        raise NotImplementedError()
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
@@ -1261,7 +1569,7 @@ class Model(object):
         :param str archive_path: Path to local archive
 
         """
         :param str archive_path: Path to local archive
 
         """
-        pass
+        raise NotImplementedError()
 
     @property
     def charmstore(self):
 
     @property
     def charmstore(self):
@@ -1278,8 +1586,8 @@ class Model(object):
         log.debug("Retrieving metrics for %s",
                   ', '.join(tags) if tags else "all units")
 
         log.debug("Retrieving metrics for %s",
                   ', '.join(tags) if tags else "all units")
 
-        metrics_facade = client.MetricsDebugFacade()
-        metrics_facade.connect(self.connection)
+        metrics_facade = client.MetricsDebugFacade.from_connection(
+            self.connection)
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
@@ -1300,6 +1608,21 @@ class Model(object):
         return metrics
 
 
         return metrics
 
 
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
+
 class BundleHandler(object):
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
 class BundleHandler(object):
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
@@ -1314,12 +1637,60 @@ class BundleHandler(object):
         for unit_name, unit in model.units.items():
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
         for unit_name, unit in model.units.items():
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
-        self.client_facade = client.ClientFacade()
-        self.client_facade.connect(model.connection)
-        self.app_facade = client.ApplicationFacade()
-        self.app_facade.connect(model.connection)
-        self.ann_facade = client.AnnotationsFacade()
-        self.ann_facade.connect(model.connection)
+        self.client_facade = client.ClientFacade.from_connection(
+            model.connection)
+        self.app_facade = client.ApplicationFacade.from_connection(
+            model.connection)
+        self.ann_facade = client.AnnotationsFacade.from_connection(
+            model.connection)
+
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        for app_name in self.applications:
+            app_dict = bundle['services'][app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ], loop=self.model.loop)
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                bundle['services'][app_name]['charm'] = charm_url
+
+        return bundle
 
     async def fetch_plan(self, entity_id):
         is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
 
     async def fetch_plan(self, entity_id):
         is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
@@ -1330,7 +1701,10 @@ class BundleHandler(object):
                                                       filename='bundle.yaml',
                                                       read_file=True)
         self.bundle = yaml.safe_load(bundle_yaml)
                                                       filename='bundle.yaml',
                                                       read_file=True)
         self.bundle = yaml.safe_load(bundle_yaml)
-        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.client_facade.GetBundleChanges(
+            yaml.dump(self.bundle))
 
     async def execute_plan(self):
         for step in self.plan.changes:
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1356,6 +1730,11 @@ class BundleHandler(object):
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
@@ -1406,7 +1785,7 @@ class BundleHandler(object):
         results = await self.client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
         results = await self.client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
-            raise ValueError("Error adding machine: %s", error.message)
+            raise ValueError("Error adding machine: %s" % error.message)
         machine = results.machines[0].machine
         log.debug('Added new machine %s', machine)
         return machine
         machine = results.machines[0].machine
         log.debug('Added new machine %s', machine)
         return machine
@@ -1462,24 +1841,21 @@ class BundleHandler(object):
         """
         # resolve indirect references
         charm = self.resolve(charm)
         """
         # resolve indirect references
         charm = self.resolve(charm)
-        # stringify all config values for API
-        options = {k: str(v) for k, v in options.items()}
-        # build param object
-        app = client.ApplicationDeploy(
+        # the bundle plan doesn't actually do anything with resources, even
+        # though it ostensibly gives us something (None) for that param
+        if not charm.startswith('local:'):
+            resources = await self.model._add_store_resources(application,
+                                                              charm)
+        await self.model._deploy(
             charm_url=charm,
             charm_url=charm,
-            series=series,
             application=application,
             application=application,
+            series=series,
             config=options,
             config=options,
-            constraints=parse_constraints(constraints),
-            storage=storage,
+            constraints=constraints,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
+            storage=storage,
         )
         )
-        # do the do
-        log.info('Deploying %s', charm)
-        await self.app_facade.Deploy([app])
-        # ensure the app is in the model for future operations
-        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
         return application
 
     async def addUnit(self, application, to):
@@ -1547,7 +1923,7 @@ class CharmStore(object):
     """
     def __init__(self, loop):
         self.loop = loop
     """
     def __init__(self, loop):
         self.loop = loop
-        self._cs = charmstore.CharmStore()
+        self._cs = theblues.charmstore.CharmStore(timeout=5)
 
     def __getattr__(self, name):
         """
 
     def __getattr__(self, name):
         """
@@ -1561,7 +1937,95 @@ class CharmStore(object):
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
-                return await self.loop.run_in_executor(None, method)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
+
+
+class CharmArchiveGenerator(object):
+    """
+    Create a Zip archive of a local charm directory for upload to a controller.
+
+    This is used automatically by
+    `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
+    """
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/\* - This is used for packing the charm itself and any
+                          similar tasks.
+            * \*/.\*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True