blob: 8b2251ccea8f707f87f00d04ba7095c16d93c834 [file] [log] [blame]
Adam Israeldcdf82b2017-08-15 15:26:43 -04001import asyncio
Adam Israeldcdf82b2017-08-15 15:26:43 -04002from tempfile import NamedTemporaryFile
3
Adam Israelc3e6c2e2018-03-01 09:31:50 -05004import pytest
5
Adam Israeldcdf82b2017-08-15 15:26:43 -04006from .. import base
7
8
9@base.bootstrapped
10@pytest.mark.asyncio
11async def test_run(event_loop):
12 from juju.action import Action
13
14 async with base.CleanModel() as model:
15 app = await model.deploy(
16 'ubuntu-0',
17 application_name='ubuntu',
18 series='trusty',
19 channel='stable',
20 )
21
22 for unit in app.units:
23 action = await unit.run('unit-get public-address')
24 assert isinstance(action, Action)
25 assert 'Stdout' in action.results
26 break
27
28
29@base.bootstrapped
30@pytest.mark.asyncio
31async def test_run_action(event_loop):
32 async def run_action(unit):
33 # unit.run() returns a juju.action.Action instance
34 action = await unit.run_action('add-repo', repo='myrepo')
35 # wait for the action to complete
36 return await action.wait()
37
38 async with base.CleanModel() as model:
39 app = await model.deploy(
40 'git',
41 application_name='git',
42 series='trusty',
43 channel='stable',
44 )
45
46 for unit in app.units:
47 action = await run_action(unit)
48 assert action.results == {'dir': '/var/git/myrepo.git'}
49 break
50
51
52@base.bootstrapped
53@pytest.mark.asyncio
54async def test_scp(event_loop):
Adam Israelc3e6c2e2018-03-01 09:31:50 -050055 # ensure that asyncio.subprocess will work;
56 try:
57 asyncio.get_child_watcher().attach_loop(event_loop)
58 except RuntimeError:
59 pytest.skip('test_scp will always fail outside of MainThread')
Adam Israeldcdf82b2017-08-15 15:26:43 -040060 async with base.CleanModel() as model:
61 app = await model.deploy('ubuntu')
62
63 await asyncio.wait_for(
64 model.block_until(lambda: app.units),
65 timeout=60)
66 unit = app.units[0]
67 await asyncio.wait_for(
68 model.block_until(lambda: unit.machine),
69 timeout=60)
70 machine = unit.machine
71 await asyncio.wait_for(
72 model.block_until(lambda: (machine.status == 'running' and
73 machine.agent_status == 'started')),
74 timeout=480)
75
76 with NamedTemporaryFile() as f:
77 f.write(b'testcontents')
78 f.flush()
79 await unit.scp_to(f.name, 'testfile')
80
81 with NamedTemporaryFile() as f:
82 await unit.scp_from('testfile', f.name)
83 assert f.read() == b'testcontents'