from juju.model import Model, ModelObserver
-
async def run_stuff_on_unit(unit):
print('Running command on unit', unit.name)
# unit.run() returns a client.ActionResults instance
- action_results = await unit.run('unit-get public-address')
- action_result = action_results.results[0]
+ stdout, stderr, code = await unit.run('unit-get public-address')
+
+ print('Unit public address is', stdout)
- print('Results from unit', unit.name)
- print(action_result.__dict__)
+ # Inform asyncio that we're done.
+ await unit.model.disconnect()
+ unit.model.loop.stop()
class MyModelObserver(ModelObserver):
async def on_unit_add(self, delta, old, new, model):
loop.create_task(run_stuff_on_unit(new))
- async def on_action_change(self, delta, old, new, model):
- print(delta.data)
-
- action = new
- if action.status == 'completed':
- await action.model.disconnect()
- action.model.loop.stop()
-
async def run():
model = Model()
+import asyncio
import logging
from datetime import datetime
:param str command: The command to run
:param int timeout: Time to wait before command is considered failed
+ Returns a tuple containing the stdout, stderr, and return code
+ from the command.
+
"""
action = client.ActionFacade()
action.connect(self.connection)
log.debug(
'Running `%s` on %s', command, self.name)
- # TODO this should return an Action
- return await action.Run(
+ action_status = asyncio.Queue(loop=self.model.loop)
+ tag = None
+
+ async def wait_for_tag():
+ while tag is None:
+ asyncio.sleep(0.1)
+ return tag
+
+ async def callback(delta, old, new, model):
+ # Wait until we have something to report
+ if not new:
+ return
+
+ # Verify that we have the the right action.
+ tag = await wait_for_tag()
+ if not new.id in tag:
+ return
+
+ # Wait until the action has completed, or errored out.
+ if new.status not in ['completed', 'error']:
+ return
+
+ # Put the action in our queue, so that we can fetch it
+ # with the await below.
+ await action_status.put(new)
+
+ self.model.add_observer(callback, 'action', None)
+
+ res = await action.Run(
[],
command,
[],
timeout,
[self.name],
)
+ tag = res.results[0].action.tag # Set the tag for our waiter above.
+ ret = await action_status.get() # Wait for our callback to fire
+ return (
+ ret.results['Stdout'],
+ ret.results['Stderr'],
+ ret.results['Code']
+ )
def run_action(self, action_name, **params):
"""Run action on this unit.