Fixes based on review
[osm/N2VC.git] / juju / model.py
index 8d6c666..3df2669 100644 (file)
@@ -1,22 +1,30 @@
 import asyncio
 import collections
+import json
 import logging
+import os
 import re
+import stat
+import tempfile
 import weakref
+import zipfile
 from concurrent.futures import CancelledError
 from functools import partial
+from pathlib import Path
 
 import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
 
 from .client import client
 from .client import watcher
 from .client import connection
-from .constraints import parse as parse_constraints
+from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
-from .errors import JujuAPIError
+from .errors import JujuError, JujuAPIError
+from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
@@ -72,6 +80,14 @@ class ModelObserver(object):
         await method(delta, old, new, model)
 
     async def on_change(self, delta, old, new, model):
+        """Generic model-change handler.
+
+        :param delta: :class:`juju.client.overrides.Delta`
+        :param old: :class:`juju.model.ModelEntity`
+        :param new: :class:`juju.model.ModelEntity`
+        :param model: :class:`juju.model.Model`
+
+        """
         pass
 
 
@@ -205,18 +221,16 @@ class ModelEntity(object):
         self.connected = connected
         self.connection = model.connection
 
+    def __repr__(self):
+        return '<{} entity_id="{}">'.format(type(self).__name__,
+                                            self.entity_id)
+
     def __getattr__(self, name):
         """Fetch object attributes from the underlying data dict held in the
         model.
 
         """
-        if self.data is None:
-            raise DeadEntityException(
-                "Entity {}:{} is dead - its attributes can no longer be "
-                "accessed. Use the .previous() method on this object to get "
-                "a copy of the object at its previous state.".format(
-                    self.entity_type, self.entity_id))
-        return self.data[name]
+        return self.safe_data[name]
 
     def __bool__(self):
         return bool(self.data)
@@ -283,6 +297,22 @@ class ModelEntity(object):
         return self.model.state.entity_data(
             self.entity_type, self.entity_id, self._history_index)
 
+    @property
+    def safe_data(self):
+        """The data dictionary for this entity.
+
+        If this `ModelEntity` points to the dead state, it will
+        raise `DeadEntityException`.
+
+        """
+        if self.data is None:
+            raise DeadEntityException(
+                "Entity {}:{} is dead - its attributes can no longer be "
+                "accessed. Use the .previous() method on this object to get "
+                "a copy of the object at its previous state.".format(
+                    self.entity_type, self.entity_id))
+        return self.data
+
     def previous(self):
         """Return a copy of this object as was at its previous state in
         history.
@@ -345,9 +375,10 @@ class Model(object):
         self.connection = None
         self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
+        self.info = None
         self._watcher_task = None
-        self._watch_shutdown = asyncio.Event(loop=loop)
-        self._watch_received = asyncio.Event(loop=loop)
+        self._watch_shutdown = asyncio.Event(loop=self.loop)
+        self._watch_received = asyncio.Event(loop=self.loop)
         self._charmstore = CharmStore(self.loop)
 
     async def connect(self, *args, **kw):
@@ -356,26 +387,36 @@ class Model(object):
         args and kw are passed through to Connection.connect()
 
         """
+        if 'loop' not in kw:
+            kw['loop'] = self.loop
         self.connection = await connection.Connection.connect(*args, **kw)
-        self._watch()
-        await self._watch_received.wait()
+        await self._after_connect()
 
     async def connect_current(self):
         """Connect to the current Juju model.
 
         """
-        self.connection = await connection.Connection.connect_current()
-        self._watch()
-        await self._watch_received.wait()
+        self.connection = await connection.Connection.connect_current(
+            self.loop)
+        await self._after_connect()
+
+    async def connect_model(self, model_name):
+        """Connect to a specific Juju model by name.
+
+        :param model_name:  Format [controller:][user/]model
+
+        """
+        self.connection = await connection.Connection.connect_model(model_name,
+                                                                    self.loop)
+        await self._after_connect()
 
-    async def connect_model(self, arg):
-        """Connect to a specific Juju model.
-        :param arg:  <controller>:<user/model>
+    async def _after_connect(self):
+        """Run initialization steps after connecting to websocket.
 
         """
-        self.connection = await connection.Connection.connect_model(arg)
         self._watch()
         await self._watch_received.wait()
+        await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
@@ -388,6 +429,59 @@ class Model(object):
             await self.connection.close()
             self.connection = None
 
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection.https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
     def all_units_idle(self):
         """Return True if all units are idle.
 
@@ -416,14 +510,14 @@ class Model(object):
             lambda: len(self.machines) == 0
         )
 
-    async def block_until(self, *conditions, timeout=None):
+    async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(0)
-        await asyncio.wait_for(_block(), timeout)
+                await asyncio.sleep(wait_period, loop=self.loop)
+        await asyncio.wait_for(_block(), timeout, loop=self.loop)
 
     @property
     def applications(self):
@@ -449,13 +543,34 @@ class Model(object):
         """
         return self.state.units
 
+    async def get_info(self):
+        """Return a client.ModelInfo object for this Model.
+
+        Retrieves latest info for this Model from the api server. The
+        return value is cached on the Model.info attribute so that the
+        valued may be accessed again without another api call, if
+        desired.
+
+        This method is called automatically when the Model is connected,
+        resulting in Model.info being initialized without requiring an
+        explicit call to this method.
+
+        """
+        facade = client.ClientFacade()
+        facade.connect(self.connection)
+
+        self.info = await facade.ModelInfo()
+        log.debug('Got ModelInfo: %s', vars(self.info))
+
+        return self.info
+
     def add_observer(
             self, callable_, entity_type=None, action=None, entity_id=None,
             predicate=None):
         """Register an "on-model-change" callback
 
         Once the model is connected, ``callable_``
-        will be called each time the model changes. callable_ should
+        will be called each time the model changes. ``callable_`` should
         be Awaitable and accept the following positional arguments:
 
             delta - An instance of :class:`juju.delta.EntityDelta`
@@ -474,14 +589,15 @@ class Model(object):
             model - The :class:`Model` itself.
 
         Events for which ``callable_`` is called can be specified by passing
-        entity_type, action, and/or id_ filter criteria, e.g.:
+        entity_type, action, and/or entitiy_id filter criteria, e.g.::
 
             add_observer(
-                myfunc, entity_type='application', action='add', id_='ubuntu')
+                myfunc,
+                entity_type='application', action='add', entity_id='ubuntu')
 
         For more complex filtering conditions, pass a predicate function. It
         will be called with a delta as its only argument. If the predicate
-        function returns True, the callable_ will be called.
+        function returns True, the ``callable_`` will be called.
 
         """
         observer = _Observer(
@@ -512,7 +628,8 @@ class Model(object):
                         # canceled with it. So we shield them. But this means
                         # they can *never* be canceled.
                         await asyncio.shield(
-                            self._notify_observers(delta, old_obj, new_obj))
+                            self._notify_observers(delta, old_obj, new_obj),
+                            loop=self.loop)
                     self._watch_received.set()
             except CancelledError:
                 log.debug('Closing watcher connection')
@@ -551,7 +668,8 @@ class Model(object):
 
         for o in self.observers:
             if o.cares_about(delta):
-                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                      loop=self.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -560,7 +678,7 @@ class Model(object):
 
         :param entity_type: The entity's type.
         :param entity_id: The entity's id.
-        :param action: the type of action (e.g., 'add' or 'change')
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
         :param predicate: optional callable that must take as an
             argument a delta, and must return a boolean, indicating
             whether the delta contains the specific action we're looking
@@ -575,17 +693,28 @@ class Model(object):
 
         self.add_observer(callback, entity_type, action, entity_id, predicate)
         entity_id = await q.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
 
-    async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
 
         Waits for an object of type ``entity_type`` with id ``entity_id``.
+        If ``entity_id`` is ``None``, it will wait for the first new entity
+        of the correct type.
 
         This coroutine blocks until the new object appears in the model.
 
         """
-        return await self._wait(entity_type, entity_id, 'add', predicate)
+        # if the entity is already in the model, just return it
+        if entity_id in self.state._live_entity_map(entity_type):
+            return self.state._live_entity_map(entity_type)[entity_id]
+        # if we know the entity_id, we can trigger on any action that puts
+        # the enitty into the model; otherwise, we have to watch for the
+        # next "add" action on that entity_type
+        action = 'add' if entity_id is None else None
+        return await self._wait(entity_type, entity_id, action, predicate)
 
     async def wait_for_action(self, action_id):
         """Given an action, wait for it to complete."""
@@ -600,9 +729,8 @@ class Model(object):
 
         return await self._wait('action', action_id, 'change', predicate)
 
-    def add_machine(
-            self, spec=None, constraints=None, disks=None, series=None,
-            count=1):
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
@@ -610,25 +738,64 @@ class Model(object):
             Examples::
 
                 (None) - starts a new machine
-                'lxc' - starts a new machine with on lxc container
-                'lxc:4' - starts a new lxc container on machine 4
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
-        :param constraints: Machine constraints
-        :type constraints: :class:`juju.Constraints`
-        :param list disks: List of disk :class:`constraints <juju.Constraints>`
-        :param str series: Series
-        :param int count: Number of machines to deploy
 
-        Supported container types are: lxc, lxd, kvm
+        :param dict constraints: Machine constraints
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                }
+
+        :param list disks: List of disk constraint dictionaries
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
-        pass
-    add_machines = add_machine
+        params = client.AddMachineParams()
+        params.jobs = ['JobHostUnits']
+
+        if spec:
+            placement = parse_placement(spec)
+            if placement:
+                params.placement = placement[0]
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine_id = results.machines[0].machine
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
@@ -672,7 +839,7 @@ class Model(object):
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
-        pass
+        raise NotImplementedError()
 
     def add_ssh_key(self, key):
         """Add a public SSH key to this model.
@@ -680,7 +847,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
-        pass
+        raise NotImplementedError()
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -691,13 +858,13 @@ class Model(object):
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def block(self, *commands):
         """Add a new block to this model.
@@ -706,13 +873,13 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
@@ -722,7 +889,7 @@ class Model(object):
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
@@ -732,7 +899,7 @@ class Model(object):
         :return str: Path to downloaded archive
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
@@ -742,7 +909,7 @@ class Model(object):
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
-        pass
+        raise NotImplementedError()
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
@@ -766,17 +933,17 @@ class Model(object):
         :param list exclude: Do not show log messages for these entities
 
         """
-        pass
+        raise NotImplementedError()
 
     async def deploy(
-            self, entity_url, service_name=None, bind=None, budget=None,
+            self, entity_url, application_name=None, bind=None, budget=None,
             channel=None, config=None, constraints=None, force=False,
             num_units=1, plan=None, resources=None, series=None, storage=None,
             to=None):
         """Deploy a new service or bundle.
 
         :param str entity_url: Charm or bundle url
-        :param str service_name: Name to give the service
+        :param str application_name: Name to give the service
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
@@ -791,43 +958,46 @@ class Model(object):
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
-        :param str to: Placement directive, e.g.::
+        :param to: Placement directive as a string. For example:
 
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
 
             If None, a new machine is provisioned.
 
 
         TODO::
 
-            - service_name is required; fill this in automatically if not
+            - application_name is required; fill this in automatically if not
               provided by caller
             - series is required; how do we pick a default?
 
         """
-        if to:
-            placement = [
-                client.Placement(**p) for p in to
-            ]
-        else:
-            placement = []
-
         if storage:
             storage = {
                 k: client.Constraints(**v)
                 for k, v in storage.items()
             }
 
-        entity_id = await self.charmstore.entityId(entity_url)
+        is_local = (
+            entity_url.startswith('local:') or
+            os.path.isdir(entity_url)
+        )
+        if is_local:
+            entity_id = entity_url
+        else:
+            entity = await self.charmstore.entity(entity_url)
+            entity_id = entity['Id']
 
-        app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
-        app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        if 'bundle/' in entity_id:
+        is_bundle = ((is_local and
+                      (Path(entity_id) / 'bundle.yaml').exists()) or
+                     (not is_local and 'bundle/' in entity_id))
+
+        if is_bundle:
             handler = BundleHandler(self)
             await handler.fetch_plan(entity_id)
             await handler.execute_plan()
@@ -838,38 +1008,100 @@ class Model(object):
                 # haven't made it yet we'll need to wait on them to be added
                 await asyncio.gather(*[
                     asyncio.ensure_future(
-                        self.model._wait_for_new('application', app_name))
+                        self._wait_for_new('application', app_name),
+                        loop=self.loop)
                     for app_name in pending_apps
-                ])
+                ], loop=self.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
-            log.debug(
-                'Deploying %s', entity_id)
-
-            await client_facade.AddCharm(channel, entity_id)
-            app = client.ApplicationDeploy(
-                application=service_name,
-                channel=channel,
+            if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series and '/' in entity_url:
+                    # try to get the series from the provided charm URL
+                    if entity_url.startswith('cs:'):
+                        parts = entity_url[3:].split('/')
+                    else:
+                        parts = entity_url.split('/')
+                    if parts[0].startswith('~'):
+                        parts.pop(0)
+                    if len(parts) > 1:
+                        # series was specified in the URL
+                        series = parts[0]
+                if not series:
+                    # series was not supplied at all, so use the newest
+                    # supported series according to the charm store
+                    ss = entity['Meta']['supported-series']
+                    series = ss['SupportedSeries'][0]
+                if not channel:
+                    channel = 'stable'
+                await client_facade.AddCharm(channel, entity_id)
+            else:
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+            return await self._deploy(
                 charm_url=entity_id,
-                config=config,
+                application=application_name,
+                series=series,
+                config=config or {},
                 constraints=constraints,
                 endpoint_bindings=bind,
-                num_units=num_units,
-                placement=placement,
                 resources=resources,
-                series=series,
                 storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to),
             )
 
-            await app_facade.Deploy([app])
-            return await self._wait_for_new('application', service_name)
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+        """
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement,
+        )
+
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     async def destroy_unit(self, *unit_names):
         """Destroy units by name.
@@ -883,7 +1115,7 @@ class Model(object):
             's' if len(unit_names) == 1 else '',
             ' '.join(unit_names))
 
-        return await app_facade.Destroy(self.name)
+        return await app_facade.DestroyUnits(list(unit_names))
     destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
@@ -893,7 +1125,7 @@ class Model(object):
         :return str: Path to the archive file
 
         """
-        pass
+        raise NotImplementedError()
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
@@ -912,19 +1144,19 @@ class Model(object):
             If None, a new machine is provisioned.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_config(self):
         """Return the configuration settings for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def grant(self, username, acl='read'):
         """Grant a user access to this model.
@@ -933,7 +1165,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -941,7 +1173,7 @@ class Model(object):
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
-        pass
+        raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
     def get_machines(self, machine, utc=False):
@@ -951,25 +1183,25 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_ssh_key(self):
         """Return known SSH keys for this model.
 
         """
-        pass
+        raise NotImplementedError()
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -979,7 +1211,7 @@ class Model(object):
         :param bool volume: Include volume storage
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
@@ -988,7 +1220,7 @@ class Model(object):
         :param list providers: Only include pools for these providers
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
@@ -997,13 +1229,13 @@ class Model(object):
         :param str zone: Only include subnets in this zone
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_backup(self, backup_id):
         """Delete a backup.
@@ -1011,7 +1243,7 @@ class Model(object):
         :param str backup_id: The id of the backup to remove
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
@@ -1021,7 +1253,7 @@ class Model(object):
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
@@ -1029,7 +1261,7 @@ class Model(object):
         :param str \*machine_ids: Ids of the machines to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_machines = remove_machine
 
     def remove_ssh_key(self, *keys):
@@ -1038,7 +1270,7 @@ class Model(object):
         :param str \*keys: Keys to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -1054,13 +1286,13 @@ class Model(object):
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
-        pass
+        raise NotImplementedError()
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
-        pass
+        raise NotImplementedError()
 
     def revoke(self, username, acl='read'):
         """Revoke a user's access to this model.
@@ -1069,7 +1301,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -1078,7 +1310,7 @@ class Model(object):
         :param int timeout: Time to wait before command is considered failed
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_config(self, **config):
         """Set configuration keys on this model.
@@ -1086,7 +1318,7 @@ class Model(object):
         :param \*\*config: Config key/values
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
@@ -1094,7 +1326,7 @@ class Model(object):
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
@@ -1103,7 +1335,7 @@ class Model(object):
         :param int wait: Time in seconds to wait for action to complete
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
@@ -1112,7 +1344,7 @@ class Model(object):
         :param str name: Filter by action name
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_budget(self, budget_name):
         """Get budget usage info.
@@ -1120,17 +1352,19 @@ class Model(object):
         :param str budget_name: Name of budget
 
         """
-        pass
+        raise NotImplementedError()
 
-    def get_status(self, filter_=None, utc=False):
+    async def get_status(self, filters=None, utc=False):
         """Return the status of the model.
 
-        :param str filter_: Service or unit name or wildcard ('*')
+        :param str filters: Optional list of applications, units, or machines
+            to include, which can use wildcards ('*').
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
-    status = get_status
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        return await client_facade.FullStatus(filters)
 
     def sync_tools(
             self, all_=False, destination=None, dry_run=False, public=False,
@@ -1147,7 +1381,7 @@ class Model(object):
         :param str version: Copy a specific major.minor version
 
         """
-        pass
+        raise NotImplementedError()
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
@@ -1156,7 +1390,7 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
@@ -1164,13 +1398,13 @@ class Model(object):
         :param str \*keys: The keys to unset
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
@@ -1184,7 +1418,7 @@ class Model(object):
         :param str version: Upgrade to a specific version
 
         """
-        pass
+        raise NotImplementedError()
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
@@ -1192,12 +1426,59 @@ class Model(object):
         :param str archive_path: Path to local archive
 
         """
-        pass
+        raise NotImplementedError()
 
     @property
     def charmstore(self):
         return self._charmstore
 
+    async def get_metrics(self, *tags):
+        """Retrieve metrics.
+
+        :param str \*tags: Tags of entities from which to retrieve metrics.
+            No tags retrieves the metrics of all units in the model.
+        :return: Dictionary of unit_name:metrics
+
+        """
+        log.debug("Retrieving metrics for %s",
+                  ', '.join(tags) if tags else "all units")
+
+        metrics_facade = client.MetricsDebugFacade()
+        metrics_facade.connect(self.connection)
+
+        entities = [client.Entity(tag) for tag in tags]
+        metrics_result = await metrics_facade.GetMetrics(entities)
+
+        metrics = collections.defaultdict(list)
+
+        for entity_metrics in metrics_result.results:
+            error = entity_metrics.error
+            if error:
+                if "is not a valid tag" in error:
+                    raise ValueError(error.message)
+                else:
+                    raise Exception(error.message)
+
+            for metric in entity_metrics.metrics:
+                metrics[metric.unit].append(vars(metric))
+
+        return metrics
+
+
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
 
 class BundleHandler(object):
     """
@@ -1220,12 +1501,70 @@ class BundleHandler(object):
         self.ann_facade = client.AnnotationsFacade()
         self.ann_facade.connect(model.connection)
 
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        for app_name in self.applications:
+            app_dict = bundle['services'][app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ], loop=self.model.loop)
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                bundle['services'][app_name]['charm'] = charm_url
+
+        return bundle
+
     async def fetch_plan(self, entity_id):
-        bundle_yaml = await self.charmstore.files(entity_id,
-                                                  filename='bundle.yaml',
-                                                  read_file=True)
+        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
+        if is_local:
+            bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
+        else:
+            bundle_yaml = await self.charmstore.files(entity_id,
+                                                      filename='bundle.yaml',
+                                                      read_file=True)
         self.bundle = yaml.safe_load(bundle_yaml)
-        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.client_facade.GetBundleChanges(
+            yaml.dump(self.bundle))
+
+        if self.plan.errors:
+            raise JujuError('\n'.join(self.plan.errors))
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1251,6 +1590,11 @@ class BundleHandler(object):
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
@@ -1263,19 +1607,27 @@ class BundleHandler(object):
             Keys include:
 
             series: string specifying the machine OS series.
+
             constraints: string holding machine constraints, if any. We'll
                 parse this into the json friendly dict that the juju api
                 expects.
+
             container_type: string holding the type of the container (for
-                instance ""lxc" or kvm"). It is not specified for top level
+                instance ""lxd" or kvm"). It is not specified for top level
                 machines.
+
             parent_id: string holding a placeholder pointing to another
                 machine change or to a unit change. This value is only
                 specified in the case this machine is a container, in
                 which case also ContainerType is set.
+
         """
         params = params or {}
 
+        # Normalize keys
+        params = {normalize_key(k): params[k] for k in params.keys()}
+
+        # Fix up values, as necessary.
         if 'parent_id' in params:
             params['parent_id'] = self.resolve(params['parent_id'])
 
@@ -1283,6 +1635,12 @@ class BundleHandler(object):
             params.get('constraints'))
         params['jobs'] = params.get('jobs', ['JobHostUnits'])
 
+        if params.get('container_type') == 'lxc':
+            log.warning('Juju 2.0 does not support lxc containers. '
+                        'Converting containers to lxd.')
+            params['container_type'] = 'lxd'
+
+        # Submit the request.
         params = client.AddMachineParams(**params)
         results = await self.client_facade.AddMachines([params])
         error = results.machines[0].error
@@ -1343,22 +1701,16 @@ class BundleHandler(object):
         """
         # resolve indirect references
         charm = self.resolve(charm)
-        # stringify all config values for API
-        options = {k: str(v) for k, v in options.items()}
-        # build param object
-        app = client.ApplicationDeploy(
+        await self.model._deploy(
             charm_url=charm,
-            series=series,
             application=application,
+            series=series,
             config=options,
             constraints=constraints,
-            storage=storage,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
+            storage=storage,
         )
-        # do the do
-        log.info('Deploying %s', charm)
-        await self.app_facade.Deploy([app])
         return application
 
     async def addUnit(self, application, to):
@@ -1426,7 +1778,7 @@ class CharmStore(object):
     """
     def __init__(self, loop):
         self.loop = loop
-        self._cs = charmstore.CharmStore()
+        self._cs = theblues.charmstore.CharmStore(timeout=5)
 
     def __getattr__(self, name):
         """
@@ -1440,7 +1792,89 @@ class CharmStore(object):
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
-                return await self.loop.run_in_executor(None, method)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
+
+
+class CharmArchiveGenerator(object):
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/\* - This is used for packing the charm itself and any
+                          similar tasks.
+            * \*/.\*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True