X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=tests%2Fintegration%2Ftest_machine.py;fp=tests%2Fintegration%2Ftest_machine.py;h=070208a568fbe639fde52a1235a03cfbbfab0f51;hb=b8a8281b1785358bd5632a119c016f21811172c6;hp=60de035244f75b946be3619e618b3bc9d44b2d7a;hpb=dcdf82bbc1ef310379f746518b2dd3b006353cb3;p=osm%2FN2VC.git diff --git a/tests/integration/test_machine.py b/tests/integration/test_machine.py index 60de035..070208a 100644 --- a/tests/integration/test_machine.py +++ b/tests/integration/test_machine.py @@ -1,8 +1,8 @@ import asyncio -import pytest - from tempfile import NamedTemporaryFile +import pytest + from .. import base @@ -26,21 +26,24 @@ async def test_status(event_loop): assert machine.agent_status == 'pending' assert not machine.agent_version + # there is some inconsistency in the capitalization of status_message + # between different providers await asyncio.wait_for( - model.block_until(lambda: (machine.status == 'running' and - machine.agent_status == 'started')), + model.block_until( + lambda: (machine.status == 'running' and + machine.status_message.lower() == 'running' and + machine.agent_status == 'started')), timeout=480) - assert machine.status == 'running' - # there is some inconsistency in the message case between providers - assert machine.status_message.lower() == 'running' - assert machine.agent_status == 'started' - assert machine.agent_version.major >= 2 - @base.bootstrapped @pytest.mark.asyncio async def test_scp(event_loop): + # ensure that asyncio.subprocess will work; + try: + asyncio.get_child_watcher().attach_loop(event_loop) + except RuntimeError: + pytest.skip('test_scp will always fail outside of MainThread') async with base.CleanModel() as model: await model.add_machine() await asyncio.wait_for( @@ -55,8 +58,8 @@ async def test_scp(event_loop): with NamedTemporaryFile() as f: f.write(b'testcontents') f.flush() - await machine.scp_to(f.name, 'testfile') + await machine.scp_to(f.name, 'testfile', scp_opts='-p') with NamedTemporaryFile() as f: - await machine.scp_from('testfile', f.name) + await machine.scp_from('testfile', f.name, scp_opts='-p') assert f.read() == b'testcontents'