4d45a1c00cf7e2e887677a14ec069fb001af5efc
2 from concurrent
.futures
import ThreadPoolExecutor
6 from juju
.model
import Model
async def test_deploy_local_bundle(event_loop):
    """Deploy the bundle directory kept next to the tests and check its apps.

    The bundle path is ``<tests dir>/bundle``, where the tests directory is
    the grandparent directory of this file. Once ``model.deploy`` returns,
    both 'wordpress' and 'mysql' must be present in ``model.applications``.

    Args:
        event_loop: Not referenced in the body; presumably the
            pytest-asyncio loop fixture -- confirm against the test setup.
    """
    from pathlib import Path

    this_file = Path(__file__).absolute()
    local_bundle = this_file.parent.parent / 'bundle'
    expected_apps = ('wordpress', 'mysql')

    async with base.CleanModel() as model:
        # deploy() is handed a plain string, not a Path object.
        await model.deploy(str(local_bundle))

        for name in expected_apps:
            assert name in model.applications
async def test_deploy_bundle(event_loop):
    """Deploy the 'bundle/wiki-simple' bundle and check its applications.

    After ``model.deploy`` returns, both 'wiki' and 'mysql' must be present
    in ``model.applications``.

    Args:
        event_loop: Not referenced in the body; presumably the
            pytest-asyncio loop fixture -- confirm against the test setup.
    """
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy('bundle/wiki-simple')

        for name in expected_apps:
            assert name in model.applications
# Integration test: inside a ``base.CleanModel()`` context, add three machines
# (a default one, a second one, and an LXD container placed on the second),
# check that each is a ``juju.machine.Machine`` and that the model reports
# three machines, then force-destroy all three and expect zero machines.
# ``event_loop`` is not referenced in the visible body; presumably a
# pytest-asyncio fixture -- confirm.
38 async def test_add_machine(event_loop
):
39 from juju
.machine
import Machine
41 async with base
.CleanModel() as model
:
42 # add a new default machine
43 machine1
= await model
.add_machine()
45 # add a machine with constraints, disks, and series
46 machine2
= await model
.add_machine(
# NOTE(review): the arguments of the call above (constraints, disks, series
# per the comment) are not visible in this excerpt -- confirm in the full file.
58 # add a lxd container to machine2
59 machine3
= await model
.add_machine(
60 'lxd:{}'.format(machine2
.id))
# Every returned object must be a Machine instance.
62 for m
in (machine1
, machine2
, machine3
):
63 assert isinstance(m
, Machine
)
65 assert len(model
.machines
) == 3
# Tear down in reverse order of creation: the container on machine2 goes
# first, then machine2, then machine1.
67 await machine3
.destroy(force
=True)
68 await machine2
.destroy(force
=True)
69 res
= await machine1
.destroy(force
=True)
# NOTE(review): ``res`` is not used in the visible lines; a check on it may
# sit on a line missing from this excerpt -- confirm.
72 assert len(model
.machines
) == 0
# Integration test: inside a ``base.CleanModel()`` context, set up two
# applications named 'ubuntu' and 'nrpe', relate them via
# ``model.add_relation`` and assert that the result is a
# ``juju.relation.Relation``.
# ``event_loop`` is not referenced in the visible body; presumably a
# pytest-asyncio fixture -- confirm.
77 async def test_relate(event_loop
):
78 from juju
.relation
import Relation
80 async with base
.CleanModel() as model
:
# NOTE(review): the calls that receive the two ``application_name`` keyword
# arguments below are not visible in this excerpt (presumably
# ``model.deploy``) -- confirm in the full file.
83 application_name
='ubuntu',
89 application_name
='nrpe',
92 # subordinates must be deployed without units
# NOTE(review): the arguments passed to ``add_relation`` are not visible in
# this excerpt -- confirm in the full file.
95 my_relation
= await model
.add_relation(
100 assert isinstance(my_relation
, Relation
)
async def _deploy_in_loop(new_loop, model_name):
    """Connect a new ``Model`` built on *new_loop* and deploy ubuntu into it.

    Args:
        new_loop: Event loop handed to the ``Model`` constructor.
        model_name: Name passed to ``Model.connect_model``.

    Raises:
        AssertionError: If 'ubuntu' is missing from the model's applications
            after the deploy call returns.
    """
    new_model = Model(new_loop)
    await new_model.connect_model(model_name)
    try:
        await new_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in new_model.applications
    finally:
        # Always disconnect, even when the deploy or the assertion fails, so
        # a failing run does not leave the connection open.
        await new_model.disconnect()
async def test_explicit_loop(event_loop):
    """Deploy through a second, explicitly created event loop.

    A new loop is created with ``asyncio.new_event_loop()`` and used to run
    ``_deploy_in_loop`` against the clean model's name. The test then waits
    on the original model for the 'ubuntu' application and asserts that it
    is listed in ``model.applications``.

    Args:
        event_loop: Not referenced in the body; presumably the
            pytest-asyncio loop fixture -- confirm against the test setup.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        try:
            new_loop.run_until_complete(
                _deploy_in_loop(new_loop, model_name))
        finally:
            # The loop is created here, so it is closed here as well;
            # otherwise its resources stay open until garbage collection.
            new_loop.close()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
# Integration test: like the explicit-loop scenario, but the new event loop's
# ``run_until_complete`` and the ``_deploy_in_loop`` coroutine are used inside
# a single-worker ``ThreadPoolExecutor`` block. Afterwards the test waits on
# the original model for the 'ubuntu' application and asserts it is listed in
# ``model.applications``.
# ``event_loop`` is not referenced in the visible body; presumably a
# pytest-asyncio fixture -- confirm.
127 async def test_explicit_loop_threaded(event_loop
):
128 async with base
.CleanModel() as model
:
129 model_name
= model
.info
.name
130 new_loop
= asyncio
.new_event_loop()
131 with
ThreadPoolExecutor(1) as executor
:
# NOTE(review): the call that takes the two arguments below is not visible
# in this excerpt (presumably ``executor.submit``), nor is any handling of
# its result -- confirm in the full file.
133 new_loop
.run_until_complete
,
134 _deploy_in_loop(new_loop
, model_name
))
136 await model
._wait
_for
_new
('application', 'ubuntu')
137 assert 'ubuntu' in model
.applications