Fixes based on review
[osm/N2VC.git] / juju / model.py
index ecd764b..3df2669 100644 (file)
@@ -1,15 +1,20 @@
 import asyncio
 import collections
+import json
 import logging
 import os
 import re
+import stat
+import tempfile
 import weakref
+import zipfile
 from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
 
 import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
 
 from .client import client
 from .client import watcher
@@ -19,6 +24,7 @@ from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
 from .errors import JujuError, JujuAPIError
+from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
@@ -371,8 +377,8 @@ class Model(object):
         self.state = ModelState(self)
         self.info = None
         self._watcher_task = None
-        self._watch_shutdown = asyncio.Event(loop=loop)
-        self._watch_received = asyncio.Event(loop=loop)
+        self._watch_shutdown = asyncio.Event(loop=self.loop)
+        self._watch_received = asyncio.Event(loop=self.loop)
         self._charmstore = CharmStore(self.loop)
 
     async def connect(self, *args, **kw):
@@ -381,6 +387,8 @@ class Model(object):
         args and kw are passed through to Connection.connect()
 
         """
+        if 'loop' not in kw:
+            kw['loop'] = self.loop
         self.connection = await connection.Connection.connect(*args, **kw)
         await self._after_connect()
 
@@ -388,7 +396,8 @@ class Model(object):
         """Connect to the current Juju model.
 
         """
-        self.connection = await connection.Connection.connect_current()
+        self.connection = await connection.Connection.connect_current(
+            self.loop)
         await self._after_connect()
 
     async def connect_model(self, model_name):
@@ -397,7 +406,8 @@ class Model(object):
         :param model_name:  Format [controller:][user/]model
 
         """
-        self.connection = await connection.Connection.connect_model(model_name)
+        self.connection = await connection.Connection.connect_model(model_name,
+                                                                    self.loop)
         await self._after_connect()
 
     async def _after_connect(self):
@@ -419,6 +429,59 @@ class Model(object):
             await self.connection.close()
             self.connection = None
 
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection.https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
     def all_units_idle(self):
         """Return True if all units are idle.
 
@@ -453,8 +516,8 @@ class Model(object):
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(wait_period)
-        await asyncio.wait_for(_block(), timeout)
+                await asyncio.sleep(wait_period, loop=self.loop)
+        await asyncio.wait_for(_block(), timeout, loop=self.loop)
 
     @property
     def applications(self):
@@ -565,7 +628,8 @@ class Model(object):
                         # canceled with it. So we shield them. But this means
                         # they can *never* be canceled.
                         await asyncio.shield(
-                            self._notify_observers(delta, old_obj, new_obj))
+                            self._notify_observers(delta, old_obj, new_obj),
+                            loop=self.loop)
                     self._watch_received.set()
             except CancelledError:
                 log.debug('Closing watcher connection')
@@ -604,7 +668,8 @@ class Model(object):
 
         for o in self.observers:
             if o.cares_about(delta):
-                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                      loop=self.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -613,7 +678,7 @@ class Model(object):
 
         :param entity_type: The entity's type.
         :param entity_id: The entity's id.
-        :param action: the type of action (e.g., 'add' or 'change')
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
         :param predicate: optional callable that must take as an
             argument a delta, and must return a boolean, indicating
             whether the delta contains the specific action we're looking
@@ -628,7 +693,9 @@ class Model(object):
 
         self.add_observer(callback, entity_type, action, entity_id, predicate)
         entity_id = await q.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
 
     async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
@@ -662,9 +729,8 @@ class Model(object):
 
         return await self._wait('action', action_id, 'change', predicate)
 
-    def add_machine(
-            self, spec=None, constraints=None, disks=None, series=None,
-            count=1):
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
@@ -672,25 +738,64 @@ class Model(object):
             Examples::
 
                 (None) - starts a new machine
-                'lxc' - starts a new machine with on lxc container
-                'lxc:4' - starts a new lxc container on machine 4
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
-        :param constraints: Machine constraints
-        :type constraints: :class:`juju.Constraints`
-        :param list disks: List of disk :class:`constraints <juju.Constraints>`
-        :param str series: Series
-        :param int count: Number of machines to deploy
 
-        Supported container types are: lxc, lxd, kvm
+        :param dict constraints: Machine constraints
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                }
+
+        :param list disks: List of disk constraint dictionaries
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
-        pass
-    add_machines = add_machine
+        params = client.AddMachineParams()
+        params.jobs = ['JobHostUnits']
+
+        if spec:
+            placement = parse_placement(spec)
+            if placement:
+                params.placement = placement[0]
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine_id = results.machines[0].machine
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
@@ -734,7 +839,7 @@ class Model(object):
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
-        pass
+        raise NotImplementedError()
 
     def add_ssh_key(self, key):
         """Add a public SSH key to this model.
@@ -742,7 +847,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
-        pass
+        raise NotImplementedError()
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -753,13 +858,13 @@ class Model(object):
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def block(self, *commands):
         """Add a new block to this model.
@@ -768,13 +873,13 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
@@ -784,7 +889,7 @@ class Model(object):
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
@@ -794,7 +899,7 @@ class Model(object):
         :return str: Path to downloaded archive
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
@@ -804,7 +909,7 @@ class Model(object):
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
-        pass
+        raise NotImplementedError()
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
@@ -828,7 +933,7 @@ class Model(object):
         :param list exclude: Do not show log messages for these entities
 
         """
-        pass
+        raise NotImplementedError()
 
     async def deploy(
             self, entity_url, application_name=None, bind=None, budget=None,
@@ -853,11 +958,11 @@ class Model(object):
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
-        :param str to: Placement directive, e.g.::
+        :param to: Placement directive as a string. For example:
 
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
 
             If None, a new machine is provisioned.
 
@@ -869,27 +974,23 @@ class Model(object):
             - series is required; how do we pick a default?
 
         """
-        if to:
-            placement = [
-                client.Placement(**p) for p in to
-            ]
-        else:
-            placement = []
-
         if storage:
             storage = {
                 k: client.Constraints(**v)
                 for k, v in storage.items()
             }
 
-        is_local = not entity_url.startswith('cs:') and \
+        is_local = (
+            entity_url.startswith('local:') or
             os.path.isdir(entity_url)
-        entity_id = await self.charmstore.entityId(entity_url) \
-            if not is_local else entity_url
+        )
+        if is_local:
+            entity_id = entity_url
+        else:
+            entity = await self.charmstore.entity(entity_url)
+            entity_id = entity['Id']
 
-        app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
-        app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
         is_bundle = ((is_local and
@@ -907,38 +1008,100 @@ class Model(object):
                 # haven't made it yet we'll need to wait on them to be added
                 await asyncio.gather(*[
                     asyncio.ensure_future(
-                        self._wait_for_new('application', app_name))
+                        self._wait_for_new('application', app_name),
+                        loop=self.loop)
                     for app_name in pending_apps
-                ])
+                ], loop=self.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
-            log.debug(
-                'Deploying %s', entity_id)
-
-            await client_facade.AddCharm(channel, entity_id)
-            app = client.ApplicationDeploy(
-                application=application_name,
-                channel=channel,
+            if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series and '/' in entity_url:
+                    # try to get the series from the provided charm URL
+                    if entity_url.startswith('cs:'):
+                        parts = entity_url[3:].split('/')
+                    else:
+                        parts = entity_url.split('/')
+                    if parts[0].startswith('~'):
+                        parts.pop(0)
+                    if len(parts) > 1:
+                        # series was specified in the URL
+                        series = parts[0]
+                if not series:
+                    # series was not supplied at all, so use the newest
+                    # supported series according to the charm store
+                    ss = entity['Meta']['supported-series']
+                    series = ss['SupportedSeries'][0]
+                if not channel:
+                    channel = 'stable'
+                await client_facade.AddCharm(channel, entity_id)
+            else:
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+            return await self._deploy(
                 charm_url=entity_id,
-                config=config,
-                constraints=parse_constraints(constraints),
+                application=application_name,
+                series=series,
+                config=config or {},
+                constraints=constraints,
                 endpoint_bindings=bind,
-                num_units=num_units,
-                placement=placement,
                 resources=resources,
-                series=series,
                 storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to),
             )
 
-            await app_facade.Deploy([app])
-            return await self._wait_for_new('application', application_name)
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+        """
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement,
+        )
+
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     async def destroy_unit(self, *unit_names):
         """Destroy units by name.
@@ -952,7 +1115,7 @@ class Model(object):
             's' if len(unit_names) == 1 else '',
             ' '.join(unit_names))
 
-        return await app_facade.Destroy(self.name)
+        return await app_facade.DestroyUnits(list(unit_names))
     destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
@@ -962,7 +1125,7 @@ class Model(object):
         :return str: Path to the archive file
 
         """
-        pass
+        raise NotImplementedError()
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
@@ -981,19 +1144,19 @@ class Model(object):
             If None, a new machine is provisioned.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_config(self):
         """Return the configuration settings for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def grant(self, username, acl='read'):
         """Grant a user access to this model.
@@ -1002,7 +1165,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -1010,7 +1173,7 @@ class Model(object):
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
-        pass
+        raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
     def get_machines(self, machine, utc=False):
@@ -1020,25 +1183,25 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_ssh_key(self):
         """Return known SSH keys for this model.
 
         """
-        pass
+        raise NotImplementedError()
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -1048,7 +1211,7 @@ class Model(object):
         :param bool volume: Include volume storage
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
@@ -1057,7 +1220,7 @@ class Model(object):
         :param list providers: Only include pools for these providers
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
@@ -1066,13 +1229,13 @@ class Model(object):
         :param str zone: Only include subnets in this zone
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_backup(self, backup_id):
         """Delete a backup.
@@ -1080,7 +1243,7 @@ class Model(object):
         :param str backup_id: The id of the backup to remove
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
@@ -1090,7 +1253,7 @@ class Model(object):
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
@@ -1098,7 +1261,7 @@ class Model(object):
         :param str \*machine_ids: Ids of the machines to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_machines = remove_machine
 
     def remove_ssh_key(self, *keys):
@@ -1107,7 +1270,7 @@ class Model(object):
         :param str \*keys: Keys to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -1123,13 +1286,13 @@ class Model(object):
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
-        pass
+        raise NotImplementedError()
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
-        pass
+        raise NotImplementedError()
 
     def revoke(self, username, acl='read'):
         """Revoke a user's access to this model.
@@ -1138,7 +1301,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -1147,7 +1310,7 @@ class Model(object):
         :param int timeout: Time to wait before command is considered failed
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_config(self, **config):
         """Set configuration keys on this model.
@@ -1155,7 +1318,7 @@ class Model(object):
         :param \*\*config: Config key/values
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
@@ -1163,7 +1326,7 @@ class Model(object):
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
@@ -1172,7 +1335,7 @@ class Model(object):
         :param int wait: Time in seconds to wait for action to complete
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
@@ -1181,7 +1344,7 @@ class Model(object):
         :param str name: Filter by action name
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_budget(self, budget_name):
         """Get budget usage info.
@@ -1189,17 +1352,19 @@ class Model(object):
         :param str budget_name: Name of budget
 
         """
-        pass
+        raise NotImplementedError()
 
-    def get_status(self, filter_=None, utc=False):
+    async def get_status(self, filters=None, utc=False):
         """Return the status of the model.
 
-        :param str filter_: Service or unit name or wildcard ('*')
+        :param str filters: Optional list of applications, units, or machines
+            to include, which can use wildcards ('*').
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
-    status = get_status
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        return await client_facade.FullStatus(filters)
 
     def sync_tools(
             self, all_=False, destination=None, dry_run=False, public=False,
@@ -1216,7 +1381,7 @@ class Model(object):
         :param str version: Copy a specific major.minor version
 
         """
-        pass
+        raise NotImplementedError()
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
@@ -1225,7 +1390,7 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
@@ -1233,13 +1398,13 @@ class Model(object):
         :param str \*keys: The keys to unset
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
@@ -1253,7 +1418,7 @@ class Model(object):
         :param str version: Upgrade to a specific version
 
         """
-        pass
+        raise NotImplementedError()
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
@@ -1261,7 +1426,7 @@ class Model(object):
         :param str archive_path: Path to local archive
 
         """
-        pass
+        raise NotImplementedError()
 
     @property
     def charmstore(self):
@@ -1300,6 +1465,21 @@ class Model(object):
         return metrics
 
 
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
+
 class BundleHandler(object):
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
@@ -1321,6 +1501,54 @@ class BundleHandler(object):
         self.ann_facade = client.AnnotationsFacade()
         self.ann_facade.connect(model.connection)
 
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        for app_name in self.applications:
+            app_dict = bundle['services'][app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ], loop=self.model.loop)
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                bundle['services'][app_name]['charm'] = charm_url
+
+        return bundle
+
     async def fetch_plan(self, entity_id):
         is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
         if is_local:
@@ -1330,7 +1558,10 @@ class BundleHandler(object):
                                                       filename='bundle.yaml',
                                                       read_file=True)
         self.bundle = yaml.safe_load(bundle_yaml)
-        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.client_facade.GetBundleChanges(
+            yaml.dump(self.bundle))
 
         if self.plan.errors:
             raise JujuError('\n'.join(self.plan.errors))
@@ -1359,6 +1590,11 @@ class BundleHandler(object):
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
@@ -1465,24 +1701,16 @@ class BundleHandler(object):
         """
         # resolve indirect references
         charm = self.resolve(charm)
-        # stringify all config values for API
-        options = {k: str(v) for k, v in options.items()}
-        # build param object
-        app = client.ApplicationDeploy(
+        await self.model._deploy(
             charm_url=charm,
-            series=series,
             application=application,
+            series=series,
             config=options,
-            constraints=parse_constraints(constraints),
-            storage=storage,
+            constraints=constraints,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
+            storage=storage,
         )
-        # do the do
-        log.info('Deploying %s', charm)
-        await self.app_facade.Deploy([app])
-        # ensure the app is in the model for future operations
-        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
@@ -1550,7 +1778,7 @@ class CharmStore(object):
     """
     def __init__(self, loop):
         self.loop = loop
-        self._cs = charmstore.CharmStore()
+        self._cs = theblues.charmstore.CharmStore(timeout=5)
 
     def __getattr__(self, name):
         """
@@ -1564,7 +1792,89 @@ class CharmStore(object):
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
-                return await self.loop.run_in_executor(None, method)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
+
+
+class CharmArchiveGenerator(object):
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/\* - This is used for packing the charm itself and any
+                          similar tasks.
+            * \*/.\*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True