Fixed status to check for for failed messages in unit.run
[osm/N2VC.git] / juju / model.py
index b168987..953aa8f 100644 (file)
@@ -6,6 +6,7 @@ import weakref
 from concurrent.futures import CancelledError
 from functools import partial
 
+import yaml
 from theblues import charmstore
 
 from .client import client
@@ -534,6 +535,30 @@ class Model(object):
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
 
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add' or 'change')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
+
     async def _wait_for_new(self, entity_type, entity_id, predicate=None):
         """Wait for a new object to appear in the Model and return it.
 
@@ -542,14 +567,20 @@ class Model(object):
         This coroutine blocks until the new object appears in the model.
 
         """
-        entity_added = asyncio.Queue(loop=self.loop)
+        return await self._wait(entity_type, entity_id, 'add', predicate)
 
-        async def callback(delta, old, new, model):
-            await entity_added.put(delta.get_id())
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
 
-        self.add_observer(callback, entity_type, 'add', entity_id, predicate)
-        entity_id = await entity_added.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'failed')
+
+        return await self._wait('action', action_id, 'change', predicate)
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -758,9 +789,6 @@ class Model(object):
             - series is required; how do we pick a default?
 
         """
-        if constraints:
-            constraints = client.Value(**constraints)
-
         if to:
             placement = [
                 client.Placement(**p) for p in to
@@ -785,6 +813,18 @@ class Model(object):
             handler = BundleHandler(self)
             await handler.fetch_plan(entity_id)
             await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self.model._wait_for_new('application', app_name))
+                    for app_name in pending_apps
+                ])
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
         else:
             log.debug(
                 'Deploying %s', entity_id)
@@ -813,6 +853,21 @@ class Model(object):
         """
         pass
 
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.Destroy(self.name)
+    destroy_units = destroy_unit
+
     def get_backup(self, archive_id):
         """Download a backup archive file.
 
@@ -1148,10 +1203,11 @@ class BundleHandler(object):
         self.ann_facade.connect(model.connection)
 
     async def fetch_plan(self, entity_id):
-        yaml = await self.charmstore.files(entity_id,
-                                           filename='bundle.yaml',
-                                           read_file=True)
-        self.plan = await self.client_facade.GetBundleChanges(yaml)
+        bundle_yaml = await self.charmstore.files(entity_id,
+                                                  filename='bundle.yaml',
+                                                  read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1159,6 +1215,10 @@ class BundleHandler(object):
             result = await method(*step.args)
             self.references[step.id_] = result
 
+    @property
+    def applications(self):
+        return list(self.bundle['services'].keys())
+
     def resolve(self, reference):
         if reference and reference.startswith('$'):
             reference = self.references[reference[1:]]
@@ -1224,6 +1284,7 @@ class BundleHandler(object):
             parts[0] = self.resolve(parts[0])
             endpoints[i] = ':'.join(parts)
 
+        log.info('Relating %s <-> %s', *endpoints)
         return await self.model.add_relation(*endpoints)
 
     async def deploy(self, charm, series, application, options, constraints,
@@ -1272,7 +1333,7 @@ class BundleHandler(object):
             resources=resources,
         )
         # do the do
-        log.debug('Deploying %s', charm)
+        log.info('Deploying %s', charm)
         await self.app_facade.Deploy([app])
         return application
 
@@ -1297,6 +1358,8 @@ class BundleHandler(object):
             log.debug('Reusing unit %s for %s', unit_name, application)
             return self.model.units[unit_name]
 
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
         return await self.model.applications[application].add_unit(
             count=1,
             to=placement,
@@ -1309,6 +1372,7 @@ class BundleHandler(object):
             be exposed.
         """
         application = self.resolve(application)
+        log.info('Exposing %s', application)
         return await self.model.applications[application].expose()
 
     async def setAnnotations(self, id_, entity_type, annotations):
@@ -1325,13 +1389,11 @@ class BundleHandler(object):
             Annotations holds the annotations as key/value pairs.
         """
         entity_id = self.resolve(id_)
-        log.debug('Updating annotations of %s', entity_id)
-        ann = client.EntityAnnotations(
-            entity=entity_id,
-            annotations=annotations,
-        )
-        await self.ann_facade.Set([ann])
-        return None
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
 
 
 class CharmStore(object):