Added observer + callback for unit.run.
authorPete Vander Giessen <petevg@gmail.com>
Tue, 1 Nov 2016 21:28:21 +0000 (17:28 -0400)
committerPete Vander Giessen <petevg@gmail.com>
Wed, 2 Nov 2016 15:06:43 +0000 (11:06 -0400)
Allows us to wait until an action has actually been completed before
returning a result.

examples/unitrun.py
juju/unit.py

index 0f8b556..d283e36 100644 (file)
@@ -12,30 +12,23 @@ import logging
 
 from juju.model import Model, ModelObserver
 
-
 async def run_stuff_on_unit(unit):
     print('Running command on unit', unit.name)
 
     # unit.run() returns a client.ActionResults instance
-    action_results = await unit.run('unit-get public-address')
-    action_result = action_results.results[0]
+    stdout, stderr, code = await unit.run('unit-get public-address')
+
+    print('Unit public address is', stdout)
 
-    print('Results from unit', unit.name)
-    print(action_result.__dict__)
+    # Inform asyncio that we're done.
+    await unit.model.disconnect()
+    unit.model.loop.stop()
 
 
 class MyModelObserver(ModelObserver):
     async def on_unit_add(self, delta, old, new, model):
         loop.create_task(run_stuff_on_unit(new))
 
-    async def on_action_change(self, delta, old, new, model):
-        print(delta.data)
-
-        action = new
-        if action.status == 'completed':
-            await action.model.disconnect()
-            action.model.loop.stop()
-
 
 async def run():
     model = Model()
index d0bbd32..6a853c2 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 from datetime import datetime
 
@@ -108,6 +109,9 @@ class Unit(model.ModelEntity):
         :param str command: The command to run
         :param int timeout: Time to wait before command is considered failed
 
+        Returns a tuple containing the stdout, stderr, and return code
+        from the command.
+
         """
         action = client.ActionFacade()
         action.connect(self.connection)
@@ -115,14 +119,48 @@ class Unit(model.ModelEntity):
         log.debug(
             'Running `%s` on %s', command, self.name)
 
-        # TODO this should return an Action
-        return await action.Run(
+        action_status = asyncio.Queue(loop=self.model.loop)
+        tag = None
+
+        async def wait_for_tag():
+            while tag is None:
+                asyncio.sleep(0.1)
+            return tag
+
+        async def callback(delta, old, new, model):
+            # Wait until we have something to report
+            if not new:
+                return
+
+            # Verify that we have the the right action.
+            tag = await wait_for_tag()
+            if not new.id in tag:
+                return
+
+            # Wait until the action has completed, or errored out.
+            if new.status not in ['completed', 'error']:
+                return
+
+            # Put the action in our queue, so that we can fetch it
+            # with the await below.
+            await action_status.put(new)
+
+        self.model.add_observer(callback, 'action', None)
+
+        res = await action.Run(
             [],
             command,
             [],
             timeout,
             [self.name],
         )
+        tag = res.results[0].action.tag  # Set the tag for our waiter above.
+        ret = await action_status.get()  # Wait for our callback to fire
+        return (
+            ret.results['Stdout'],
+            ret.results['Stderr'],
+            ret.results['Code']
+        )
 
     def run_action(self, action_name, **params):
         """Run action on this unit.