3 from concurrent
.futures
import ThreadPoolExecutor
4 from pathlib
import Path
6 from juju
.client
.client
import ConfigValue
, ApplicationFacade
7 from juju
.model
import Model
, ModelObserver
8 from juju
.utils
import block_until
, run_with_interrupt
9 from juju
.errors
import JujuError
# Public RSA key used as a fixture: test_ssh_key adds it to a model for
# user 'admin', checks it is listed, then removes it again.
23 SSH_KEY
= 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT mathijs@Qrama-Mathijs' # noqa
async def test_deploy_local_bundle(event_loop):
    """Deploy the local bundle directory and its mini-bundle file.

    Both sources live under the ``bundle`` directory two levels above this
    file. After deploying them into a clean model, the applications
    'wordpress', 'mysql' and 'myapp' must all be present.
    """
    bundle_dir = Path(__file__).absolute().parent.parent / 'bundle'
    mini_bundle = bundle_dir / 'mini-bundle.yaml'
    expected_apps = ('wordpress', 'mysql', 'myapp')

    async with base.CleanModel() as model:
        # Directory first, then the single-file bundle, as separate deploys.
        for source in (bundle_dir, mini_bundle):
            await model.deploy(str(source))

        for name in expected_apps:
            assert name in model.applications
async def test_deploy_invalid_bundle(event_loop):
    """Deploying ``bundle/invalid.yaml`` must raise ``JujuError``."""
    repo_tests = Path(__file__).absolute().parent.parent
    invalid_bundle = repo_tests / 'bundle' / 'invalid.yaml'

    async with base.CleanModel() as model:
        with pytest.raises(JujuError):
            await model.deploy(str(invalid_bundle))
async def test_deploy_local_charm(event_loop):
    """Deploy the charm directory from the local checkout.

    The charm is located in the ``charm`` directory two levels above this
    file; once deployed into a clean model, an application named 'charm'
    must be present.
    """
    # ``Path`` comes from the module-level import, as in the sibling
    # bundle tests; the function-local re-import was redundant.
    tests_dir = Path(__file__).absolute().parent.parent
    charm_path = tests_dir / 'charm'

    async with base.CleanModel() as model:
        await model.deploy(str(charm_path))
        assert 'charm' in model.applications
async def test_deploy_bundle(event_loop):
    """Deploy 'bundle/wiki-simple' and check its applications exist.

    After the deploy, both 'wiki' and 'mysql' must be listed in
    ``model.applications``.
    """
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy('bundle/wiki-simple')

        for name in expected_apps:
            assert name in model.applications
# Deploys 'cs:~johnsca/libjuju-test' three times into a clean model -- as
# 'a1' with no channel, as 'a2' with channel='edge', and as 'a3' with a
# '-2' suffix appended to the charm name -- then compares the charm_url of
# the three results against an expected list.
# NOTE(review): the closing bracket of the expected list is not visible in
# this view (embedded numbering jumps from 85 to 91) -- confirm against
# the full file before editing.
75 async def test_deploy_channels_revs(event_loop
):
76 async with base
.CleanModel() as model
:
77 charm
= 'cs:~johnsca/libjuju-test'
78 stable
= await model
.deploy(charm
, 'a1')
79 edge
= await model
.deploy(charm
, 'a2', channel
='edge')
80 rev
= await model
.deploy(charm
+ '-2', 'a3')
82 assert [a
.charm_url
for a
in (stable
, edge
, rev
)] == [
83 'cs:~johnsca/libjuju-test-1',
84 'cs:~johnsca/libjuju-test-2',
85 'cs:~johnsca/libjuju-test-2',
# Adds three machines to a clean model (a default one, a second one with
# extra arguments, and an lxd container placed on machine2), checks that
# each result is a Machine and that model.machines holds 3 entries, then
# force-destroys all three and checks model.machines is empty.
# NOTE(review): the arguments passed to add_machine for machine2 (embedded
# lines 100-110) and embedded lines 123-124 are not visible in this view.
91 async def test_add_machine(event_loop
):
92 from juju
.machine
import Machine
94 async with base
.CleanModel() as model
:
95 # add a new default machine
96 machine1
= await model
.add_machine()
98 # add a machine with constraints, disks, and series
99 machine2
= await model
.add_machine(
111 # add a lxd container to machine2
112 machine3
= await model
.add_machine(
113 'lxd:{}'.format(machine2
.id))
115 for m
in (machine1
, machine2
, machine3
):
116 assert isinstance(m
, Machine
)
118 assert len(model
.machines
) == 3
120 await machine3
.destroy(force
=True)
121 await machine2
.destroy(force
=True)
122 res
= await machine1
.destroy(force
=True)
125 assert len(model
.machines
) == 0
# Adds an lxd container to a model as a manually provisioned machine via an
# 'ssh:{}@{}:{}' spec. The test is skipped unless the controller's cloud is
# "localhost". It builds a pylxd client against https://127.0.0.1:8443,
# creates a profile whose cloud-init user-data carries the first line of the
# juju public key file, creates and starts a container, waits for it to get
# an address on eth0, adds it with model.add_machine, checks the machine
# count goes to 1 and back to 0 after a forced destroy, and finally stops and
# deletes the container.
# NOTE(review): many interior lines are not visible in this view (embedded
# numbering has gaps, e.g. 154->159, 174->188, 204->208, 217->223), so the
# client credentials, profile/container config and the add_machine
# arguments cannot be confirmed from here.
# NOTE(review): as far as visible, container.stop/delete are not inside a
# finally block, so a failing assertion would presumably leave the
# container behind -- verify against the full file.
130 async def test_add_manual_machine_ssh(event_loop
):
132 # Verify controller is localhost
133 async with base
.CleanController() as controller
:
134 cloud
= await controller
.get_cloud()
135 if cloud
!= "localhost":
136 pytest
.skip('Skipping because test requires lxd.')
138 async with base
.CleanModel() as model
:
139 private_key_path
= os
.path
.expanduser(
140 "~/.local/share/juju/ssh/juju_id_rsa"
142 public_key_path
= os
.path
.expanduser(
143 "~/.local/share/juju/ssh/juju_id_rsa.pub"
146 # Use the self-signed cert generated by lxc on first run
147 crt
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
148 assert os
.path
.exists(crt
)
150 key
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.key')
151 assert os
.path
.exists(key
)
153 client
= pylxd
.Client(
154 endpoint
="https://127.0.0.1:8443",
159 test_name
= "test-{}-add-manual-machine-ssh".format(
160 uuid
.uuid4().hex[-4:]
163 # create profile w/cloud-init and juju ssh key
165 with
open(public_key_path
, "r") as f
:
166 public_key
= f
.readline()
168 profile
= client
.profiles
.create(
170 config
={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key
)},
172 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
174 'nictype': 'bridged',
188 'protocol': 'simplestreams',
189 'server': 'https://cloud-images.ubuntu.com/releases',
191 'profiles': [test_name
],
193 container
= client
.containers
.create(config
, wait
=True)
194 container
.start(wait
=True)
196 def wait_for_network(container
, timeout
=30):
197 """Wait for eth0 to have an ipv4 address."""
198 starttime
= time
.time()
199 while(time
.time() < starttime
+ timeout
):
201 if 'eth0' in container
.state().network
:
202 addresses
= container
.state().network
['eth0']['addresses']
203 if len(addresses
) > 0:
204 if addresses
[0]['family'] == 'inet':
208 host
= wait_for_network(container
)
210 # HACK: We need to give sshd a chance to bind to the interface,
211 # and pylxd's container.execute seems to be broken and fails and/or
212 # hangs trying to properly check if the service is up.
216 # add a new manual machine
217 machine1
= await model
.add_machine(spec
='ssh:{}@{}:{}'.format(
223 assert len(model
.machines
) == 1
225 res
= await machine1
.destroy(force
=True)
228 assert len(model
.machines
) == 0
230 container
.stop(wait
=True)
231 container
.delete(wait
=True)
# Exercises model.add_relation while the AddRelation facade response is
# deliberately delayed (race condition from python-libjuju issue 191, per
# the inline comment). A ModelObserver subclass reacts to relation-add
# events; ApplicationFacade.from_connection is patched to return a
# MagicMock whose AddRelation calls the real facade and then waits on the
# relation_added event. add_relation is run through run_with_interrupt with
# a `timeout` event, and the result must be a Relation.
# NOTE(review): the deploy calls for 'ubuntu' and 'nrpe', the rest of the
# observer condition, the return of mock_AddRelation and the add_relation
# arguments are not visible in this view (embedded numbering has gaps).
238 async def test_relate(event_loop
):
239 from juju
.relation
import Relation
241 async with base
.CleanModel() as model
:
244 application_name
='ubuntu',
250 application_name
='nrpe',
253 # subordinates must be deployed without units
257 relation_added
= asyncio
.Event()
258 timeout
= asyncio
.Event()
260 class TestObserver(ModelObserver
):
261 async def on_relation_add(self
, delta
, old
, new
, model
):
262 if set(new
.key
.split()) == {'nrpe:general-info',
265 event_loop
.call_later(2, timeout
.set)
267 model
.add_observer(TestObserver())
269 real_app_facade
= ApplicationFacade
.from_connection(model
.connection())
270 mock_app_facade
= mock
.MagicMock()
272 async def mock_AddRelation(*args
):
273 # force response delay from AddRelation to test race condition
274 # (see https://github.com/juju/python-libjuju/issues/191)
275 result
= await real_app_facade
.AddRelation(*args
)
276 await relation_added
.wait()
279 mock_app_facade
.AddRelation
= mock_AddRelation
281 with mock
.patch
.object(ApplicationFacade
, 'from_connection',
282 return_value
=mock_app_facade
):
283 my_relation
= await run_with_interrupt(model
.add_relation(
286 ), timeout
, event_loop
)
288 assert isinstance(my_relation
, Relation
)
async def _deploy_in_loop(new_loop, model_name, jujudata):
    """Deploy 'cs:xenial/ubuntu' through a separate Model connection.

    Builds a ``Model`` bound to ``new_loop`` and ``jujudata``, connects it
    to ``model_name``, deploys the ubuntu charm and asserts the 'ubuntu'
    application is visible on that connection.

    :param new_loop: event loop handed to the ``Model`` constructor.
    :param model_name: name of the model to connect to.
    :param jujudata: passed through as the ``jujudata`` keyword of ``Model``.
    """
    new_model = Model(new_loop, jujudata=jujudata)
    await new_model.connect(model_name)
    try:
        await new_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in new_model.applications
    finally:
        # Always release the connection, even when the deploy or the
        # assertion fails; otherwise it would be left open.
        await new_model.disconnect()
# Runs _deploy_in_loop on a freshly created event loop, using a
# single-worker ThreadPoolExecutor and new_loop.run_until_complete, then
# waits on the original model for the 'ubuntu' application and asserts it
# is present.
# NOTE(review): embedded lines 308 and 311 are not visible in this view;
# presumably they submit the call to the executor and collect its result --
# confirm against the full file.
# NOTE(review): new_loop is not closed anywhere in the visible lines.
303 async def test_explicit_loop_threaded(event_loop
):
304 async with base
.CleanModel() as model
:
305 model_name
= model
.info
.name
306 new_loop
= asyncio
.new_event_loop()
307 with
ThreadPoolExecutor(1) as executor
:
309 new_loop
.run_until_complete
,
310 _deploy_in_loop(new_loop
, model_name
, model
._connector
.jujudata
))
312 await model
._wait
_for
_new
('application', 'ubuntu')
313 assert 'ubuntu' in model
.applications
# Deploys 'cs:ghost-19' into a clean model, blocks until the first ghost
# unit reports a workload status of 'active', 'error' or 'blocked', and
# then requires that status to be 'active'.
# NOTE(review): embedded lines 324 and 327 are not visible in this view;
# the condition passed to block_until is presumably wrapped in a callable --
# confirm against the full file.
318 async def test_store_resources_charm(event_loop
):
319 async with base
.CleanModel() as model
:
320 ghost
= await model
.deploy('cs:ghost-19')
321 assert 'ghost' in model
.applications
322 terminal_statuses
= ('active', 'error', 'blocked')
323 await model
.block_until(
325 len(ghost
.units
) > 0 and
326 ghost
.units
[0].workload_status
in terminal_statuses
)
328 # ghost will go in to blocked (or error, for older
329 # charm revs) if the resource is missing
330 assert ghost
.units
[0].workload_status
== 'active'
# Deploys the 'bundle' directory that sits next to this file, looks up the
# 'ghost' application, blocks until its first unit reports a workload
# status of 'active', 'error' or 'blocked', and then requires 'active'.
# Unlike the local-bundle tests, the path here is Path(__file__).parent,
# i.e. relative to this file's own directory.
# NOTE(review): embedded lines 343 and 346 are not visible in this view;
# the condition passed to block_until is presumably wrapped in a callable --
# confirm against the full file.
335 async def test_store_resources_bundle(event_loop
):
336 async with base
.CleanModel() as model
:
337 bundle
= str(Path(__file__
).parent
/ 'bundle')
338 await model
.deploy(bundle
)
339 assert 'ghost' in model
.applications
340 ghost
= model
.applications
['ghost']
341 terminal_statuses
= ('active', 'error', 'blocked')
342 await model
.block_until(
344 len(ghost
.units
) > 0 and
345 ghost
.units
[0].workload_status
in terminal_statuses
)
347 # ghost will go in to blocked (or error, for older
348 # charm revs) if the resource is missing
349 assert ghost
.units
[0].workload_status
== 'active'
async def test_ssh_key(event_loop):
    """Add ``SSH_KEY`` for user 'admin', verify it, then remove it.

    After the add, the key must appear in the 'result' of the first entry
    returned by ``get_ssh_key(True)``; after the removal that 'result'
    must be ``None``.
    """
    async with base.CleanModel() as model:

        async def first_result():
            # Unwrap the first entry of the serialized get_ssh_key reply.
            reply = await model.get_ssh_key(True)
            entry = reply.serialize()['results'][0]
            return entry.serialize()['result']

        await model.add_ssh_key('admin', SSH_KEY)
        keys_after_add = await first_result()
        assert SSH_KEY in keys_after_add

        await model.remove_ssh_key('admin', SSH_KEY)
        keys_after_remove = await first_result()
        assert keys_after_remove is None
async def test_get_machines(event_loop):
    """``model.get_machines()`` must return a ``list``."""
    async with base.CleanModel() as model:
        machines = await model.get_machines()
        assert isinstance(machines, list)
async def test_watcher_reconnect(event_loop):
    """Close the model's websocket and wait for the model to reconnect.

    The wait is ``block_until(model.is_connected, timeout=3)``.
    """
    async with base.CleanModel() as model:
        websocket = model.connection().ws
        await websocket.close()
        await block_until(model.is_connected, timeout=3)
# Sets 'extra-info' (plain string) and 'test-mode' (wrapped in ConfigValue)
# on a clean model, reads the config back, and checks that 'extra-info' is
# present with source 'model' and value 'booyah'.
# NOTE(review): the closing of the set_config dict (embedded line 389) is
# not visible in this view.
384 async def test_config(event_loop
):
385 async with base
.CleanModel() as model
:
386 await model
.set_config({
387 'extra-info': 'booyah',
388 'test-mode': ConfigValue(value
=True),
390 result
= await model
.get_config()
391 assert 'extra-info' in result
392 assert result
['extra-info'].source
== 'model'
393 assert result
['extra-info'].value
== 'booyah'
async def test_set_constraints(event_loop):
    """Set a model constraint and read it back.

    The constraint is written with the hyphenated key 'cpu-power' and read
    back under the underscored key 'cpu_power'.
    """
    requested = {'cpu-power': 1}

    async with base.CleanModel() as model:
        await model.set_constraints(requested)
        current = await model.get_constraints()
        assert current['cpu_power'] == 1
404 # @pytest.mark.asyncio
405 # async def test_grant(event_loop):
406 # async with base.CleanController() as controller:
407 # await controller.add_user('test-model-grant')
408 # await controller.grant('test-model-grant', 'superuser')
409 # async with base.CleanModel() as model:
410 # await model.grant('test-model-grant', 'admin')
411 # assert model.get_user('test-model-grant')['access'] == 'admin'
412 # await model.grant('test-model-grant', 'login')
413 # assert model.get_user('test-model-grant')['access'] == 'login'