if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
async def _wait_for_new(self, entity_type, entity_id, predicate=None):
"""Wait for a new object to appear in the Model and return it.
This coroutine blocks until the new object appears in the model.
"""
- entity_added = asyncio.Queue(loop=self.loop)
+ return await self._wait(entity_type, entity_id, predicate)
- async def callback(delta, old, new, model):
- await entity_added.put(delta.get_id())
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
- self.add_observer(callback, entity_type, 'add', entity_id, predicate)
- entity_id = await entity_added.get()
- return self.state._live_entity_map(entity_type)[entity_id]
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'error')
+
+ return await self._wait('action', action_id, 'change', predicate)
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
log.debug(
'Running `%s` on %s', command, self.name)
- action_status = asyncio.Queue(loop=self.model.loop)
- tag = None
-
- async def wait_for_tag():
- while tag is None:
- asyncio.sleep(0.1)
- return tag
-
- async def callback(delta, old, new, model):
- # Wait until we have something to report
- if not new:
- return
-
- # Verify that we have the the right action.
- tag = await wait_for_tag()
- if not new.id in tag:
- return
-
- # Wait until the action has completed, or errored out.
- if new.status not in ['completed', 'error']:
- return
-
- # Put the action in our queue, so that we can fetch it
- # with the await below.
- await action_status.put(new)
-
- self.model.add_observer(callback, 'action', None)
-
res = await action.Run(
[],
command,
timeout,
[self.name],
)
- tag = res.results[0].action.tag # Set the tag for our waiter above.
- ret = await action_status.get() # Wait for our callback to fire
- return (
- ret.results['Stdout'],
- ret.results['Stderr'],
- ret.results['Code']
- )
+ return await self.model.wait_for_action(res.results[0].action.tag)
def run_action(self, action_name, **params):
"""Run action on this unit.