Addressed PR comments.
[osm/N2VC.git] / juju / model.py
index 04f3437..b74a6c8 100644 (file)
 import asyncio
+import collections
 import logging
+import re
+import weakref
 from concurrent.futures import CancelledError
+from functools import partial
+
+import yaml
+from theblues import charmstore
 
 from .client import client
 from .client import watcher
 from .client import connection
 from .delta import get_entity_delta
+from .delta import get_entity_class
+from .exceptions import DeadEntityException
+from .errors import JujuAPIError
 
 log = logging.getLogger(__name__)
 
 
+class _Observer(object):
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id, predicate):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        self.predicate = predicate
+        if self.entity_id:
+            self.entity_id = str(self.entity_id)
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, delta):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a this delta.
+
+        """
+        if (self.entity_id and delta.get_id() and
+                not re.match(self.entity_id, str(delta.get_id()))):
+            return False
+
+        if self.entity_type and self.entity_type != delta.entity:
+            return False
+
+        if self.action and self.action != delta.type:
+            return False
+
+        if self.predicate and not self.predicate(delta):
+            return False
+
+        return True
+
+
 class ModelObserver(object):
-    def __call__(self, delta, old, new, model):
-        if old is None and new is not None:
-            type_ = 'add'
-        else:
-            type_ = delta.type
-        handler_name = 'on_{}_{}'.format(delta.entity, type_)
+    async def __call__(self, delta, old, new, model):
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
-        log.debug(
-            'Model changed: %s %s %s',
-            delta.entity, delta.type, delta.get_id())
-        method(delta, old, new, model)
+        await method(delta, old, new, model)
 
-    def on_change(self, delta, old, new, model):
+    async def on_change(self, delta, old, new, model):
         pass
 
 
+class ModelState(object):
+    """Holds the state of the model, including the delta history of all
+    entities in the model.
+
+    """
+    def __init__(self, model):
+        self.model = model
+        self.state = dict()
+
+    def _live_entity_map(self, entity_type):
+        """Return an id:Entity map of all the living entities of
+        type ``entity_type``.
+
+        """
+        return {
+            entity_id: self.get_entity(entity_type, entity_id)
+            for entity_id, history in self.state.get(entity_type, {}).items()
+            if history[-1] is not None
+        }
+
+    @property
+    def applications(self):
+        """Return a map of application-name:Application for all applications
+        currently in the model.
+
+        """
+        return self._live_entity_map('application')
+
+    @property
+    def machines(self):
+        """Return a map of machine-id:Machine for all machines currently in
+        the model.
+
+        """
+        return self._live_entity_map('machine')
+
+    @property
+    def units(self):
+        """Return a map of unit-id:Unit for all units currently in
+        the model.
+
+        """
+        return self._live_entity_map('unit')
+
+    def entity_history(self, entity_type, entity_id):
+        """Return the history deque for an entity.
+
+        """
+        return self.state[entity_type][entity_id]
+
+    def entity_data(self, entity_type, entity_id, history_index):
+        """Return the data dict for an entity at a specific index of its
+        history.
+
+        """
+        return self.entity_history(entity_type, entity_id)[history_index]
+
+    def apply_delta(self, delta):
+        """Apply delta to our state and return a copy of the
+        affected object as it was before and after the update, e.g.:
+
+            old_obj, new_obj = self.apply_delta(delta)
+
+        old_obj may be None if the delta is for the creation of a new object,
+        e.g. a new application or unit is deployed.
+
+        new_obj will never be None, but may be dead (new_obj.dead == True)
+        if the object was deleted as a result of the delta being applied.
+
+        """
+        history = (
+            self.state
+            .setdefault(delta.entity, {})
+            .setdefault(delta.get_id(), collections.deque())
+        )
+
+        history.append(delta.data)
+        if delta.type == 'remove':
+            history.append(None)
+
+        entity = self.get_entity(delta.entity, delta.get_id())
+        return entity.previous(), entity
+
+    def get_entity(
+            self, entity_type, entity_id, history_index=-1, connected=True):
+        """Return an object instance representing the entity created or
+        updated by ``delta``
+
+        """
+        """
+        log.debug(
+            'Getting %s:%s at index %s',
+            entity_type, entity_id, history_index)
+        """
+
+        if history_index < 0 and history_index != -1:
+            history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
+
+        try:
+            self.entity_data(entity_type, entity_id, history_index)
+        except IndexError:
+            return None
+
+        entity_class = get_entity_class(entity_type)
+        return entity_class(
+            entity_id, self.model, history_index=history_index,
+            connected=connected)
+
+
 class ModelEntity(object):
     """An object in the Model tree"""
 
-    def __init__(self, data, model):
+    def __init__(self, entity_id, model, history_index=-1, connected=True):
         """Initialize a new entity
 
-        :param data: dict of data from a watcher delta
+        :param entity_id str: The unique id of the object in the model
         :param model: The model instance in whose object tree this
             entity resides
+        :history_index int: The index of this object's state in the model's
+            history deque for this entity
+        :connected bool: Flag indicating whether this object gets live updates
+            from the model.
 
         """
-        self.data = data
+        self.entity_id = entity_id
         self.model = model
+        self._history_index = history_index
+        self.connected = connected
         self.connection = model.connection
 
     def __getattr__(self, name):
+        """Fetch object attributes from the underlying data dict held in the
+        model.
+
+        """
+        if self.data is None:
+            raise DeadEntityException(
+                "Entity {}:{} is dead - its attributes can no longer be "
+                "accessed. Use the .previous() method on this object to get "
+                "a copy of the object at its previous state.".format(
+                    self.entity_type, self.entity_id))
         return self.data[name]
 
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
+
+    @property
+    def entity_type(self):
+        """A string identifying the entity type of this object, e.g.
+        'application' or 'unit', etc.
+
+        """
+        return self.__class__.__name__.lower()
+
+    @property
+    def current(self):
+        """Return True if this object represents the current state of the
+        entity in the underlying model.
+
+        This will be True except when the object represents an entity at a
+        non-latest state in history, e.g. if the object was obtained by calling
+        .previous() on another object.
+
+        """
+        return self._history_index == -1
+
+    @property
+    def dead(self):
+        """Returns True if this entity no longer exists in the underlying
+        model.
+
+        """
+        return (
+            self.data is None or
+            self.model.state.entity_data(
+                self.entity_type, self.entity_id, -1) is None
+        )
+
+    @property
+    def alive(self):
+        """Returns True if this entity still exists in the underlying
+        model.
+
+        """
+        return not self.dead
+
+    @property
+    def data(self):
+        """The data dictionary for this entity.
+
+        """
+        return self.model.state.entity_data(
+            self.entity_type, self.entity_id, self._history_index)
+
+    def previous(self):
+        """Return a copy of this object as was at its previous state in
+        history.
+
+        Returns None if this object is new (and therefore has no history).
+
+        The returned object is always "disconnected", i.e. does not receive
+        live updates.
+
+        """
+        return self.model.state.get_entity(
+            self.entity_type, self.entity_id, self._history_index - 1,
+            connected=False)
+
+    def next(self):
+        """Return a copy of this object at its next state in
+        history.
+
+        Returns None if this object is already the latest.
+
+        The returned object is "disconnected", i.e. does not receive
+        live updates, unless it is current (latest).
+
+        """
+        if self._history_index == -1:
+            return None
+
+        new_index = self._history_index + 1
+        connected = (
+            new_index == len(self.model.state.entity_history(
+                self.entity_type, self.entity_id)) - 1
+        )
+        return self.model.state.get_entity(
+            self.entity_type, self.entity_id, self._history_index - 1,
+            connected=connected)
+
+    def latest(self):
+        """Return a copy of this object at its current state in the model.
+
+        Returns self if this object is already the latest.
+
+        The returned object is always "connected", i.e. receives
+        live updates from the model.
+
+        """
+        if self._history_index == -1:
+            return self
+
+        return self.model.state.get_entity(self.entity_type, self.entity_id)
+
 
 class Model(object):
     def __init__(self, loop=None):
@@ -55,23 +344,30 @@ class Model(object):
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
-        self.observers = set()
-        self.state = dict()
+        self.observers = weakref.WeakValueDictionary()
+        self.state = ModelState(self)
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
+        self._charmstore = CharmStore(self.loop)
 
     async def connect_current(self):
+        """Connect to the current Juju model.
+
+        """
         self.connection = await connection.Connection.connect_current()
         self._watch()
         await self._watch_received.wait()
 
     async def disconnect(self):
+        """Shut down the watcher task and close websockets.
+
+        """
         self._stop_watching()
         if self.connection and self.connection.is_open:
             await self._watch_shutdown.wait()
             log.debug('Closing model connection')
-            await asyncio.wait_for(self.connection.close(), None)
+            await self.connection.close()
             self.connection = None
 
     def all_units_idle(self):
@@ -85,35 +381,64 @@ class Model(object):
         return True
 
     async def reset(self, force=False):
+        """Reset the model to a clean state.
+
+        :param bool force: Force-terminate machines.
+
+        This returns only after the model has reached a clean state. "Clean"
+        means no applications or machines exist in the model.
+
+        """
+        log.debug('Resetting model')
         for app in self.applications.values():
             await app.destroy()
         for machine in self.machines.values():
             await machine.destroy(force=force)
+        await self.block_until(
+            lambda: len(self.machines) == 0
+        )
 
-    async def block_until(self, func):
+    async def block_until(self, *conditions, timeout=None):
+        """Return only after all conditions are true.
+
+        """
         async def _block():
-            while not func():
-                await asyncio.sleep(.1)
-        await asyncio.wait_for(_block(), None)
+            while not all(c() for c in conditions):
+                await asyncio.sleep(0)
+        await asyncio.wait_for(_block(), timeout)
 
     @property
     def applications(self):
-        return self.state.get('application', {})
+        """Return a map of application-name:Application for all applications
+        currently in the model.
+
+        """
+        return self.state.applications
 
     @property
     def machines(self):
-        return self.state.get('machine', {})
+        """Return a map of machine-id:Machine for all machines currently in
+        the model.
+
+        """
+        return self.state.machines
 
     @property
     def units(self):
-        return self.state.get('unit', {})
+        """Return a map of unit-id:Unit for all units currently in
+        the model.
 
-    def add_observer(self, callable_):
+        """
+        return self.state.units
+
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None,
+            predicate=None):
         """Register an "on-model-change" callback
 
-        Once a watch is started (Model.watch() is called), ``callable_``
+        Once the model is connected, ``callable_``
         will be called each time the model changes. callable_ should
-        accept the following positional arguments:
+        be Awaitable and accept the following positional arguments:
 
             delta - An instance of :class:`juju.delta.EntityDelta`
                 containing the raw delta data recv'd from the Juju
@@ -130,8 +455,20 @@ class Model(object):
 
             model - The :class:`Model` itself.
 
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or id_ filter criteria, e.g.:
+
+            add_observer(
+                myfunc, entity_type='application', action='add', id_='ubuntu')
+
+        For more complex filtering conditions, pass a predicate function. It
+        will be called with a delta as its only argument. If the predicate
+        function returns True, the callable_ will be called.
+
         """
-        self.observers.add(callable_)
+        observer = _Observer(
+            callable_, entity_type, action, entity_id, predicate)
+        self.observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -149,12 +486,19 @@ class Model(object):
                     results = await allwatcher.Next()
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
-                        old_obj, new_obj = self._apply_delta(delta)
-                        self._notify_observers(delta, old_obj, new_obj)
+                        old_obj, new_obj = self.state.apply_delta(delta)
+                        # XXX: Might not want to shield at this level
+                        # We are shielding because when the watcher is
+                        # canceled (on disconnect()), we don't want all of
+                        # its children (every observer callback) to be
+                        # canceled with it. So we shield them. But this means
+                        # they can *never* be canceled.
+                        await asyncio.shield(
+                            self._notify_observers(delta, old_obj, new_obj))
                     self._watch_received.set()
             except CancelledError:
                 log.debug('Closing watcher connection')
-                await asyncio.wait_for(self._watch_conn.close(), None)
+                await self._watch_conn.close()
                 self._watch_shutdown.set()
                 self._watch_conn = None
 
@@ -169,54 +513,74 @@ class Model(object):
         if self._watcher_task:
             self._watcher_task.cancel()
 
-    def _apply_delta(self, delta):
-        """Apply delta to our model state and return the a copy of the
-        affected object as it was before and after the update, e.g.:
+    async def _notify_observers(self, delta, old_obj, new_obj):
+        """Call observing callbacks, notifying them of a change in model state
 
-            old_obj, new_obj = self._apply_delta(delta)
+        :param delta: The raw change from the watcher
+            (:class:`juju.client.overrides.Delta`)
+        :param old_obj: The object in the model that this delta updates.
+            May be None.
+        :param new_obj: The object in the model that is created or updated
+            by applying this delta.
 
-        old_obj may be None if the delta is for the creation of a new object,
-        e.g. a new application or unit is deployed.
+        """
+        if new_obj and not old_obj:
+            delta.type = 'add'
+
+        log.debug(
+            'Model changed: %s %s %s',
+            delta.entity, delta.type, delta.get_id())
 
-        new_obj may be None if no object was created or updated, or if an
-        object was deleted as a result of the delta being applied.
+        for o in self.observers:
+            if o.cares_about(delta):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
 
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
-        old_obj, new_obj = None, None
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add' or 'change')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
 
-        if (delta.entity in self.state and
-                delta.get_id() in self.state[delta.entity]):
-            old_obj = self.state[delta.entity][delta.get_id()]
-            if delta.type == 'remove':
-                del self.state[delta.entity][delta.get_id()]
-                return old_obj, new_obj
+        """
+        q = asyncio.Queue(loop=self.loop)
 
-        new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = (
-            self._create_model_entity(delta))
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
 
-        return old_obj, new_obj
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
 
-    def _create_model_entity(self, delta):
-        """Return an object instance representing the entity created or
-        updated by ``delta``
+    async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+        This coroutine blocks until the new object appears in the model.
 
         """
-        entity_class = delta.get_entity_class()
-        return entity_class(delta.data, self)
+        return await self._wait(entity_type, entity_id, predicate)
 
-    def _notify_observers(self, delta, old_obj, new_obj):
-        """Call observing callbacks, notifying them of a change in model state
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
 
-        :param delta: The raw change from the watcher
-            (:class:`juju.client.overrides.Delta`)
-        :param old_obj: The object in the model that this delta updates.
-            May be None.
-        :param new_obj: The object in the model that is created or updated
-            by applying this delta.
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
 
-        """
-        for o in self.observers:
-            o(delta, old_obj, new_obj, self)
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'error')
+
+        return await self._wait('action', action_id, 'change', predicate)
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -261,7 +625,24 @@ class Model(object):
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
-        return await app_facade.AddRelation([relation1, relation2])
+        try:
+            result = await app_facade.AddRelation([relation1, relation2])
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            log.debug(
+                'Relation %s <-> %s already exists', relation1, relation2)
+            # TODO: if relation already exists we should return the
+            # Relation ModelEntity here
+            return None
+
+        def predicate(delta):
+            endpoints = {}
+            for endpoint in delta.data['endpoints']:
+                endpoints[endpoint['application-name']] = endpoint['relation']
+            return endpoints == result.endpoints
+
+        return await self._wait_for_new('relation', None, predicate)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -403,16 +784,11 @@ class Model(object):
 
         TODO::
 
-            - entity_url must have a revision; look up latest automatically if
-              not provided by caller
             - service_name is required; fill this in automatically if not
               provided by caller
             - series is required; how do we pick a default?
 
         """
-        if constraints:
-            constraints = client.Value(**constraints)
-
         if to:
             placement = [
                 client.Placement(**p) for p in to
@@ -426,30 +802,50 @@ class Model(object):
                 for k, v in storage.items()
             }
 
+        entity_id = await self.charmstore.entityId(entity_url)
+
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        log.debug(
-            'Deploying %s', entity_url)
-
-        await client_facade.AddCharm(channel, entity_url)
-        app = client.ApplicationDeploy(
-            application=service_name,
-            channel=channel,
-            charm_url=entity_url,
-            config=config,
-            constraints=constraints,
-            endpoint_bindings=bind,
-            num_units=num_units,
-            placement=placement,
-            resources=resources,
-            series=series,
-            storage=storage,
-        )
-
-        return await app_facade.Deploy([app])
+        if 'bundle/' in entity_id:
+            handler = BundleHandler(self)
+            await handler.fetch_plan(entity_id)
+            await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self.model._wait_for_new('application', app_name))
+                    for app_name in pending_apps
+                ])
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
+        else:
+            log.debug(
+                'Deploying %s', entity_id)
+
+            await client_facade.AddCharm(channel, entity_id)
+            app = client.ApplicationDeploy(
+                application=service_name,
+                channel=channel,
+                charm_url=entity_id,
+                config=config,
+                constraints=constraints,
+                endpoint_bindings=bind,
+                num_units=num_units,
+                placement=placement,
+                resources=resources,
+                series=series,
+                storage=storage,
+            )
+
+            await app_facade.Deploy([app])
+            return await self._wait_for_new('application', service_name)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
@@ -457,6 +853,21 @@ class Model(object):
         """
         pass
 
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.Destroy(self.name)
+    destroy_units = destroy_unit
+
     def get_backup(self, archive_id):
         """Download a backup archive file.
 
@@ -764,3 +1175,248 @@ class Model(object):
 
         """
         pass
+
+    @property
+    def charmstore(self):
+        return self._charmstore
+
+
+class BundleHandler(object):
+    """
+    Handle bundles by using the API to translate bundle YAML into a plan of
+    steps and then dispatching each of those using the API.
+    """
+    def __init__(self, model):
+        self.model = model
+        self.charmstore = model.charmstore
+        self.plan = []
+        self.references = {}
+        self._units_by_app = {}
+        for unit_name, unit in model.units.items():
+            app_units = self._units_by_app.setdefault(unit.application, [])
+            app_units.append(unit_name)
+        self.client_facade = client.ClientFacade()
+        self.client_facade.connect(model.connection)
+        self.app_facade = client.ApplicationFacade()
+        self.app_facade.connect(model.connection)
+        self.ann_facade = client.AnnotationsFacade()
+        self.ann_facade.connect(model.connection)
+
+    async def fetch_plan(self, entity_id):
+        bundle_yaml = await self.charmstore.files(entity_id,
+                                                  filename='bundle.yaml',
+                                                  read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+
+    async def execute_plan(self):
+        for step in self.plan.changes:
+            method = getattr(self, step.method)
+            result = await method(*step.args)
+            self.references[step.id_] = result
+
+    @property
+    def applications(self):
+        return list(self.bundle['services'].keys())
+
+    def resolve(self, reference):
+        if reference and reference.startswith('$'):
+            reference = self.references[reference[1:]]
+        return reference
+
+    async def addCharm(self, charm, series):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be added.
+
+        :param series string:
+            Series holds the series of the charm to be added
+            if the charm default is not sufficient.
+        """
+        entity_id = await self.charmstore.entityId(charm)
+        log.debug('Adding %s', entity_id)
+        await self.client_facade.AddCharm(None, entity_id)
+        return entity_id
+
+    async def addMachines(self, series, constraints, container_type,
+                          parent_id):
+        """
+        :param series string:
+            Series holds the optional machine OS series.
+
+        :param constraints string:
+            Constraints holds the optional machine constraints.
+
+        :param Container_type string:
+            ContainerType optionally holds the type of the container (for
+            instance ""lxc" or kvm"). It is not specified for top level
+            machines.
+
+        :param parent_id string:
+            ParentId optionally holds a placeholder pointing to another machine
+            change or to a unit change. This value is only specified in the
+            case this machine is a container, in which case also ContainerType
+            is set.
+        """
+        params = client.AddMachineParams(
+            series=series,
+            constraints=constraints,
+            container_type=container_type,
+            parent_id=self.resolve(parent_id),
+        )
+        results = await self.client_facade.AddMachines(params)
+        log.debug('Added new machine %s', results[0].machine)
+        return results[0].machine
+
+    async def addRelation(self, endpoint1, endpoint2):
+        """
+        :param endpoint1 string:
+        :param endpoint2 string:
+            Endpoint1 and Endpoint2 hold relation endpoints in the
+            "application:interface" form, where the application is always a
+            placeholder pointing to an application change, and the interface is
+            optional. Examples are "$deploy-42:web" or just "$deploy-42".
+        """
+        endpoints = [endpoint1, endpoint2]
+        # resolve indirect references
+        for i in range(len(endpoints)):
+            parts = endpoints[i].split(':')
+            parts[0] = self.resolve(parts[0])
+            endpoints[i] = ':'.join(parts)
+
+        log.info('Relating %s <-> %s', *endpoints)
+        return await self.model.add_relation(*endpoints)
+
+    async def deploy(self, charm, series, application, options, constraints,
+                     storage, endpoint_bindings, resources):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be used to deploy this
+            application.
+
+        :param series string:
+            Series holds the series of the application to be deployed
+            if the charm default is not sufficient.
+
+        :param application string:
+            Application holds the application name.
+
+        :param options map[string]interface{}:
+            Options holds application options.
+
+        :param constraints string:
+            Constraints holds the optional application constraints.
+
+        :param storage map[string]string:
+            Storage holds the optional storage constraints.
+
+        :param endpoint_bindings map[string]string:
+            EndpointBindings holds the optional endpoint bindings
+
+        :param resources map[string]int:
+            Resources identifies the revision to use for each resource
+            of the application's charm.
+        """
+        # resolve indirect references
+        charm = self.resolve(charm)
+        # stringify all config values for API
+        options = {k: str(v) for k, v in options.items()}
+        # build param object
+        app = client.ApplicationDeploy(
+            charm_url=charm,
+            series=series,
+            application=application,
+            config=options,
+            constraints=constraints,
+            storage=storage,
+            endpoint_bindings=endpoint_bindings,
+            resources=resources,
+        )
+        # do the do
+        log.info('Deploying %s', charm)
+        await self.app_facade.Deploy([app])
+        return application
+
+    async def addUnit(self, application, to):
+        """
+        :param application string:
+            Application holds the application placeholder name for which a unit
+            is added.
+
+        :param to string:
+            To holds the optional location where to add the unit, as a
+            placeholder pointing to another unit change or to a machine change.
+        """
+        application = self.resolve(application)
+        placement = self.resolve(to)
+        if self._units_by_app.get(application):
+            # enough units for this application already exist;
+            # claim one, and carry on
+            # NB: this should probably honor placement, but the juju client
+            # doesn't, so we're not bothering, either
+            unit_name = self._units_by_app[application].pop()
+            log.debug('Reusing unit %s for %s', unit_name, application)
+            return self.model.units[unit_name]
+
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
+        return await self.model.applications[application].add_unit(
+            count=1,
+            to=placement,
+        )
+
+    async def expose(self, application):
+        """
+        :param application string:
+            Application holds the placeholder name of the application that must
+            be exposed.
+        """
+        application = self.resolve(application)
+        log.info('Exposing %s', application)
+        return await self.model.applications[application].expose()
+
+    async def setAnnotations(self, id_, entity_type, annotations):
+        """
+        :param id_ string:
+            Id is the placeholder for the application or machine change
+            corresponding to the entity to be annotated.
+
+        :param entity_type EntityType:
+            EntityType holds the type of the entity, "application" or
+            "machine".
+
+        :param annotations map[string]string:
+            Annotations holds the annotations as key/value pairs.
+        """
+        entity_id = self.resolve(id_)
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
+
+
+class CharmStore(object):
+    """
+    Async wrapper around theblues.charmstore.CharmStore
+    """
+    def __init__(self, loop):
+        self.loop = loop
+        self._cs = charmstore.CharmStore()
+
+    def __getattr__(self, name):
+        """
+        Wrap method calls in coroutines that use run_in_executor to make them
+        async.
+        """
+        attr = getattr(self._cs, name)
+        if not callable(attr):
+            wrapper = partial(getattr, self._cs, name)
+            setattr(self, name, wrapper)
+        else:
+            async def coro(*args, **kwargs):
+                method = partial(attr, *args, **kwargs)
+                return await self.loop.run_in_executor(None, method)
+            setattr(self, name, coro)
+            wrapper = coro
+        return wrapper