import mock
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
+import paramiko
from juju.client.client import ConfigValue, ApplicationFacade
from juju.model import Model, ModelObserver
@base.bootstrapped
@pytest.mark.asyncio
-async def test_deploy_local_bundle(event_loop):
+async def test_deploy_local_bundle_dir(event_loop):
tests_dir = Path(__file__).absolute().parent.parent
bundle_path = tests_dir / 'bundle'
mini_bundle_file_path = bundle_path / 'mini-bundle.yaml'
await model.deploy(str(bundle_path))
await model.deploy(str(mini_bundle_file_path))
- for app in ('wordpress', 'mysql', 'myapp'):
- assert app in model.applications
+ wordpress = model.applications.get('wordpress')
+ mysql = model.applications.get('mysql')
+ assert wordpress and mysql
+ await block_until(lambda: (len(wordpress.units) == 1 and
+ len(mysql.units) == 1),
+ timeout=60 * 4)
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_deploy_local_bundle_file(event_loop):
+ tests_dir = Path(__file__).absolute().parent.parent
+ bundle_path = tests_dir / 'bundle'
+ mini_bundle_file_path = bundle_path / 'mini-bundle.yaml'
+
+ async with base.CleanModel() as model:
+ await model.deploy(str(mini_bundle_file_path))
+
+ dummy_sink = model.applications.get('dummy-sink')
+ dummy_subordinate = model.applications.get('dummy-subordinate')
+ assert dummy_sink and dummy_subordinate
+ await block_until(lambda: (len(dummy_sink.units) == 1 and
+ len(dummy_subordinate.units) == 1),
+ timeout=60 * 4)
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_deploy_invalid_bundle(event_loop):
+ tests_dir = Path(__file__).absolute().parent.parent
+ bundle_path = tests_dir / 'bundle' / 'invalid.yaml'
+ async with base.CleanModel() as model:
+ with pytest.raises(JujuError):
+ await model.deploy(str(bundle_path))
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_deploy_local_charm(event_loop):
+ from pathlib import Path
+ tests_dir = Path(__file__).absolute().parent.parent
+ charm_path = tests_dir / 'charm'
+
+ async with base.CleanModel() as model:
+ await model.deploy(str(charm_path))
+ assert 'charm' in model.applications
@base.bootstrapped
"~/.local/share/juju/ssh/juju_id_rsa.pub"
)
- # Use the self-signed cert generated by lxc on first run
- crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
- assert os.path.exists(crt)
-
- key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
- assert os.path.exists(key)
-
- client = pylxd.Client(
- endpoint="https://127.0.0.1:8443",
- cert=(crt, key),
- verify=False,
- )
+ # connect using the local unix socket
+ client = pylxd.Client()
test_name = "test-{}-add-manual-machine-ssh".format(
uuid.uuid4().hex[-4:]
profile = client.profiles.create(
test_name,
- config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
+ config={'user.user-data': '#cloud-config\n'
+ 'ssh_authorized_keys:\n'
+ '- {}'.format(public_key)},
devices={
'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
'eth0': {
return None
host = wait_for_network(container)
+ assert host, 'Failed to get address for machine'
# HACK: We need to give sshd a chance to bind to the interface,
# and pylxd's container.execute seems to be broken and fails and/or
# hangs trying to properly check if the service is up.
time.sleep(5)
- if host:
- # add a new manual machine
- machine1 = await model.add_machine(spec='ssh:{}@{}:{}'.format(
- "ubuntu",
- host['address'],
- private_key_path,
- ))
+ for attempt in range(1, 4):
+ try:
+ # add a new manual machine
+ machine1 = await model.add_machine(spec='ssh:{}@{}:{}'.format(
+ "ubuntu",
+ host['address'],
+ private_key_path,
+ ))
+ except paramiko.ssh_exception.NoValidConnectionsError:
+ if attempt == 3:
+ raise
+ # retry the ssh connection a few times if it fails
+ time.sleep(attempt * 5)
+ else:
+ break
assert len(model.machines) == 1
my_relation = await run_with_interrupt(model.add_relation(
'ubuntu',
'nrpe',
- ), timeout, event_loop)
+ ), timeout, loop=event_loop)
assert isinstance(my_relation, Relation)
with ThreadPoolExecutor(1) as executor:
f = executor.submit(
new_loop.run_until_complete,
- _deploy_in_loop(new_loop, model_name, model._connector.jujudata))
+ _deploy_in_loop(new_loop,
+ model_name,
+ model._connector.jujudata))
f.result()
await model._wait_for_new('application', 'ubuntu')
assert 'ubuntu' in model.applications
# ghost will go in to blocked (or error, for older
# charm revs) if the resource is missing
assert ghost.units[0].workload_status == 'active'
+ resources = await ghost.get_resources()
+ assert resources['ghost-stable'].revision >= 12
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_store_resources_bundle_revs(event_loop):
+ async with base.CleanModel() as model:
+ bundle = str(Path(__file__).parent / 'bundle/bundle-resource-rev.yaml')
+ await model.deploy(bundle)
+ assert 'ghost' in model.applications
+ ghost = model.applications['ghost']
+ terminal_statuses = ('active', 'error', 'blocked')
+ await model.block_until(
+ lambda: (
+ len(ghost.units) > 0 and
+ ghost.units[0].workload_status in terminal_statuses)
+ )
+ # ghost will go in to blocked (or error, for older
+ # charm revs) if the resource is missing
+ assert ghost.units[0].workload_status == 'active'
+ resources = await ghost.get_resources()
+ assert resources['ghost-stable'].revision == 11
@base.bootstrapped
assert result['extra-info'].source == 'model'
assert result['extra-info'].value == 'booyah'
+
@base.bootstrapped
@pytest.mark.asyncio
async def test_set_constraints(event_loop):