Fixes for landscape bundle.
[osm/N2VC.git] / juju / model.py
index fe07f80..4a8bc03 100644 (file)
@@ -1,14 +1,20 @@
 import asyncio
 import collections
 import logging
 import asyncio
 import collections
 import logging
+import os
+import re
+import weakref
 from concurrent.futures import CancelledError
 from functools import partial
 from concurrent.futures import CancelledError
 from functools import partial
+from pathlib import Path
 
 
+import yaml
 from theblues import charmstore
 
 from .client import client
 from .client import watcher
 from .client import connection
 from theblues import charmstore
 
 from .client import client
 from .client import watcher
 from .client import connection
+from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
@@ -17,13 +23,53 @@ from .errors import JujuAPIError
 log = logging.getLogger(__name__)
 
 
 log = logging.getLogger(__name__)
 
 
+class _Observer(object):
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id, predicate):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        self.predicate = predicate
+        if self.entity_id:
+            self.entity_id = str(self.entity_id)
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, delta):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a this delta.
+
+        """
+        if (self.entity_id and delta.get_id() and
+                not re.match(self.entity_id, str(delta.get_id()))):
+            return False
+
+        if self.entity_type and self.entity_type != delta.entity:
+            return False
+
+        if self.action and self.action != delta.type:
+            return False
+
+        if self.predicate and not self.predicate(delta):
+            return False
+
+        return True
+
+
 class ModelObserver(object):
     async def __call__(self, delta, old, new, model):
 class ModelObserver(object):
     async def __call__(self, delta, old, new, model):
-        if old is None and new is not None:
-            type_ = 'add'
-        else:
-            type_ = delta.type
-        handler_name = 'on_{}_{}'.format(delta.entity, type_)
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
         await method(delta, old, new, model)
 
         method = getattr(self, handler_name, self.on_change)
         await method(delta, old, new, model)
 
@@ -116,12 +162,18 @@ class ModelState(object):
 
     def get_entity(
             self, entity_type, entity_id, history_index=-1, connected=True):
 
     def get_entity(
             self, entity_type, entity_id, history_index=-1, connected=True):
-        """Return an object instance representing the entity created or
-        updated by ``delta``
+        """Return an object instance for the given entity_type and id.
+
+        By default the object state matches the most recent state from
+        Juju. To get an instance of the object in an older state, pass
+        history_index, an index into the history deque for the entity.
 
         """
 
         """
+
         if history_index < 0 and history_index != -1:
             history_index += len(self.entity_history(entity_type, entity_id))
         if history_index < 0 and history_index != -1:
             history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
 
         try:
             self.entity_data(entity_type, entity_id, history_index)
 
         try:
             self.entity_data(entity_type, entity_id, history_index)
@@ -155,18 +207,33 @@ class ModelEntity(object):
         self.connected = connected
         self.connection = model.connection
 
         self.connected = connected
         self.connection = model.connection
 
+    def __repr__(self):
+        return '<{} entity_id="{}">'.format(type(self).__name__,
+                                            self.entity_id)
+
     def __getattr__(self, name):
         """Fetch object attributes from the underlying data dict held in the
         model.
 
         """
     def __getattr__(self, name):
         """Fetch object attributes from the underlying data dict held in the
         model.
 
         """
-        if self.data is None:
-            raise DeadEntityException(
-                "Entity {}:{} is dead - its attributes can no longer be "
-                "accessed. Use the .previous() method on this object to get "
-                "a copy of the object at its previous state.".format(
-                    self.entity_type, self.entity_id))
-        return self.data[name]
+        return self.safe_data[name]
+
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
 
     @property
     def entity_type(self):
 
     @property
     def entity_type(self):
@@ -182,7 +249,7 @@ class ModelEntity(object):
         entity in the underlying model.
 
         This will be True except when the object represents an entity at a
         entity in the underlying model.
 
         This will be True except when the object represents an entity at a
-        prior state in history, e.g. if the object was obtained by calling
+        non-latest state in history, e.g. if the object was obtained by calling
         .previous() on another object.
 
         """
         .previous() on another object.
 
         """
@@ -216,6 +283,22 @@ class ModelEntity(object):
         return self.model.state.entity_data(
             self.entity_type, self.entity_id, self._history_index)
 
         return self.model.state.entity_data(
             self.entity_type, self.entity_id, self._history_index)
 
+    @property
+    def safe_data(self):
+        """The data dictionary for this entity.
+
+        If this `ModelEntity` points to the dead state, it will
+        raise `DeadEntityException`.
+
+        """
+        if self.data is None:
+            raise DeadEntityException(
+                "Entity {}:{} is dead - its attributes can no longer be "
+                "accessed. Use the .previous() method on this object to get "
+                "a copy of the object at its previous state.".format(
+                    self.entity_type, self.entity_id))
+        return self.data
+
     def previous(self):
         """Return a copy of this object as was at its previous state in
         history.
     def previous(self):
         """Return a copy of this object as was at its previous state in
         history.
@@ -276,20 +359,46 @@ class Model(object):
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
-        self.observers = set()
+        self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self.state = ModelState(self)
+        self.info = None
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
         self._charmstore = CharmStore(self.loop)
 
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
         self._charmstore = CharmStore(self.loop)
 
+    async def connect(self, *args, **kw):
+        """Connect to an arbitrary Juju model.
+
+        args and kw are passed through to Connection.connect()
+
+        """
+        self.connection = await connection.Connection.connect(*args, **kw)
+        await self._after_connect()
+
     async def connect_current(self):
         """Connect to the current Juju model.
 
         """
         self.connection = await connection.Connection.connect_current()
     async def connect_current(self):
         """Connect to the current Juju model.
 
         """
         self.connection = await connection.Connection.connect_current()
+        await self._after_connect()
+
+    async def connect_model(self, model_name):
+        """Connect to a specific Juju model by name.
+
+        :param model_name:  Format [controller:][user/]model
+
+        """
+        self.connection = await connection.Connection.connect_model(model_name)
+        await self._after_connect()
+
+    async def _after_connect(self):
+        """Run initialization steps after connecting to websocket.
+
+        """
         self._watch()
         await self._watch_received.wait()
         self._watch()
         await self._watch_received.wait()
+        await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
@@ -330,13 +439,13 @@ class Model(object):
             lambda: len(self.machines) == 0
         )
 
             lambda: len(self.machines) == 0
         )
 
-    async def block_until(self, *conditions, timeout=None):
+    async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
         """
         async def _block():
             while not all(c() for c in conditions):
         """Return only after all conditions are true.
 
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(.1)
+                await asyncio.sleep(wait_period)
         await asyncio.wait_for(_block(), timeout)
 
     @property
         await asyncio.wait_for(_block(), timeout)
 
     @property
@@ -363,11 +472,34 @@ class Model(object):
         """
         return self.state.units
 
         """
         return self.state.units
 
-    def add_observer(self, callable_):
+    async def get_info(self):
+        """Return a client.ModelInfo object for this Model.
+
+        Retrieves latest info for this Model from the api server. The
+        return value is cached on the Model.info attribute so that the
+        valued may be accessed again without another api call, if
+        desired.
+
+        This method is called automatically when the Model is connected,
+        resulting in Model.info being initialized without requiring an
+        explicit call to this method.
+
+        """
+        facade = client.ClientFacade()
+        facade.connect(self.connection)
+
+        self.info = await facade.ModelInfo()
+        log.debug('Got ModelInfo: %s', vars(self.info))
+
+        return self.info
+
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None,
+            predicate=None):
         """Register an "on-model-change" callback
 
         """Register an "on-model-change" callback
 
-        Once a watch is started (Model.watch() is called), ``callable_``
-        will be called each time the model changes. callable_ should
+        Once the model is connected, ``callable_``
+        will be called each time the model changes. ``callable_`` should
         be Awaitable and accept the following positional arguments:
 
             delta - An instance of :class:`juju.delta.EntityDelta`
         be Awaitable and accept the following positional arguments:
 
             delta - An instance of :class:`juju.delta.EntityDelta`
@@ -385,8 +517,21 @@ class Model(object):
 
             model - The :class:`Model` itself.
 
 
             model - The :class:`Model` itself.
 
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or entitiy_id filter criteria, e.g.::
+
+            add_observer(
+                myfunc,
+                entity_type='application', action='add', entity_id='ubuntu')
+
+        For more complex filtering conditions, pass a predicate function. It
+        will be called with a delta as its only argument. If the predicate
+        function returns True, the ``callable_`` will be called.
+
         """
         """
-        self.observers.add(callable_)
+        observer = _Observer(
+            callable_, entity_type, action, entity_id, predicate)
+        self.observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -442,11 +587,72 @@ class Model(object):
             by applying this delta.
 
         """
             by applying this delta.
 
         """
+        if new_obj and not old_obj:
+            delta.type = 'add'
+
         log.debug(
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
         log.debug(
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
+
         for o in self.observers:
         for o in self.observers:
-            asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+            if o.cares_about(delta):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add' or 'change')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
+
+    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``.
+        If ``entity_id`` is ``None``, it will wait for the first new entity
+        of the correct type.
+
+        This coroutine blocks until the new object appears in the model.
+
+        """
+        # if the entity is already in the model, just return it
+        if entity_id in self.state._live_entity_map(entity_type):
+            return self.state._live_entity_map(entity_type)[entity_id]
+        # if we know the entity_id, we can trigger on any action that puts
+        # the enitty into the model; otherwise, we have to watch for the
+        # next "add" action on that entity_type
+        action = 'add' if entity_id is None else None
+        return await self._wait(entity_type, entity_id, action, predicate)
+
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
+
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'failed')
+
+        return await self._wait('action', action_id, 'change', predicate)
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -491,7 +697,24 @@ class Model(object):
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
-        return await app_facade.AddRelation([relation1, relation2])
+        try:
+            result = await app_facade.AddRelation([relation1, relation2])
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            log.debug(
+                'Relation %s <-> %s already exists', relation1, relation2)
+            # TODO: if relation already exists we should return the
+            # Relation ModelEntity here
+            return None
+
+        def predicate(delta):
+            endpoints = {}
+            for endpoint in delta.data['endpoints']:
+                endpoints[endpoint['application-name']] = endpoint['relation']
+            return endpoints == result.endpoints
+
+        return await self._wait_for_new('relation', None, predicate)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -600,14 +823,14 @@ class Model(object):
         pass
 
     async def deploy(
         pass
 
     async def deploy(
-            self, entity_url, service_name=None, bind=None, budget=None,
+            self, entity_url, application_name=None, bind=None, budget=None,
             channel=None, config=None, constraints=None, force=False,
             num_units=1, plan=None, resources=None, series=None, storage=None,
             to=None):
         """Deploy a new service or bundle.
 
         :param str entity_url: Charm or bundle url
             channel=None, config=None, constraints=None, force=False,
             num_units=1, plan=None, resources=None, series=None, storage=None,
             to=None):
         """Deploy a new service or bundle.
 
         :param str entity_url: Charm or bundle url
-        :param str service_name: Name to give the service
+        :param str application_name: Name to give the service
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
         :param dict bind: <charm endpoint>:<network space> pairs
         :param dict budget: <budget name>:<limit> pairs
         :param str channel: Charm store channel from which to retrieve
@@ -633,16 +856,11 @@ class Model(object):
 
         TODO::
 
 
         TODO::
 
-            - entity_url must have a revision; look up latest automatically if
-              not provided by caller
-            - service_name is required; fill this in automatically if not
+            - application_name is required; fill this in automatically if not
               provided by caller
             - series is required; how do we pick a default?
 
         """
               provided by caller
             - series is required; how do we pick a default?
 
         """
-        if constraints:
-            constraints = client.Value(**constraints)
-
         if to:
             placement = [
                 client.Placement(**p) for p in to
         if to:
             placement = [
                 client.Placement(**p) for p in to
@@ -656,28 +874,47 @@ class Model(object):
                 for k, v in storage.items()
             }
 
                 for k, v in storage.items()
             }
 
-        entity_id = await self.charmstore.entityId(entity_url)
+        is_local = not entity_url.startswith('cs:') and \
+            os.path.isdir(entity_url)
+        entity_id = await self.charmstore.entityId(entity_url) \
+            if not is_local else entity_url
 
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
 
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        if 'bundle/' in entity_id:
+        is_bundle = ((is_local and
+                      (Path(entity_id) / 'bundle.yaml').exists()) or
+                     (not is_local and 'bundle/' in entity_id))
+
+        if is_bundle:
             handler = BundleHandler(self)
             await handler.fetch_plan(entity_id)
             await handler.execute_plan()
             handler = BundleHandler(self)
             await handler.fetch_plan(entity_id)
             await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self._wait_for_new('application', app_name))
+                    for app_name in pending_apps
+                ])
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
         else:
             log.debug(
                 'Deploying %s', entity_id)
 
             await client_facade.AddCharm(channel, entity_id)
             app = client.ApplicationDeploy(
         else:
             log.debug(
                 'Deploying %s', entity_id)
 
             await client_facade.AddCharm(channel, entity_id)
             app = client.ApplicationDeploy(
-                application=service_name,
+                application=application_name,
                 channel=channel,
                 charm_url=entity_id,
                 config=config,
                 channel=channel,
                 charm_url=entity_id,
                 config=config,
-                constraints=constraints,
+                constraints=parse_constraints(constraints),
                 endpoint_bindings=bind,
                 num_units=num_units,
                 placement=placement,
                 endpoint_bindings=bind,
                 num_units=num_units,
                 placement=placement,
@@ -686,7 +923,8 @@ class Model(object):
                 storage=storage,
             )
 
                 storage=storage,
             )
 
-            return await app_facade.Deploy([app])
+            await app_facade.Deploy([app])
+            return await self._wait_for_new('application', application_name)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
 
     def destroy(self):
         """Terminate all machines and resources for this model.
@@ -694,6 +932,21 @@ class Model(object):
         """
         pass
 
         """
         pass
 
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.Destroy(self.name)
+    destroy_units = destroy_unit
+
     def get_backup(self, archive_id):
         """Download a backup archive file.
 
     def get_backup(self, archive_id):
         """Download a backup archive file.
 
@@ -1006,6 +1259,36 @@ class Model(object):
     def charmstore(self):
         return self._charmstore
 
     def charmstore(self):
         return self._charmstore
 
+    async def get_metrics(self, *tags):
+        """Retrieve metrics.
+
+        :param str \*tags: Tags of entities from which to retrieve metrics.
+            No tags retrieves the metrics of all units in the model.
+        """
+        log.debug("Retrieving metrics for %s",
+                  ', '.join(tags) if tags else "all units")
+
+        metrics_facade = client.MetricsDebugFacade()
+        metrics_facade.connect(self.connection)
+
+        entities = [client.Entity(tag) for tag in tags]
+        metrics_result = await metrics_facade.GetMetrics(entities)
+
+        metrics = collections.defaultdict(list)
+
+        for entity_metrics in metrics_result.results:
+            error = entity_metrics.error
+            if error:
+                if "is not a valid tag" in error:
+                    raise ValueError(error.message)
+                else:
+                    raise Exception(error.message)
+
+            for metric in entity_metrics.metrics:
+                metrics[metric.unit].append(vars(metric))
+
+        return metrics
+
 
 class BundleHandler(object):
     """
 
 class BundleHandler(object):
     """
@@ -1029,10 +1312,15 @@ class BundleHandler(object):
         self.ann_facade.connect(model.connection)
 
     async def fetch_plan(self, entity_id):
         self.ann_facade.connect(model.connection)
 
     async def fetch_plan(self, entity_id):
-        yaml = await self.charmstore.files(entity_id,
-                                           filename='bundle.yaml',
-                                           read_file=True)
-        self.plan = await self.client_facade.GetBundleChanges(yaml)
+        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
+        if is_local:
+            bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
+        else:
+            bundle_yaml = await self.charmstore.files(entity_id,
+                                                      filename='bundle.yaml',
+                                                      read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
 
     async def execute_plan(self):
         for step in self.plan.changes:
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1040,6 +1328,10 @@ class BundleHandler(object):
             result = await method(*step.args)
             self.references[step.id_] = result
 
             result = await method(*step.args)
             self.references[step.id_] = result
 
+    @property
+    def applications(self):
+        return list(self.bundle['services'].keys())
+
     def resolve(self, reference):
         if reference and reference.startswith('$'):
             reference = self.references[reference[1:]]
     def resolve(self, reference):
         if reference and reference.startswith('$'):
             reference = self.references[reference[1:]]
@@ -1059,35 +1351,55 @@ class BundleHandler(object):
         await self.client_facade.AddCharm(None, entity_id)
         return entity_id
 
         await self.client_facade.AddCharm(None, entity_id)
         return entity_id
 
-    async def addMachines(self, series, constraints, container_type,
-                          parent_id):
+    async def addMachines(self, params=None):
         """
         """
-        :param series string:
-            Series holds the optional machine OS series.
+        :param params dict:
+            Dictionary specifying the machine to add. All keys are optional.
+            Keys include:
 
 
-        :param constraints string:
-            Constraints holds the optional machine constraints.
+            series: string specifying the machine OS series.
+
+            constraints: string holding machine constraints, if any. We'll
+                parse this into the json friendly dict that the juju api
+                expects.
+
+            container_type: string holding the type of the container (for
+                instance ""lxd" or kvm"). It is not specified for top level
+                machines.
 
 
-        :param Container_type string:
-            ContainerType optionally holds the type of the container (for
-            instance ""lxc" or kvm"). It is not specified for top level
-            machines.
+            parent_id: string holding a placeholder pointing to another
+                machine change or to a unit change. This value is only
+                specified in the case this machine is a container, in
+                which case also ContainerType is set.
 
 
-        :param parent_id string:
-            ParentId optionally holds a placeholder pointing to another machine
-            change or to a unit change. This value is only specified in the
-            case this machine is a container, in which case also ContainerType
-            is set.
         """
         """
-        params = client.AddMachineParams(
-            series=series,
-            constraints=constraints,
-            container_type=container_type,
-            parent_id=self.resolve(parent_id),
-        )
-        results = await self.client_facade.AddMachines(params)
-        log.debug('Added new machine %s', results[0].machine)
-        return results[0].machine
+        params = params or {}
+
+        # Normalize keys
+        params = {normalize_key(k): params[k] for k in params.keys()}
+
+        # Fix up values, as necessary.
+        if 'parent_id' in params:
+            params['parent_id'] = self.resolve(params['parent_id'])
+
+        params['constraints'] = parse_constraints(
+            params.get('constraints'))
+        params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+        if params.get('container_type') == 'lxc':
+            log.warning('Juju 2.0 does not support lxc containers. '
+                        'Converting containers to lxd.')
+            params['container_type'] = 'lxd'
+
+        # Submit the request.
+        params = client.AddMachineParams(**params)
+        results = await self.client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine = results.machines[0].machine
+        log.debug('Added new machine %s', machine)
+        return machine
 
     async def addRelation(self, endpoint1, endpoint2):
         """
 
     async def addRelation(self, endpoint1, endpoint2):
         """
@@ -1104,14 +1416,9 @@ class BundleHandler(object):
             parts = endpoints[i].split(':')
             parts[0] = self.resolve(parts[0])
             endpoints[i] = ':'.join(parts)
             parts = endpoints[i].split(':')
             parts[0] = self.resolve(parts[0])
             endpoints[i] = ':'.join(parts)
-        try:
-            await self.app_facade.AddRelation(endpoints)
-            log.debug('Added relation %s <-> %s', *endpoints)
-        except JujuAPIError as e:
-            if 'relation already exists' not in e.message:
-                raise
-            log.debug('Relation %s <-> %s already exists', *endpoints)
-        return None
+
+        log.info('Relating %s <-> %s', *endpoints)
+        return await self.model.add_relation(*endpoints)
 
     async def deploy(self, charm, series, application, options, constraints,
                      storage, endpoint_bindings, resources):
 
     async def deploy(self, charm, series, application, options, constraints,
                      storage, endpoint_bindings, resources):
@@ -1153,14 +1460,16 @@ class BundleHandler(object):
             series=series,
             application=application,
             config=options,
             series=series,
             application=application,
             config=options,
-            constraints=constraints,
+            constraints=parse_constraints(constraints),
             storage=storage,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
         )
         # do the do
             storage=storage,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
         )
         # do the do
-        log.debug('Deploying %s', charm)
+        log.info('Deploying %s', charm)
         await self.app_facade.Deploy([app])
         await self.app_facade.Deploy([app])
+        # ensure the app is in the model for future operations
+        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
         return application
 
     async def addUnit(self, application, to):
@@ -1182,16 +1491,14 @@ class BundleHandler(object):
             # doesn't, so we're not bothering, either
             unit_name = self._units_by_app[application].pop()
             log.debug('Reusing unit %s for %s', unit_name, application)
             # doesn't, so we're not bothering, either
             unit_name = self._units_by_app[application].pop()
             log.debug('Reusing unit %s for %s', unit_name, application)
-            return unit_name
-        log.debug('Adding unit of %s%s',
-                  application,
-                  (' to %s' % placement) if placement else '')
-        result = await self.app_facade.AddUnits(
-            application=application,
-            placement=placement,
-            num_units=1,
+            return self.model.units[unit_name]
+
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
+        return await self.model.applications[application].add_unit(
+            count=1,
+            to=placement,
         )
         )
-        return result.units[0]
 
     async def expose(self, application):
         """
 
     async def expose(self, application):
         """
@@ -1200,9 +1507,8 @@ class BundleHandler(object):
             be exposed.
         """
         application = self.resolve(application)
             be exposed.
         """
         application = self.resolve(application)
-        log.debug('Exposing %s', application)
-        await self.app_facade.Expose(application)
-        return None
+        log.info('Exposing %s', application)
+        return await self.model.applications[application].expose()
 
     async def setAnnotations(self, id_, entity_type, annotations):
         """
 
     async def setAnnotations(self, id_, entity_type, annotations):
         """
@@ -1218,13 +1524,11 @@ class BundleHandler(object):
             Annotations holds the annotations as key/value pairs.
         """
         entity_id = self.resolve(id_)
             Annotations holds the annotations as key/value pairs.
         """
         entity_id = self.resolve(id_)
-        log.debug('Updating annotations of %s', entity_id)
-        ann = client.EntityAnnotations(
-            entity=entity_id,
-            annotations=annotations,
-        )
-        await self.ann_facade.Set([ann])
-        return None
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
 
 
 class CharmStore(object):
 
 
 class CharmStore(object):