From b89f2fff0ca59f55a5caed03d15e34884721c88b Mon Sep 17 00:00:00 2001 From: Pete Vander Giessen Date: Wed, 2 Nov 2016 15:10:27 -0400 Subject: [PATCH] Addressed PR comments. Made a more generic _wait method in the model, and wrapped _wait_for_new, and a new public method called wait_for_action around it. unit.run now returns an action object, with .results that can be unpacked. --- examples/unitrun.py | 4 ++-- juju/model.py | 42 ++++++++++++++++++++++++++++++++++++------ juju/unit.py | 36 +----------------------------------- 3 files changed, 39 insertions(+), 43 deletions(-) diff --git a/examples/unitrun.py b/examples/unitrun.py index d283e36..a5b294b 100644 --- a/examples/unitrun.py +++ b/examples/unitrun.py @@ -16,9 +16,9 @@ async def run_stuff_on_unit(unit): print('Running command on unit', unit.name) # unit.run() returns a client.ActionResults instance - stdout, stderr, code = await unit.run('unit-get public-address') + action = await unit.run('unit-get public-address') - print('Unit public address is', stdout) + print("Action results: {}".format(action.results)) # Inform asyncio that we're done. await unit.model.disconnect() diff --git a/juju/model.py b/juju/model.py index 52721a7..b74a6c8 100644 --- a/juju/model.py +++ b/juju/model.py @@ -535,6 +535,30 @@ class Model(object): if o.cares_about(delta): asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + async def _wait(self, entity_type, entity_id, action, predicate=None): + """ + Block the calling routine until a given action has happened to the + given entity + + :param entity_type: The entity's type. + :param entity_id: The entity's id. + :param action: the type of action (e.g., 'add' or 'change') + :param predicate: optional callable that must take as an + argument a delta, and must return a boolean, indicating + whether the delta contains the specific action we're looking + for. For example, you might check to see whether a 'change' + has a 'completed' status. See the _Observer class for details. + + """ + q = asyncio.Queue(loop=self.loop) + + async def callback(delta, old, new, model): + await q.put(delta.get_id()) + + self.add_observer(callback, entity_type, action, entity_id, predicate) + entity_id = await q.get() + return self.state._live_entity_map(entity_type)[entity_id] + async def _wait_for_new(self, entity_type, entity_id, predicate=None): """Wait for a new object to appear in the Model and return it. @@ -543,14 +567,20 @@ class Model(object): This coroutine blocks until the new object appears in the model. """ - entity_added = asyncio.Queue(loop=self.loop) + return await self._wait(entity_type, entity_id, predicate) - async def callback(delta, old, new, model): - await entity_added.put(delta.get_id()) + async def wait_for_action(self, action_id): + """Given an action, wait for it to complete.""" - self.add_observer(callback, entity_type, 'add', entity_id, predicate) - entity_id = await entity_added.get() - return self.state._live_entity_map(entity_type)[entity_id] + if action_id.startswith("action-"): + # if we've been passed action.tag, transform it into the + # id that the api deltas will use. + action_id = action_id[7:] + + def predicate(delta): + return delta.data['status'] in ('completed', 'error') + + return await self._wait('action', action_id, 'change', predicate) def add_machine( self, spec=None, constraints=None, disks=None, series=None, diff --git a/juju/unit.py b/juju/unit.py index 6a853c2..3dbc1e9 100644 --- a/juju/unit.py +++ b/juju/unit.py @@ -119,34 +119,6 @@ class Unit(model.ModelEntity): log.debug( 'Running `%s` on %s', command, self.name) - action_status = asyncio.Queue(loop=self.model.loop) - tag = None - - async def wait_for_tag(): - while tag is None: - asyncio.sleep(0.1) - return tag - - async def callback(delta, old, new, model): - # Wait until we have something to report - if not new: - return - - # Verify that we have the the right action. - tag = await wait_for_tag() - if not new.id in tag: - return - - # Wait until the action has completed, or errored out. - if new.status not in ['completed', 'error']: - return - - # Put the action in our queue, so that we can fetch it - # with the await below. - await action_status.put(new) - - self.model.add_observer(callback, 'action', None) - res = await action.Run( [], command, @@ -154,13 +126,7 @@ class Unit(model.ModelEntity): timeout, [self.name], ) - tag = res.results[0].action.tag # Set the tag for our waiter above. - ret = await action_status.get() # Wait for our callback to fire - return ( - ret.results['Stdout'], - ret.results['Stderr'], - ret.results['Code'] - ) + return await self.model.wait_for_action(res.results[0].action.tag) def run_action(self, action_name, **params): """Run action on this unit. -- 2.17.1