Replace pass with NotImplementedError in method stubs
[osm/N2VC.git] / juju / model.py
index fe07f80..1177f00 100644 (file)
@@ -1,33 +1,92 @@
 import asyncio
 import collections
+import json
 import logging
+import os
+import re
+import stat
+import tempfile
+import weakref
+import zipfile
 from concurrent.futures import CancelledError
 from functools import partial
+from pathlib import Path
 
+import yaml
 from theblues import charmstore
 
 from .client import client
 from .client import watcher
 from .client import connection
+from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
-from .errors import JujuAPIError
+from .errors import JujuError, JujuAPIError
+from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
 
+class _Observer(object):
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id, predicate):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        self.predicate = predicate
+        if self.entity_id:
+            self.entity_id = str(self.entity_id)
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, delta):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a this delta.
+
+        """
+        if (self.entity_id and delta.get_id() and
+                not re.match(self.entity_id, str(delta.get_id()))):
+            return False
+
+        if self.entity_type and self.entity_type != delta.entity:
+            return False
+
+        if self.action and self.action != delta.type:
+            return False
+
+        if self.predicate and not self.predicate(delta):
+            return False
+
+        return True
+
+
 class ModelObserver(object):
     async def __call__(self, delta, old, new, model):
-        if old is None and new is not None:
-            type_ = 'add'
-        else:
-            type_ = delta.type
-        handler_name = 'on_{}_{}'.format(delta.entity, type_)
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
         await method(delta, old, new, model)
 
     async def on_change(self, delta, old, new, model):
+        """Generic model-change handler.
+
+        :param delta: :class:`juju.client.overrides.Delta`
+        :param old: :class:`juju.model.ModelEntity`
+        :param new: :class:`juju.model.ModelEntity`
+        :param model: :class:`juju.model.Model`
+
+        """
         pass
 
 
@@ -116,12 +175,18 @@ class ModelState(object):
 
     def get_entity(
             self, entity_type, entity_id, history_index=-1, connected=True):
-        """Return an object instance representing the entity created or
-        updated by ``delta``
+        """Return an object instance for the given entity_type and id.
+
+        By default the object state matches the most recent state from
+        Juju. To get an instance of the object in an older state, pass
+        history_index, an index into the history deque for the entity.
 
         """
+
         if history_index < 0 and history_index != -1:
             history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
 
         try:
             self.entity_data(entity_type, entity_id, history_index)
@@ -155,18 +220,33 @@ class ModelEntity(object):
         self.connected = connected
         self.connection = model.connection
 
+    def __repr__(self):
+        return '<{} entity_id="{}">'.format(type(self).__name__,
+                                            self.entity_id)
+
     def __getattr__(self, name):
         """Fetch object attributes from the underlying data dict held in the
         model.
 
         """
-        if self.data is None:
-            raise DeadEntityException(
-                "Entity {}:{} is dead - its attributes can no longer be "
-                "accessed. Use the .previous() method on this object to get "
-                "a copy of the object at its previous state.".format(
-                    self.entity_type, self.entity_id))
-        return self.data[name]
+        return self.safe_data[name]
+
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
 
     @property
     def entity_type(self):
@@ -182,7 +262,7 @@ class ModelEntity(object):
         entity in the underlying model.
 
         This will be True except when the object represents an entity at a
-        prior state in history, e.g. if the object was obtained by calling
+        non-latest state in history, e.g. if the object was obtained by calling
         .previous() on another object.
 
         """
@@ -216,6 +296,22 @@ class ModelEntity(object):
         return self.model.state.entity_data(
             self.entity_type, self.entity_id, self._history_index)
 
+    @property
+    def safe_data(self):
+        """The data dictionary for this entity.
+
+        If this `ModelEntity` points to the dead state, it will
+        raise `DeadEntityException`.
+
+        """
+        if self.data is None:
+            raise DeadEntityException(
+                "Entity {}:{} is dead - its attributes can no longer be "
+                "accessed. Use the .previous() method on this object to get "
+                "a copy of the object at its previous state.".format(
+                    self.entity_type, self.entity_id))
+        return self.data
+
     def previous(self):
         """Return a copy of this object as was at its previous state in
         history.
@@ -276,20 +372,46 @@ class Model(object):
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
-        self.observers = set()
+        self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
+        self.info = None
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
         self._charmstore = CharmStore(self.loop)
 
+    async def connect(self, *args, **kw):
+        """Connect to an arbitrary Juju model.
+
+        args and kw are passed through to Connection.connect()
+
+        """
+        self.connection = await connection.Connection.connect(*args, **kw)
+        await self._after_connect()
+
     async def connect_current(self):
         """Connect to the current Juju model.
 
         """
         self.connection = await connection.Connection.connect_current()
+        await self._after_connect()
+
+    async def connect_model(self, model_name):
+        """Connect to a specific Juju model by name.
+
+        :param model_name:  Format [controller:][user/]model
+
+        """
+        self.connection = await connection.Connection.connect_model(model_name)
+        await self._after_connect()
+
+    async def _after_connect(self):
+        """Run initialization steps after connecting to websocket.
+
+        """
         self._watch()
         await self._watch_received.wait()
+        await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
@@ -302,6 +424,59 @@ class Model(object):
             await self.connection.close()
             self.connection = None
 
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection.https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
     def all_units_idle(self):
         """Return True if all units are idle.
 
@@ -330,13 +505,13 @@ class Model(object):
             lambda: len(self.machines) == 0
         )
 
-    async def block_until(self, *conditions, timeout=None):
+    async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(.1)
+                await asyncio.sleep(wait_period)
         await asyncio.wait_for(_block(), timeout)
 
     @property
@@ -363,11 +538,34 @@ class Model(object):
         """
         return self.state.units
 
-    def add_observer(self, callable_):
+    async def get_info(self):
+        """Return a client.ModelInfo object for this Model.
+
+        Retrieves latest info for this Model from the api server. The
+        return value is cached on the Model.info attribute so that the
+        valued may be accessed again without another api call, if
+        desired.
+
+        This method is called automatically when the Model is connected,
+        resulting in Model.info being initialized without requiring an
+        explicit call to this method.
+
+        """
+        facade = client.ClientFacade()
+        facade.connect(self.connection)
+
+        self.info = await facade.ModelInfo()
+        log.debug('Got ModelInfo: %s', vars(self.info))
+
+        return self.info
+
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None,
+            predicate=None):
         """Register an "on-model-change" callback
 
-        Once a watch is started (Model.watch() is called), ``callable_``
-        will be called each time the model changes. callable_ should
+        Once the model is connected, ``callable_``
+        will be called each time the model changes. ``callable_`` should
         be Awaitable and accept the following positional arguments:
 
             delta - An instance of :class:`juju.delta.EntityDelta`
@@ -385,8 +583,21 @@ class Model(object):
 
             model - The :class:`Model` itself.
 
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or entitiy_id filter criteria, e.g.::
+
+            add_observer(
+                myfunc,
+                entity_type='application', action='add', entity_id='ubuntu')
+
+        For more complex filtering conditions, pass a predicate function. It
+        will be called with a delta as its only argument. If the predicate
+        function returns True, the ``callable_`` will be called.
+
         """
-        self.observers.add(callable_)
+        observer = _Observer(
+            callable_, entity_type, action, entity_id, predicate)
+        self.observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -442,15 +653,77 @@ class Model(object):
             by applying this delta.
 
         """
+        if new_obj and not old_obj:
+            delta.type = 'add'
+
         log.debug(
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
+
         for o in self.observers:
-            asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+            if o.cares_about(delta):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self.loop)
 
-    def add_machine(
-            self, spec=None, constraints=None, disks=None, series=None,
-            count=1):
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
+
+    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``.
+        If ``entity_id`` is ``None``, it will wait for the first new entity
+        of the correct type.
+
+        This coroutine blocks until the new object appears in the model.
+
+        """
+        # if the entity is already in the model, just return it
+        if entity_id in self.state._live_entity_map(entity_type):
+            return self.state._live_entity_map(entity_type)[entity_id]
+        # if we know the entity_id, we can trigger on any action that puts
+        # the enitty into the model; otherwise, we have to watch for the
+        # next "add" action on that entity_type
+        action = 'add' if entity_id is None else None
+        return await self._wait(entity_type, entity_id, action, predicate)
+
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
+
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'failed')
+
+        return await self._wait('action', action_id, 'change', predicate)
+
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
@@ -458,25 +731,64 @@ class Model(object):
             Examples::
 
                 (None) - starts a new machine
-                'lxc' - starts a new machine with on lxc container
-                'lxc:4' - starts a new lxc container on machine 4
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
-        :param constraints: Machine constraints
-        :type constraints: :class:`juju.Constraints`
-        :param list disks: List of disk :class:`constraints <juju.Constraints>`
-        :param str series: Series
-        :param int count: Number of machines to deploy
 
-        Supported container types are: lxc, lxd, kvm
+        :param dict constraints: Machine constraints
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                }
+
+        :param list disks: List of disk constraint dictionaries
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
-        pass
-    add_machines = add_machine
+        params = client.AddMachineParams()
+        params.jobs = ['JobHostUnits']
+
+        if spec:
+            placement = parse_placement(spec)
+            if placement:
+                params.placement = placement[0]
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine_id = results.machines[0].machine
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
@@ -491,7 +803,24 @@ class Model(object):
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
-        return await app_facade.AddRelation([relation1, relation2])
+        try:
+            result = await app_facade.AddRelation([relation1, relation2])
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            log.debug(
+                'Relation %s <-> %s already exists', relation1, relation2)
+            # TODO: if relation already exists we should return the
+            # Relation ModelEntity here
+            return None
+
+        def predicate(delta):
+            endpoints = {}
+            for endpoint in delta.data['endpoints']:
+                endpoints[endpoint['application-name']] = endpoint['relation']
+            return endpoints == result.endpoints
+
+        return await self._wait_for_new('relation', None, predicate)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -503,7 +832,7 @@ class Model(object):
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
-        pass
+        raise NotImplementedError()
 
     def add_ssh_key(self, key):
         """Add a public SSH key to this model.
@@ -511,7 +840,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
-        pass
+        raise NotImplementedError()
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -522,13 +851,13 @@ class Model(object):
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def block(self, *commands):
         """Add a new block to this model.
@@ -537,13 +866,13 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
@@ -553,7 +882,7 @@ class Model(object):
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
@@ -563,7 +892,7 @@ class Model(object):
         :return str: Path to downloaded archive
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
@@ -573,7 +902,7 @@ class Model(object):
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
-        pass
+        raise NotImplementedError()
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
@@ -597,17 +926,17 @@ class Model(object):
         :param list exclude: Do not show log messages for these entities
 
         """
-        pass
+        raise NotImplementedError()
 
     async def deploy(
-            self, entity_url, service_name=None, bind=None, budget=None,
+            self, entity_url, application_name=None, bind=None, budget=None,
             channel=None, config=None, constraints=None, force=False,
             num_units=1, plan=None, resources=None, series=None, storage=None,
             to=None):
         """Deploy a new service or bundle.
 
         :param str entity_url: Charm or bundle url
-        :param str service_name: Name to give the service
+        :param str application_name: Name to give the service
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
@@ -622,31 +951,24 @@ class Model(object):
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
-        :param str to: Placement directive, e.g.::
+        :param to: Placement directive as a string. For example:
 
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
 
             If None, a new machine is provisioned.
 
 
         TODO::
 
-            - entity_url must have a revision; look up latest automatically if
-              not provided by caller
-            - service_name is required; fill this in automatically if not
+            - application_name is required; fill this in automatically if not
               provided by caller
             - series is required; how do we pick a default?
 
         """
-        if constraints:
-            constraints = client.Value(**constraints)
-
         if to:
-            placement = [
-                client.Placement(**p) for p in to
-            ]
+            placement = parse_placement(to)
         else:
             placement = []
 
@@ -656,43 +978,96 @@ class Model(object):
                 for k, v in storage.items()
             }
 
-        entity_id = await self.charmstore.entityId(entity_url)
+        is_local = (
+            entity_url.startswith('local:') or
+            os.path.isdir(entity_url)
+        )
+        entity_id = await self.charmstore.entityId(entity_url) \
+            if not is_local else entity_url
 
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        if 'bundle/' in entity_id:
+        is_bundle = ((is_local and
+                      (Path(entity_id) / 'bundle.yaml').exists()) or
+                     (not is_local and 'bundle/' in entity_id))
+
+        if is_bundle:
             handler = BundleHandler(self)
             await handler.fetch_plan(entity_id)
             await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self._wait_for_new('application', app_name))
+                    for app_name in pending_apps
+                ])
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
         else:
             log.debug(
                 'Deploying %s', entity_id)
 
-            await client_facade.AddCharm(channel, entity_id)
+            if not is_local:
+                await client_facade.AddCharm(channel, entity_id)
+            elif not entity_id.startswith('local:'):
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+
             app = client.ApplicationDeploy(
-                application=service_name,
+                application=application_name,
                 channel=channel,
                 charm_url=entity_id,
                 config=config,
-                constraints=constraints,
+                constraints=parse_constraints(constraints),
                 endpoint_bindings=bind,
                 num_units=num_units,
-                placement=placement,
                 resources=resources,
                 series=series,
                 storage=storage,
             )
+            app.placement = placement
 
-            return await app_facade.Deploy([app])
+            result = await app_facade.Deploy([app])
+            errors = [r.error.message for r in result.results if r.error]
+            if errors:
+                raise JujuError('\n'.join(errors))
+            return await self._wait_for_new('application', application_name)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
 
         """
-        pass
+        raise NotImplementedError()
+
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.DestroyUnits(list(unit_names))
+    destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
         """Download a backup archive file.
@@ -701,7 +1076,7 @@ class Model(object):
         :return str: Path to the archive file
 
         """
-        pass
+        raise NotImplementedError()
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
@@ -720,19 +1095,19 @@ class Model(object):
             If None, a new machine is provisioned.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_config(self):
         """Return the configuration settings for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def grant(self, username, acl='read'):
         """Grant a user access to this model.
@@ -741,7 +1116,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -749,7 +1124,7 @@ class Model(object):
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
-        pass
+        raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
     def get_machines(self, machine, utc=False):
@@ -759,25 +1134,25 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_ssh_key(self):
         """Return known SSH keys for this model.
 
         """
-        pass
+        raise NotImplementedError()
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -787,7 +1162,7 @@ class Model(object):
         :param bool volume: Include volume storage
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
@@ -796,7 +1171,7 @@ class Model(object):
         :param list providers: Only include pools for these providers
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
@@ -805,13 +1180,13 @@ class Model(object):
         :param str zone: Only include subnets in this zone
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_backup(self, backup_id):
         """Delete a backup.
@@ -819,7 +1194,7 @@ class Model(object):
         :param str backup_id: The id of the backup to remove
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
@@ -829,7 +1204,7 @@ class Model(object):
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
@@ -837,7 +1212,7 @@ class Model(object):
         :param str \*machine_ids: Ids of the machines to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_machines = remove_machine
 
     def remove_ssh_key(self, *keys):
@@ -846,7 +1221,7 @@ class Model(object):
         :param str \*keys: Keys to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -862,13 +1237,13 @@ class Model(object):
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
-        pass
+        raise NotImplementedError()
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
-        pass
+        raise NotImplementedError()
 
     def revoke(self, username, acl='read'):
         """Revoke a user's access to this model.
@@ -877,7 +1252,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -886,7 +1261,7 @@ class Model(object):
         :param int timeout: Time to wait before command is considered failed
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_config(self, **config):
         """Set configuration keys on this model.
@@ -894,7 +1269,7 @@ class Model(object):
         :param \*\*config: Config key/values
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
@@ -902,7 +1277,7 @@ class Model(object):
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
@@ -911,7 +1286,7 @@ class Model(object):
         :param int wait: Time in seconds to wait for action to complete
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
@@ -920,7 +1295,7 @@ class Model(object):
         :param str name: Filter by action name
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_budget(self, budget_name):
         """Get budget usage info.
@@ -928,7 +1303,7 @@ class Model(object):
         :param str budget_name: Name of budget
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_status(self, filter_=None, utc=False):
         """Return the status of the model.
@@ -937,7 +1312,7 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
+        raise NotImplementedError()
     status = get_status
 
     def sync_tools(
@@ -955,7 +1330,7 @@ class Model(object):
         :param str version: Copy a specific major.minor version
 
         """
-        pass
+        raise NotImplementedError()
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
@@ -964,7 +1339,7 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
@@ -972,13 +1347,13 @@ class Model(object):
         :param str \*keys: The keys to unset
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
@@ -992,7 +1367,7 @@ class Model(object):
         :param str version: Upgrade to a specific version
 
         """
-        pass
+        raise NotImplementedError()
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
@@ -1000,12 +1375,59 @@ class Model(object):
         :param str archive_path: Path to local archive
 
         """
-        pass
+        raise NotImplementedError()
 
     @property
     def charmstore(self):
         return self._charmstore
 
+    async def get_metrics(self, *tags):
+        """Retrieve metrics.
+
+        :param str \*tags: Tags of entities from which to retrieve metrics.
+            No tags retrieves the metrics of all units in the model.
+        :return: Dictionary of unit_name:metrics
+
+        """
+        log.debug("Retrieving metrics for %s",
+                  ', '.join(tags) if tags else "all units")
+
+        metrics_facade = client.MetricsDebugFacade()
+        metrics_facade.connect(self.connection)
+
+        entities = [client.Entity(tag) for tag in tags]
+        metrics_result = await metrics_facade.GetMetrics(entities)
+
+        metrics = collections.defaultdict(list)
+
+        for entity_metrics in metrics_result.results:
+            error = entity_metrics.error
+            if error:
+                if "is not a valid tag" in error:
+                    raise ValueError(error.message)
+                else:
+                    raise Exception(error.message)
+
+            for metric in entity_metrics.metrics:
+                metrics[metric.unit].append(vars(metric))
+
+        return metrics
+
+
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
 
 class BundleHandler(object):
     """
@@ -1028,11 +1450,70 @@ class BundleHandler(object):
         self.ann_facade = client.AnnotationsFacade()
         self.ann_facade.connect(model.connection)
 
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        for app_name in self.applications:
+            app_dict = bundle['services'][app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ])
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                bundle['services'][app_name]['charm'] = charm_url
+
+        return bundle
+
     async def fetch_plan(self, entity_id):
-        yaml = await self.charmstore.files(entity_id,
-                                           filename='bundle.yaml',
-                                           read_file=True)
-        self.plan = await self.client_facade.GetBundleChanges(yaml)
+        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
+        if is_local:
+            bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
+        else:
+            bundle_yaml = await self.charmstore.files(entity_id,
+                                                      filename='bundle.yaml',
+                                                      read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.client_facade.GetBundleChanges(
+            yaml.dump(self.bundle))
+
+        if self.plan.errors:
+            raise JujuError('\n'.join(self.plan.errors))
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1040,6 +1521,10 @@ class BundleHandler(object):
             result = await method(*step.args)
             self.references[step.id_] = result
 
+    @property
+    def applications(self):
+        return list(self.bundle['services'].keys())
+
     def resolve(self, reference):
         if reference and reference.startswith('$'):
             reference = self.references[reference[1:]]
@@ -1054,40 +1539,65 @@ class BundleHandler(object):
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
         return entity_id
 
-    async def addMachines(self, series, constraints, container_type,
-                          parent_id):
+    async def addMachines(self, params=None):
         """
-        :param series string:
-            Series holds the optional machine OS series.
+        :param params dict:
+            Dictionary specifying the machine to add. All keys are optional.
+            Keys include:
 
-        :param constraints string:
-            Constraints holds the optional machine constraints.
+            series: string specifying the machine OS series.
+
+            constraints: string holding machine constraints, if any. We'll
+                parse this into the json friendly dict that the juju api
+                expects.
 
-        :param Container_type string:
-            ContainerType optionally holds the type of the container (for
-            instance ""lxc" or kvm"). It is not specified for top level
-            machines.
+            container_type: string holding the type of the container (for
+                instance ""lxd" or kvm"). It is not specified for top level
+                machines.
+
+            parent_id: string holding a placeholder pointing to another
+                machine change or to a unit change. This value is only
+                specified in the case this machine is a container, in
+                which case also ContainerType is set.
 
-        :param parent_id string:
-            ParentId optionally holds a placeholder pointing to another machine
-            change or to a unit change. This value is only specified in the
-            case this machine is a container, in which case also ContainerType
-            is set.
         """
-        params = client.AddMachineParams(
-            series=series,
-            constraints=constraints,
-            container_type=container_type,
-            parent_id=self.resolve(parent_id),
-        )
-        results = await self.client_facade.AddMachines(params)
-        log.debug('Added new machine %s', results[0].machine)
-        return results[0].machine
+        params = params or {}
+
+        # Normalize keys
+        params = {normalize_key(k): params[k] for k in params.keys()}
+
+        # Fix up values, as necessary.
+        if 'parent_id' in params:
+            params['parent_id'] = self.resolve(params['parent_id'])
+
+        params['constraints'] = parse_constraints(
+            params.get('constraints'))
+        params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+        if params.get('container_type') == 'lxc':
+            log.warning('Juju 2.0 does not support lxc containers. '
+                        'Converting containers to lxd.')
+            params['container_type'] = 'lxd'
+
+        # Submit the request.
+        params = client.AddMachineParams(**params)
+        results = await self.client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine = results.machines[0].machine
+        log.debug('Added new machine %s', machine)
+        return machine
 
     async def addRelation(self, endpoint1, endpoint2):
         """
@@ -1104,14 +1614,9 @@ class BundleHandler(object):
             parts = endpoints[i].split(':')
             parts[0] = self.resolve(parts[0])
             endpoints[i] = ':'.join(parts)
-        try:
-            await self.app_facade.AddRelation(endpoints)
-            log.debug('Added relation %s <-> %s', *endpoints)
-        except JujuAPIError as e:
-            if 'relation already exists' not in e.message:
-                raise
-            log.debug('Relation %s <-> %s already exists', *endpoints)
-        return None
+
+        log.info('Relating %s <-> %s', *endpoints)
+        return await self.model.add_relation(*endpoints)
 
     async def deploy(self, charm, series, application, options, constraints,
                      storage, endpoint_bindings, resources):
@@ -1153,14 +1658,16 @@ class BundleHandler(object):
             series=series,
             application=application,
             config=options,
-            constraints=constraints,
+            constraints=parse_constraints(constraints),
             storage=storage,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
         )
         # do the do
-        log.debug('Deploying %s', charm)
+        log.info('Deploying %s', charm)
         await self.app_facade.Deploy([app])
+        # ensure the app is in the model for future operations
+        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
@@ -1182,16 +1689,14 @@ class BundleHandler(object):
             # doesn't, so we're not bothering, either
             unit_name = self._units_by_app[application].pop()
             log.debug('Reusing unit %s for %s', unit_name, application)
-            return unit_name
-        log.debug('Adding unit of %s%s',
-                  application,
-                  (' to %s' % placement) if placement else '')
-        result = await self.app_facade.AddUnits(
-            application=application,
-            placement=placement,
-            num_units=1,
+            return self.model.units[unit_name]
+
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
+        return await self.model.applications[application].add_unit(
+            count=1,
+            to=placement,
         )
-        return result.units[0]
 
     async def expose(self, application):
         """
@@ -1200,9 +1705,8 @@ class BundleHandler(object):
             be exposed.
         """
         application = self.resolve(application)
-        log.debug('Exposing %s', application)
-        await self.app_facade.Expose(application)
-        return None
+        log.info('Exposing %s', application)
+        return await self.model.applications[application].expose()
 
     async def setAnnotations(self, id_, entity_type, annotations):
         """
@@ -1218,13 +1722,11 @@ class BundleHandler(object):
             Annotations holds the annotations as key/value pairs.
         """
         entity_id = self.resolve(id_)
-        log.debug('Updating annotations of %s', entity_id)
-        ann = client.EntityAnnotations(
-            entity=entity_id,
-            annotations=annotations,
-        )
-        await self.ann_facade.Set([ann])
-        return None
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
 
 
 class CharmStore(object):
@@ -1251,3 +1753,79 @@ class CharmStore(object):
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
+
+
+class CharmArchiveGenerator(object):
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/\* - This is used for packing the charm itself and any
+                          similar tasks.
+            * \*/.\*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True