This example:
1. Connects to the current model
-2. Resets it
-3. Deploy a charm and waits until it's idle
-4. Destroys the unit and application
+2. Deploy a charm and waits until it reports itself active
+3. Destroys the unit and application
"""
-import asyncio
-import logging
+from juju import loop
+from juju.model import Model
-from juju.model import Model, ModelObserver
-
-class MyModelObserver(ModelObserver):
- async def on_unit_add(self, delta, old, new, model):
- logging.info(
- 'New unit added: %s', new.name)
-
- async def on_change(self, delta, old, new, model):
- for unit in model.units.values():
- unit_status = unit.data['agent-status']['current']
- logging.info(
- 'Unit %s status: %s', unit.name, unit_status)
- if unit_status == 'idle':
- logging.info(
- 'Destroying unit %s', unit.name)
- loop.create_task(unit.destroy())
-
- async def on_unit_remove(self, delta, old, new, model):
- app_name = old.application
- app = model.applications[app_name]
- if not app.units:
- logging.info(
- 'Destroying application %s', app.name)
- loop.create_task(app.destroy())
- await model.block_until(
- lambda: len(model.applications) == 0
- )
- await model.disconnect()
- model.loop.stop()
-
-
-async def run():
+async def main():
model = Model()
+ print('Connecting to model')
await model.connect_current()
- await model.reset(force=True)
- model.add_observer(MyModelObserver())
+ try:
+ print('Deploying ubuntu')
+ application = await model.deploy(
+ 'ubuntu-10',
+ application_name='ubuntu',
+ series='trusty',
+ channel='stable',
+ )
+
+ print('Waiting for active')
+ await model.block_until(
+ lambda: all(unit.workload_status == 'active'
+ for unit in application.units))
- await model.deploy(
- 'ubuntu-0',
- application_name='ubuntu',
- series='trusty',
- channel='stable',
- )
+ print('Removing ubuntu')
+ await application.remove()
+ finally:
+ print('Disconnecting from model')
+ await model.disconnect()
-logging.basicConfig(level=logging.DEBUG)
-ws_logger = logging.getLogger('websockets.protocol')
-ws_logger.setLevel(logging.INFO)
-loop = asyncio.get_event_loop()
-loop.set_debug(False)
-loop.create_task(run())
-loop.run_forever()
+loop.run(main())
--- /dev/null
+import asyncio
+import signal
+
+
+def run(*steps):
+ """
+ Helper to run one or more async functions synchronously, with graceful
+ handling of SIGINT / Ctrl-C.
+
+ Returns the return value of the last function.
+ """
+ if not steps:
+ return
+
+ task = None
+ run._sigint = False # function attr to allow setting from closure
+ loop = asyncio.get_event_loop()
+
+ def abort():
+ task.cancel()
+ run._sigint = True
+
+ loop.add_signal_handler(signal.SIGINT, abort)
+ try:
+ for step in steps:
+ task = loop.create_task(step)
+ loop.run_until_complete(asyncio.wait([task], loop=loop))
+ if run._sigint:
+ raise KeyboardInterrupt()
+ if task.exception():
+ raise task.exception()
+ return task.result()
+ finally:
+ loop.remove_signal_handler(signal.SIGINT)
--- /dev/null
+import unittest
+import juju.loop
+
+
+class TestLoop(unittest.TestCase):
+ def test_run(self):
+ async def _test():
+ return 'success'
+ self.assertEqual(juju.loop.run(_test()), 'success')
+
+ def test_run_interrupt(self):
+ async def _test():
+ juju.loop.run._sigint = True
+ self.assertRaises(KeyboardInterrupt, juju.loop.run, _test())
+
+ def test_run_exception(self):
+ async def _test():
+ raise ValueError()
+ self.assertRaises(ValueError, juju.loop.run, _test())