blob: 1604c311cb5f403b61e61cf9d94cb4a109f34ed9 [file] [log] [blame]
import asyncio
import pytest
from tempfile import NamedTemporaryFile
from .. import base
@base.bootstrapped
@pytest.mark.asyncio
async def test_run(event_loop):
from juju.action import Action
async with base.CleanModel() as model:
app = await model.deploy(
'ubuntu-0',
application_name='ubuntu',
series='trusty',
channel='stable',
)
for unit in app.units:
action = await unit.run('unit-get public-address')
assert isinstance(action, Action)
assert 'Stdout' in action.results
break
@base.bootstrapped
@pytest.mark.asyncio
async def test_run_action(event_loop):
async def run_action(unit):
# unit.run() returns a juju.action.Action instance
action = await unit.run_action('add-repo', repo='myrepo')
# wait for the action to complete
return await action.wait()
async with base.CleanModel() as model:
app = await model.deploy(
'git',
application_name='git',
series='trusty',
channel='stable',
)
for unit in app.units:
action = await run_action(unit)
assert action.results == {'dir': '/var/git/myrepo.git'}
break
@base.bootstrapped
@pytest.mark.asyncio
async def test_scp(event_loop):
async with base.CleanModel() as model:
app = await model.deploy('ubuntu')
await asyncio.wait_for(
model.block_until(lambda: app.units),
timeout=60)
unit = app.units[0]
await asyncio.wait_for(
model.block_until(lambda: unit.machine),
timeout=60)
machine = unit.machine
await asyncio.wait_for(
model.block_until(lambda: (machine.status == 'running' and
machine.agent_status == 'started')),
timeout=480)
with NamedTemporaryFile() as f:
f.write(b'testcontents')
f.flush()
await unit.scp_to(f.name, 'testfile')
with NamedTemporaryFile() as f:
await unit.scp_from('testfile', f.name)
assert f.read() == b'testcontents'