X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=tests%2Fintegration%2Ftest_unit.py;h=bb349699faa796a51e5cd81b1d2d0491bd491aed;hb=HEAD;hp=1604c311cb5f403b61e61cf9d94cb4a109f34ed9;hpb=dcdf82bbc1ef310379f746518b2dd3b006353cb3;p=osm%2FN2VC.git diff --git a/tests/integration/test_unit.py b/tests/integration/test_unit.py deleted file mode 100644 index 1604c31..0000000 --- a/tests/integration/test_unit.py +++ /dev/null @@ -1,78 +0,0 @@ -import asyncio -import pytest - -from tempfile import NamedTemporaryFile - -from .. import base - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_run(event_loop): - from juju.action import Action - - async with base.CleanModel() as model: - app = await model.deploy( - 'ubuntu-0', - application_name='ubuntu', - series='trusty', - channel='stable', - ) - - for unit in app.units: - action = await unit.run('unit-get public-address') - assert isinstance(action, Action) - assert 'Stdout' in action.results - break - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_run_action(event_loop): - async def run_action(unit): - # unit.run() returns a juju.action.Action instance - action = await unit.run_action('add-repo', repo='myrepo') - # wait for the action to complete - return await action.wait() - - async with base.CleanModel() as model: - app = await model.deploy( - 'git', - application_name='git', - series='trusty', - channel='stable', - ) - - for unit in app.units: - action = await run_action(unit) - assert action.results == {'dir': '/var/git/myrepo.git'} - break - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_scp(event_loop): - async with base.CleanModel() as model: - app = await model.deploy('ubuntu') - - await asyncio.wait_for( - model.block_until(lambda: app.units), - timeout=60) - unit = app.units[0] - await asyncio.wait_for( - model.block_until(lambda: unit.machine), - timeout=60) - machine = unit.machine - await asyncio.wait_for( - model.block_until(lambda: (machine.status == 'running' and - machine.agent_status == 'started')), - timeout=480) - - with NamedTemporaryFile() as f: - f.write(b'testcontents') - f.flush() - await unit.scp_to(f.name, 'testfile') - - with NamedTemporaryFile() as f: - await unit.scp_from('testfile', f.name) - assert f.read() == b'testcontents'