import asyncio
+import mock
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
+
+from juju.client.client import ConfigValue, ApplicationFacade
+from juju.model import Model, ModelObserver
+from juju.utils import block_until, run_with_interrupt
+from juju.errors import JujuError
+
+import os
+import pylxd
+import time
+import uuid
+
import pytest
from .. import base
-from juju.model import Model
-from juju.client.client import ConfigValue
+
MB = 1
GB = 1024
@base.bootstrapped
@pytest.mark.asyncio
async def test_deploy_local_bundle(event_loop):
- from pathlib import Path
tests_dir = Path(__file__).absolute().parent.parent
bundle_path = tests_dir / 'bundle'
+ mini_bundle_file_path = bundle_path / 'mini-bundle.yaml'
async with base.CleanModel() as model:
await model.deploy(str(bundle_path))
+ await model.deploy(str(mini_bundle_file_path))
- for app in ('wordpress', 'mysql'):
+ for app in ('wordpress', 'mysql', 'myapp'):
assert app in model.applications
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_deploy_invalid_bundle(event_loop):
+ tests_dir = Path(__file__).absolute().parent.parent
+ bundle_path = tests_dir / 'bundle' / 'invalid.yaml'
+ async with base.CleanModel() as model:
+ with pytest.raises(JujuError):
+ await model.deploy(str(bundle_path))
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_deploy_local_charm(event_loop):
+ from pathlib import Path
+ tests_dir = Path(__file__).absolute().parent.parent
+ charm_path = tests_dir / 'charm'
+
+ async with base.CleanModel() as model:
+ await model.deploy(str(charm_path))
+ assert 'charm' in model.applications
+
+
@base.bootstrapped
@pytest.mark.asyncio
async def test_deploy_bundle(event_loop):
charm = 'cs:~johnsca/libjuju-test'
stable = await model.deploy(charm, 'a1')
edge = await model.deploy(charm, 'a2', channel='edge')
- rev = await model.deploy(charm+'-2', 'a3')
+ rev = await model.deploy(charm + '-2', 'a3')
assert [a.charm_url for a in (stable, edge, rev)] == [
'cs:~johnsca/libjuju-test-1',
assert len(model.machines) == 0
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_add_manual_machine_ssh(event_loop):
+
+ # Verify controller is localhost
+ async with base.CleanController() as controller:
+ cloud = await controller.get_cloud()
+ if cloud != "localhost":
+ pytest.skip('Skipping because test requires lxd.')
+
+ async with base.CleanModel() as model:
+ private_key_path = os.path.expanduser(
+ "~/.local/share/juju/ssh/juju_id_rsa"
+ )
+ public_key_path = os.path.expanduser(
+ "~/.local/share/juju/ssh/juju_id_rsa.pub"
+ )
+
+ # Use the self-signed cert generated by lxc on first run
+ crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
+ assert os.path.exists(crt)
+
+ key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
+ assert os.path.exists(key)
+
+ client = pylxd.Client(
+ endpoint="https://127.0.0.1:8443",
+ cert=(crt, key),
+ verify=False,
+ )
+
+ test_name = "test-{}-add-manual-machine-ssh".format(
+ uuid.uuid4().hex[-4:]
+ )
+
+ # create profile w/cloud-init and juju ssh key
+ public_key = ""
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ profile = client.profiles.create(
+ test_name,
+ config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
+ devices={
+ 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
+ 'eth0': {
+ 'nictype': 'bridged',
+ 'parent': 'lxdbr0',
+ 'type': 'nic'
+ }
+ }
+ )
+
+ # create lxc machine
+ config = {
+ 'name': test_name,
+ 'source': {
+ 'type': 'image',
+ 'alias': 'xenial',
+ 'mode': 'pull',
+ 'protocol': 'simplestreams',
+ 'server': 'https://cloud-images.ubuntu.com/releases',
+ },
+ 'profiles': [test_name],
+ }
+ container = client.containers.create(config, wait=True)
+ container.start(wait=True)
+
+ def wait_for_network(container, timeout=30):
+ """Wait for eth0 to have an ipv4 address."""
+ starttime = time.time()
+ while(time.time() < starttime + timeout):
+ time.sleep(1)
+ if 'eth0' in container.state().network:
+ addresses = container.state().network['eth0']['addresses']
+ if len(addresses) > 0:
+ if addresses[0]['family'] == 'inet':
+ return addresses[0]
+ return None
+
+ host = wait_for_network(container)
+
+ # HACK: We need to give sshd a chance to bind to the interface,
+ # and pylxd's container.execute seems to be broken and fails and/or
+ # hangs trying to properly check if the service is up.
+ time.sleep(5)
+
+ if host:
+ # add a new manual machine
+ machine1 = await model.add_machine(spec='ssh:{}@{}:{}'.format(
+ "ubuntu",
+ host['address'],
+ private_key_path,
+ ))
+
+ assert len(model.machines) == 1
+
+ res = await machine1.destroy(force=True)
+
+ assert res is None
+ assert len(model.machines) == 0
+
+ container.stop(wait=True)
+ container.delete(wait=True)
+
+ profile.delete()
+
+
@base.bootstrapped
@pytest.mark.asyncio
async def test_relate(event_loop):
# subordinates must be deployed without units
num_units=0,
)
- my_relation = await model.add_relation(
- 'ubuntu',
- 'nrpe',
- )
+
+ relation_added = asyncio.Event()
+ timeout = asyncio.Event()
+
+ class TestObserver(ModelObserver):
+ async def on_relation_add(self, delta, old, new, model):
+ if set(new.key.split()) == {'nrpe:general-info',
+ 'ubuntu:juju-info'}:
+ relation_added.set()
+ event_loop.call_later(2, timeout.set)
+
+ model.add_observer(TestObserver())
+
+ real_app_facade = ApplicationFacade.from_connection(model.connection())
+ mock_app_facade = mock.MagicMock()
+
+ async def mock_AddRelation(*args):
+ # force response delay from AddRelation to test race condition
+ # (see https://github.com/juju/python-libjuju/issues/191)
+ result = await real_app_facade.AddRelation(*args)
+ await relation_added.wait()
+ return result
+
+ mock_app_facade.AddRelation = mock_AddRelation
+
+ with mock.patch.object(ApplicationFacade, 'from_connection',
+ return_value=mock_app_facade):
+ my_relation = await run_with_interrupt(model.add_relation(
+ 'ubuntu',
+ 'nrpe',
+ ), timeout, event_loop)
assert isinstance(my_relation, Relation)
-async def _deploy_in_loop(new_loop, model_name):
- new_model = Model(new_loop)
- await new_model.connect_model(model_name)
+async def _deploy_in_loop(new_loop, model_name, jujudata):
+ new_model = Model(new_loop, jujudata=jujudata)
+ await new_model.connect(model_name)
try:
await new_model.deploy('cs:xenial/ubuntu')
assert 'ubuntu' in new_model.applications
await new_model.disconnect()
-@base.bootstrapped
-@pytest.mark.asyncio
-async def test_explicit_loop(event_loop):
- async with base.CleanModel() as model:
- model_name = model.info.name
- new_loop = asyncio.new_event_loop()
- new_loop.run_until_complete(
- _deploy_in_loop(new_loop, model_name))
- await model._wait_for_new('application', 'ubuntu')
- assert 'ubuntu' in model.applications
-
-
@base.bootstrapped
@pytest.mark.asyncio
async def test_explicit_loop_threaded(event_loop):
with ThreadPoolExecutor(1) as executor:
f = executor.submit(
new_loop.run_until_complete,
- _deploy_in_loop(new_loop, model_name))
+ _deploy_in_loop(new_loop, model_name, model._connector.jujudata))
f.result()
await model._wait_for_new('application', 'ubuntu')
assert 'ubuntu' in model.applications
lambda: (
len(ghost.units) > 0 and
ghost.units[0].workload_status in terminal_statuses)
- )
+ )
# ghost will go in to blocked (or error, for older
# charm revs) if the resource is missing
assert ghost.units[0].workload_status == 'active'
lambda: (
len(ghost.units) > 0 and
ghost.units[0].workload_status in terminal_statuses)
- )
+ )
# ghost will go in to blocked (or error, for older
# charm revs) if the resource is missing
assert ghost.units[0].workload_status == 'active'
@pytest.mark.asyncio
async def test_watcher_reconnect(event_loop):
async with base.CleanModel() as model:
- await model.connection.ws.close()
- await asyncio.sleep(0.1)
- assert model.connection.is_open
+ await model.connection().ws.close()
+ await block_until(model.is_connected, timeout=3)
@base.bootstrapped
assert result['extra-info'].source == 'model'
assert result['extra-info'].value == 'booyah'
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_set_constraints(event_loop):
+ async with base.CleanModel() as model:
+ await model.set_constraints({'cpu-power': 1})
+ cons = await model.get_constraints()
+ assert cons['cpu_power'] == 1
+
# @base.bootstrapped
# @pytest.mark.asyncio
# async def test_grant(event_loop)