From: Pete Vander Giessen Date: Tue, 1 Nov 2016 21:28:21 +0000 (-0400) Subject: Added observer + callback for unit.run. X-Git-Tag: 0.1.0~57^2~2 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=commitdiff_plain;h=723e8549e181796e84b9a436c7f27cbbc7a39552;hp=bccd5c73a2f02a768612c7aec318c910fceda299 Added observer + callback for unit.run. Allows us to wait until an action has actually been completed before returning a result. --- diff --git a/examples/unitrun.py b/examples/unitrun.py index 0f8b556..d283e36 100644 --- a/examples/unitrun.py +++ b/examples/unitrun.py @@ -12,30 +12,23 @@ import logging from juju.model import Model, ModelObserver - async def run_stuff_on_unit(unit): print('Running command on unit', unit.name) # unit.run() returns a client.ActionResults instance - action_results = await unit.run('unit-get public-address') - action_result = action_results.results[0] + stdout, stderr, code = await unit.run('unit-get public-address') + + print('Unit public address is', stdout) - print('Results from unit', unit.name) - print(action_result.__dict__) + # Inform asyncio that we're done. + await unit.model.disconnect() + unit.model.loop.stop() class MyModelObserver(ModelObserver): async def on_unit_add(self, delta, old, new, model): loop.create_task(run_stuff_on_unit(new)) - async def on_action_change(self, delta, old, new, model): - print(delta.data) - - action = new - if action.status == 'completed': - await action.model.disconnect() - action.model.loop.stop() - async def run(): model = Model() diff --git a/juju/unit.py b/juju/unit.py index d0bbd32..6a853c2 100644 --- a/juju/unit.py +++ b/juju/unit.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime @@ -108,6 +109,9 @@ class Unit(model.ModelEntity): :param str command: The command to run :param int timeout: Time to wait before command is considered failed + Returns a tuple containing the stdout, stderr, and return code + from the command. + """ action = client.ActionFacade() action.connect(self.connection) @@ -115,14 +119,48 @@ class Unit(model.ModelEntity): log.debug( 'Running `%s` on %s', command, self.name) - # TODO this should return an Action - return await action.Run( + action_status = asyncio.Queue(loop=self.model.loop) + tag = None + + async def wait_for_tag(): + while tag is None: + asyncio.sleep(0.1) + return tag + + async def callback(delta, old, new, model): + # Wait until we have something to report + if not new: + return + + # Verify that we have the the right action. + tag = await wait_for_tag() + if not new.id in tag: + return + + # Wait until the action has completed, or errored out. + if new.status not in ['completed', 'error']: + return + + # Put the action in our queue, so that we can fetch it + # with the await below. + await action_status.put(new) + + self.model.add_observer(callback, 'action', None) + + res = await action.Run( [], command, [], timeout, [self.name], ) + tag = res.results[0].action.tag # Set the tag for our waiter above. + ret = await action_status.get() # Wait for our callback to fire + return ( + ret.results['Stdout'], + ret.results['Stderr'], + ret.results['Code'] + ) def run_action(self, action_name, **params): """Run action on this unit.