X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=modules%2Flibjuju%2Ftests%2Fintegration%2Ftest_model.py;fp=modules%2Flibjuju%2Ftests%2Fintegration%2Ftest_model.py;h=0000000000000000000000000000000000000000;hb=5aa242fe2d61c2da9fdb29ed65029f3e27b6966d;hp=58766b4c5bbd5eb2bae04203355de67830a920a9;hpb=e2051cca7dac12aa09f6ed33555dcc4548c4b52b;p=osm%2FN2VC.git diff --git a/modules/libjuju/tests/integration/test_model.py b/modules/libjuju/tests/integration/test_model.py deleted file mode 100644 index 58766b4..0000000 --- a/modules/libjuju/tests/integration/test_model.py +++ /dev/null @@ -1,485 +0,0 @@ -import asyncio -import mock -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -import paramiko - -from juju.client.client import ConfigValue, ApplicationFacade -from juju.model import Model, ModelObserver -from juju.utils import block_until, run_with_interrupt -from juju.errors import JujuError - -import os -import pylxd -import time -import uuid - -import pytest - -from .. import base - - -MB = 1 -GB = 1024 -SSH_KEY = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT mathijs@Qrama-Mathijs' # noqa - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_local_bundle_dir(event_loop): - tests_dir = Path(__file__).absolute().parent.parent - bundle_path = tests_dir / 'bundle' - mini_bundle_file_path = bundle_path / 'mini-bundle.yaml' - - async with base.CleanModel() as model: - await model.deploy(str(bundle_path)) - await model.deploy(str(mini_bundle_file_path)) - - wordpress = model.applications.get('wordpress') - mysql = model.applications.get('mysql') - assert wordpress and mysql - await block_until(lambda: (len(wordpress.units) == 1 and - len(mysql.units) == 1), - timeout=60 * 4) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_local_bundle_file(event_loop): - tests_dir = Path(__file__).absolute().parent.parent - bundle_path = tests_dir / 'bundle' - mini_bundle_file_path = bundle_path / 'mini-bundle.yaml' - - async with base.CleanModel() as model: - await model.deploy(str(mini_bundle_file_path)) - - dummy_sink = model.applications.get('dummy-sink') - dummy_subordinate = model.applications.get('dummy-subordinate') - assert dummy_sink and dummy_subordinate - await block_until(lambda: (len(dummy_sink.units) == 1 and - len(dummy_subordinate.units) == 1), - timeout=60 * 4) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_invalid_bundle(event_loop): - tests_dir = Path(__file__).absolute().parent.parent - bundle_path = tests_dir / 'bundle' / 'invalid.yaml' - async with base.CleanModel() as model: - with pytest.raises(JujuError): - await model.deploy(str(bundle_path)) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_local_charm(event_loop): - from pathlib import Path - tests_dir = Path(__file__).absolute().parent.parent - charm_path = tests_dir / 'charm' - - async with base.CleanModel() as model: - await model.deploy(str(charm_path)) - assert 'charm' in model.applications - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_invalid_bundle(event_loop): - tests_dir = Path(__file__).absolute().parent.parent - bundle_path = tests_dir / 'bundle' / 'invalid.yaml' - async with base.CleanModel() as model: - with pytest.raises(JujuError): - await model.deploy(str(bundle_path)) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_local_charm(event_loop): - from pathlib import Path - tests_dir = Path(__file__).absolute().parent.parent - charm_path = tests_dir / 'charm' - - async with base.CleanModel() as model: - await model.deploy(str(charm_path)) - assert 'charm' in model.applications - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_bundle(event_loop): - async with base.CleanModel() as model: - await model.deploy('bundle/wiki-simple') - - for app in ('wiki', 'mysql'): - assert app in model.applications - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_deploy_channels_revs(event_loop): - async with base.CleanModel() as model: - charm = 'cs:~johnsca/libjuju-test' - stable = await model.deploy(charm, 'a1') - edge = await model.deploy(charm, 'a2', channel='edge') - rev = await model.deploy(charm + '-2', 'a3') - - assert [a.charm_url for a in (stable, edge, rev)] == [ - 'cs:~johnsca/libjuju-test-1', - 'cs:~johnsca/libjuju-test-2', - 'cs:~johnsca/libjuju-test-2', - ] - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_add_machine(event_loop): - from juju.machine import Machine - - async with base.CleanModel() as model: - # add a new default machine - machine1 = await model.add_machine() - - # add a machine with constraints, disks, and series - machine2 = await model.add_machine( - constraints={ - 'mem': 256 * MB, - }, - disks=[{ - 'pool': 'rootfs', - 'size': 10 * GB, - 'count': 1, - }], - series='xenial', - ) - - # add a lxd container to machine2 - machine3 = await model.add_machine( - 'lxd:{}'.format(machine2.id)) - - for m in (machine1, machine2, machine3): - assert isinstance(m, Machine) - - assert len(model.machines) == 3 - - await machine3.destroy(force=True) - await machine2.destroy(force=True) - res = await machine1.destroy(force=True) - - assert res is None - assert len(model.machines) == 0 - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_add_manual_machine_ssh(event_loop): - - # Verify controller is localhost - async with base.CleanController() as controller: - cloud = await controller.get_cloud() - if cloud != "localhost": - pytest.skip('Skipping because test requires lxd.') - - async with base.CleanModel() as model: - private_key_path = os.path.expanduser( - "~/.local/share/juju/ssh/juju_id_rsa" - ) - public_key_path = os.path.expanduser( - "~/.local/share/juju/ssh/juju_id_rsa.pub" - ) - - # connect using the local unix socket - client = pylxd.Client() - - test_name = "test-{}-add-manual-machine-ssh".format( - uuid.uuid4().hex[-4:] - ) - - # create profile w/cloud-init and juju ssh key - public_key = "" - with open(public_key_path, "r") as f: - public_key = f.readline() - - profile = client.profiles.create( - test_name, - config={'user.user-data': '#cloud-config\n' - 'ssh_authorized_keys:\n' - '- {}'.format(public_key)}, - devices={ - 'root': {'path': '/', 'pool': 'default', 'type': 'disk'}, - 'eth0': { - 'nictype': 'bridged', - 'parent': 'lxdbr0', - 'type': 'nic' - } - } - ) - - # create lxc machine - config = { - 'name': test_name, - 'source': { - 'type': 'image', - 'alias': 'xenial', - 'mode': 'pull', - 'protocol': 'simplestreams', - 'server': 'https://cloud-images.ubuntu.com/releases', - }, - 'profiles': [test_name], - } - container = client.containers.create(config, wait=True) - container.start(wait=True) - - def wait_for_network(container, timeout=30): - """Wait for eth0 to have an ipv4 address.""" - starttime = time.time() - while(time.time() < starttime + timeout): - time.sleep(1) - if 'eth0' in container.state().network: - addresses = container.state().network['eth0']['addresses'] - if len(addresses) > 0: - if addresses[0]['family'] == 'inet': - return addresses[0] - return None - - host = wait_for_network(container) - assert host, 'Failed to get address for machine' - - # HACK: We need to give sshd a chance to bind to the interface, - # and pylxd's container.execute seems to be broken and fails and/or - # hangs trying to properly check if the service is up. - time.sleep(5) - - for attempt in range(1, 4): - try: - # add a new manual machine - machine1 = await model.add_machine(spec='ssh:{}@{}:{}'.format( - "ubuntu", - host['address'], - private_key_path, - )) - except paramiko.ssh_exception.NoValidConnectionsError: - if attempt == 3: - raise - # retry the ssh connection a few times if it fails - time.sleep(attempt * 5) - else: - break - - assert len(model.machines) == 1 - - res = await machine1.destroy(force=True) - - assert res is None - assert len(model.machines) == 0 - - container.stop(wait=True) - container.delete(wait=True) - - profile.delete() - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_relate(event_loop): - from juju.relation import Relation - - async with base.CleanModel() as model: - await model.deploy( - 'ubuntu', - application_name='ubuntu', - series='trusty', - channel='stable', - ) - await model.deploy( - 'nrpe', - application_name='nrpe', - series='trusty', - channel='stable', - # subordinates must be deployed without units - num_units=0, - ) - - relation_added = asyncio.Event() - timeout = asyncio.Event() - - class TestObserver(ModelObserver): - async def on_relation_add(self, delta, old, new, model): - if set(new.key.split()) == {'nrpe:general-info', - 'ubuntu:juju-info'}: - relation_added.set() - event_loop.call_later(2, timeout.set) - - model.add_observer(TestObserver()) - - real_app_facade = ApplicationFacade.from_connection(model.connection()) - mock_app_facade = mock.MagicMock() - - async def mock_AddRelation(*args): - # force response delay from AddRelation to test race condition - # (see https://github.com/juju/python-libjuju/issues/191) - result = await real_app_facade.AddRelation(*args) - await relation_added.wait() - return result - - mock_app_facade.AddRelation = mock_AddRelation - - with mock.patch.object(ApplicationFacade, 'from_connection', - return_value=mock_app_facade): - my_relation = await run_with_interrupt(model.add_relation( - 'ubuntu', - 'nrpe', - ), timeout, loop=event_loop) - - assert isinstance(my_relation, Relation) - - -async def _deploy_in_loop(new_loop, model_name, jujudata): - new_model = Model(new_loop, jujudata=jujudata) - await new_model.connect(model_name) - try: - await new_model.deploy('cs:xenial/ubuntu') - assert 'ubuntu' in new_model.applications - finally: - await new_model.disconnect() - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_explicit_loop_threaded(event_loop): - async with base.CleanModel() as model: - model_name = model.info.name - new_loop = asyncio.new_event_loop() - with ThreadPoolExecutor(1) as executor: - f = executor.submit( - new_loop.run_until_complete, - _deploy_in_loop(new_loop, - model_name, - model._connector.jujudata)) - f.result() - await model._wait_for_new('application', 'ubuntu') - assert 'ubuntu' in model.applications - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_store_resources_charm(event_loop): - async with base.CleanModel() as model: - ghost = await model.deploy('cs:ghost-19') - assert 'ghost' in model.applications - terminal_statuses = ('active', 'error', 'blocked') - await model.block_until( - lambda: ( - len(ghost.units) > 0 and - ghost.units[0].workload_status in terminal_statuses) - ) - # ghost will go in to blocked (or error, for older - # charm revs) if the resource is missing - assert ghost.units[0].workload_status == 'active' - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_store_resources_bundle(event_loop): - async with base.CleanModel() as model: - bundle = str(Path(__file__).parent / 'bundle') - await model.deploy(bundle) - assert 'ghost' in model.applications - ghost = model.applications['ghost'] - terminal_statuses = ('active', 'error', 'blocked') - await model.block_until( - lambda: ( - len(ghost.units) > 0 and - ghost.units[0].workload_status in terminal_statuses) - ) - # ghost will go in to blocked (or error, for older - # charm revs) if the resource is missing - assert ghost.units[0].workload_status == 'active' - resources = await ghost.get_resources() - assert resources['ghost-stable'].revision >= 12 - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_store_resources_bundle_revs(event_loop): - async with base.CleanModel() as model: - bundle = str(Path(__file__).parent / 'bundle/bundle-resource-rev.yaml') - await model.deploy(bundle) - assert 'ghost' in model.applications - ghost = model.applications['ghost'] - terminal_statuses = ('active', 'error', 'blocked') - await model.block_until( - lambda: ( - len(ghost.units) > 0 and - ghost.units[0].workload_status in terminal_statuses) - ) - # ghost will go in to blocked (or error, for older - # charm revs) if the resource is missing - assert ghost.units[0].workload_status == 'active' - resources = await ghost.get_resources() - assert resources['ghost-stable'].revision == 11 - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_ssh_key(event_loop): - async with base.CleanModel() as model: - await model.add_ssh_key('admin', SSH_KEY) - result = await model.get_ssh_key(True) - result = result.serialize()['results'][0].serialize()['result'] - assert SSH_KEY in result - await model.remove_ssh_key('admin', SSH_KEY) - result = await model.get_ssh_key(True) - result = result.serialize()['results'][0].serialize()['result'] - assert result is None - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_get_machines(event_loop): - async with base.CleanModel() as model: - result = await model.get_machines() - assert isinstance(result, list) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_watcher_reconnect(event_loop): - async with base.CleanModel() as model: - await model.connection().ws.close() - await block_until(model.is_connected, timeout=3) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_config(event_loop): - async with base.CleanModel() as model: - await model.set_config({ - 'extra-info': 'booyah', - 'test-mode': ConfigValue(value=True), - }) - result = await model.get_config() - assert 'extra-info' in result - assert result['extra-info'].source == 'model' - assert result['extra-info'].value == 'booyah' - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_set_constraints(event_loop): - async with base.CleanModel() as model: - await model.set_constraints({'cpu-power': 1}) - cons = await model.get_constraints() - assert cons['cpu_power'] == 1 - -# @base.bootstrapped -# @pytest.mark.asyncio -# async def test_grant(event_loop) -# async with base.CleanController() as controller: -# await controller.add_user('test-model-grant') -# await controller.grant('test-model-grant', 'superuser') -# async with base.CleanModel() as model: -# await model.grant('test-model-grant', 'admin') -# assert model.get_user('test-model-grant')['access'] == 'admin' -# await model.grant('test-model-grant', 'login') -# assert model.get_user('test-model-grant')['access'] == 'login'