Addressed PR comments.
authorPete Vander Giessen <petevg@gmail.com>
Wed, 2 Nov 2016 19:10:27 +0000 (15:10 -0400)
committerPete Vander Giessen <petevg@gmail.com>
Wed, 2 Nov 2016 19:10:27 +0000 (15:10 -0400)
Made a more generic _wait method in the model, and wrapped
_wait_for_new, and a new public method called wait_for_action around it.

unit.run now returns an action object, with .results that can be unpacked.

examples/unitrun.py
juju/model.py
juju/unit.py

index d283e36..a5b294b 100644 (file)
@@ -16,9 +16,9 @@ async def run_stuff_on_unit(unit):
     print('Running command on unit', unit.name)
 
     # unit.run() returns a client.ActionResults instance
-    stdout, stderr, code = await unit.run('unit-get public-address')
+    action = await unit.run('unit-get public-address')
 
-    print('Unit public address is', stdout)
+    print("Action results: {}".format(action.results))
 
     # Inform asyncio that we're done.
     await unit.model.disconnect()
index 52721a7..b74a6c8 100644 (file)
@@ -535,6 +535,30 @@ class Model(object):
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
 
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add' or 'change')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
+
     async def _wait_for_new(self, entity_type, entity_id, predicate=None):
         """Wait for a new object to appear in the Model and return it.
 
@@ -543,14 +567,20 @@ class Model(object):
         This coroutine blocks until the new object appears in the model.
 
         """
-        entity_added = asyncio.Queue(loop=self.loop)
+        return await self._wait(entity_type, entity_id, predicate)
 
-        async def callback(delta, old, new, model):
-            await entity_added.put(delta.get_id())
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
 
-        self.add_observer(callback, entity_type, 'add', entity_id, predicate)
-        entity_id = await entity_added.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'error')
+
+        return await self._wait('action', action_id, 'change', predicate)
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
index 6a853c2..3dbc1e9 100644 (file)
@@ -119,34 +119,6 @@ class Unit(model.ModelEntity):
         log.debug(
             'Running `%s` on %s', command, self.name)
 
-        action_status = asyncio.Queue(loop=self.model.loop)
-        tag = None
-
-        async def wait_for_tag():
-            while tag is None:
-                asyncio.sleep(0.1)
-            return tag
-
-        async def callback(delta, old, new, model):
-            # Wait until we have something to report
-            if not new:
-                return
-
-            # Verify that we have the the right action.
-            tag = await wait_for_tag()
-            if not new.id in tag:
-                return
-
-            # Wait until the action has completed, or errored out.
-            if new.status not in ['completed', 'error']:
-                return
-
-            # Put the action in our queue, so that we can fetch it
-            # with the await below.
-            await action_status.put(new)
-
-        self.model.add_observer(callback, 'action', None)
-
         res = await action.Run(
             [],
             command,
@@ -154,13 +126,7 @@ class Unit(model.ModelEntity):
             timeout,
             [self.name],
         )
-        tag = res.results[0].action.tag  # Set the tag for our waiter above.
-        ret = await action_status.get()  # Wait for our callback to fire
-        return (
-            ret.results['Stdout'],
-            ret.results['Stderr'],
-            ret.results['Code']
-        )
+        return await self.model.wait_for_action(res.results[0].action.tag)
 
     def run_action(self, action_name, **params):
         """Run action on this unit.