2 from concurrent
.futures
import ThreadPoolExecutor
3 from pathlib
import Path
7 from juju
.model
import Model
# Public SSH key that test_ssh_key adds to, and then removes from, a model.
# Written as adjacent string literals so the long key material wraps; the
# pieces concatenate to one string of the form "<type> <base64> <comment>".
SSH_KEY = (
    'ssh-rsa '
    'AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6'
    'VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcn'
    'U8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMd'
    'r31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/E'
    'QcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCR'
    'rre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT'
    ' mathijs@Qrama-Mathijs'
)
async def test_deploy_local_bundle(event_loop):
    """Deploy the local ``bundle`` directory and check its applications.

    The bundle path is the ``bundle`` directory two levels above this
    file. The test passes when both 'wordpress' and 'mysql' are present in
    ``model.applications`` after ``model.deploy`` returns.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    from pathlib import Path

    # parents[1] is the same directory as .parent.parent.
    bundle_dir = Path(__file__).absolute().parents[1] / 'bundle'
    expected_apps = ('wordpress', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy(str(bundle_dir))

        for name in expected_apps:
            assert name in model.applications
async def test_deploy_bundle(event_loop):
    """Deploy the 'bundle/wiki-simple' bundle and check its applications.

    The test passes when both 'wiki' and 'mysql' are present in
    ``model.applications`` after ``model.deploy`` returns.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy('bundle/wiki-simple')

        for name in expected_apps:
            assert name in model.applications
async def test_deploy_channels_revs(event_loop):
    """Deploy one charm three ways and compare the resulting charm URLs.

    The same charm is deployed with no channel ('a1'), with
    ``channel='edge'`` ('a2'), and with an explicit '-2' revision suffix
    ('a3'). The ``charm_url`` of each returned application is then compared
    against the expected revisioned URLs, in that order.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    async with base.CleanModel() as model:
        base_url = 'cs:~johnsca/libjuju-test'

        default_app = await model.deploy(base_url, 'a1')
        edge_app = await model.deploy(base_url, 'a2', channel='edge')
        pinned_app = await model.deploy(base_url + '-2', 'a3')

        deployed_urls = [
            default_app.charm_url,
            edge_app.charm_url,
            pinned_app.charm_url,
        ]
        assert deployed_urls == [
            'cs:~johnsca/libjuju-test-1',
            'cs:~johnsca/libjuju-test-2',
            'cs:~johnsca/libjuju-test-2',
        ]
# test_add_machine: adds three machines to a clean model (a default one, a
# second one, and an 'lxd:<machine2.id>' container), checks that each result
# is a juju.machine.Machine and that model.machines has 3 entries, then
# force-destroys all three and expects model.machines to be empty.
55 async def test_add_machine(event_loop
):
56 from juju
.machine
import Machine
58 async with base
.CleanModel() as model
:
59 # add a new default machine
60 machine1
= await model
.add_machine()
62 # add a machine with constraints, disks, and series
63 machine2
= await model
.add_machine(
# NOTE(review): the arguments of the add_machine call above (constraints,
# disks, series per the comment) are not visible in this view -- confirm
# against the full file.
75 # add a lxd container to machine2
76 machine3
= await model
.add_machine(
77 'lxd:{}'.format(machine2
.id))
79 for m
in (machine1
, machine2
, machine3
):
80 assert isinstance(m
, Machine
)
82 assert len(model
.machines
) == 3
# Destroyed in reverse order of creation: the container (machine3) first,
# then the machine that hosts it.
84 await machine3
.destroy(force
=True)
85 await machine2
.destroy(force
=True)
86 res
= await machine1
.destroy(force
=True)
89 assert len(model
.machines
) == 0
# test_relate: in a clean model, checks that the value returned by
# model.add_relation(...) is a juju.relation.Relation.
# NOTE(review): most of this body is not visible in this view -- the calls
# that receive application_name='ubuntu' and application_name='nrpe', and the
# arguments passed to add_relation. Confirm against the full file.
94 async def test_relate(event_loop
):
95 from juju
.relation
import Relation
97 async with base
.CleanModel() as model
:
100 application_name
='ubuntu',
106 application_name
='nrpe',
109 # subordinates must be deployed without units
112 my_relation
= await model
.add_relation(
117 assert isinstance(my_relation
, Relation
)
async def _deploy_in_loop(new_loop, model_name):
    """Connect a fresh Model on ``new_loop`` and deploy ubuntu through it.

    A new ``Model`` is constructed with ``new_loop``, connected to the model
    named ``model_name``, and used to deploy 'cs:xenial/ubuntu'. The helper
    asserts that 'ubuntu' then appears in that model's applications.

    The connection is always disconnected once it has been established,
    even when the deploy or the assertion fails, so a failing test does not
    leave a connection open.

    :param new_loop: event loop handed to the ``Model`` constructor.
    :param model_name: name passed to ``connect_model``.
    :raises AssertionError: if 'ubuntu' is missing after the deploy.
    """
    new_model = Model(new_loop)
    await new_model.connect_model(model_name)
    try:
        await new_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in new_model.applications
    finally:
        # Runs on success and on failure so the connection is not leaked.
        await new_model.disconnect()
async def test_explicit_loop(event_loop):
    """Deploy via a Model bound to a second, explicitly created event loop.

    A new loop is created with ``asyncio.new_event_loop()`` and used to run
    ``_deploy_in_loop`` against the clean model's name. The test then waits
    on the original model for the 'ubuntu' application and asserts that it
    is listed in ``model.applications``.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        try:
            new_loop.run_until_complete(
                _deploy_in_loop(new_loop, model_name))
        finally:
            # The loop is created by and private to this test; close it so
            # its resources are released even if the deploy fails.
            new_loop.close()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
# test_explicit_loop_threaded: variant of the explicit-loop test that uses a
# single-worker ThreadPoolExecutor. It references new_loop.run_until_complete
# and _deploy_in_loop(new_loop, model_name) inside the executor block, then
# waits on the original model for the 'ubuntu' application and asserts that
# it is listed in model.applications.
144 async def test_explicit_loop_threaded(event_loop
):
145 async with base
.CleanModel() as model
:
146 model_name
= model
.info
.name
147 new_loop
= asyncio
.new_event_loop()
148 with
ThreadPoolExecutor(1) as executor
:
# NOTE(review): the statement that opens the call taking the two arguments
# below (presumably an executor.submit(...)) and whatever follows it inside
# the with-block are not visible in this view -- confirm against the full file.
150 new_loop
.run_until_complete
,
151 _deploy_in_loop(new_loop
, model_name
))
153 await model
._wait
_for
_new
('application', 'ubuntu')
154 assert 'ubuntu' in model
.applications
# test_store_resources_charm: deploys 'cs:ghost-19' into a clean model,
# asserts that 'ghost' is listed in model.applications, blocks on a condition
# involving the first unit's workload_status being one of terminal_statuses,
# and finally asserts that this status is 'active'.
159 async def test_store_resources_charm(event_loop
):
160 async with base
.CleanModel() as model
:
161 ghost
= await model
.deploy('cs:ghost-19')
162 assert 'ghost' in model
.applications
163 terminal_statuses
= ('active', 'error', 'blocked')
164 await model
.block_until(
# NOTE(review): the line that opens the condition (presumably a
# `lambda: (`) and the line that closes the block_until call are not
# visible in this view -- confirm against the full file.
166 len(ghost
.units
) > 0 and
167 ghost
.units
[0].workload_status
in terminal_statuses
)
169 # ghost will go in to blocked (or error, for older
170 # charm revs) if the resource is missing
171 assert ghost
.units
[0].workload_status
== 'active'
# test_store_resources_bundle: deploys the 'bundle' directory that sits next
# to this file, asserts that 'ghost' is listed in model.applications, blocks
# on a condition involving the first ghost unit's workload_status being one
# of terminal_statuses, and finally asserts that this status is 'active'.
176 async def test_store_resources_bundle(event_loop
):
177 async with base
.CleanModel() as model
:
178 bundle
= str(Path(__file__
).parent
/ 'bundle')
179 await model
.deploy(bundle
)
180 assert 'ghost' in model
.applications
181 ghost
= model
.applications
['ghost']
182 terminal_statuses
= ('active', 'error', 'blocked')
183 await model
.block_until(
# NOTE(review): the line that opens the condition (presumably a
# `lambda: (`) and the line that closes the block_until call are not
# visible in this view -- confirm against the full file.
185 len(ghost
.units
) > 0 and
186 ghost
.units
[0].workload_status
in terminal_statuses
)
188 # ghost will go in to blocked (or error, for older
189 # charm revs) if the resource is missing
190 assert ghost
.units
[0].workload_status
== 'active'
async def test_ssh_key(event_loop):
    """Add SSH_KEY for 'admin', check it is reported, remove it, check again.

    After ``add_ssh_key`` the unwrapped ``get_ssh_key(True)`` result must
    contain ``SSH_KEY``; after ``remove_ssh_key`` the same lookup must
    yield ``None``.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    async def fetch_key_result(model):
        # Unwrap the first entry of the serialized response down to its
        # 'result' field.
        response = await model.get_ssh_key(True)
        first_entry = response.serialize()['results'][0]
        return first_entry.serialize()['result']

    async with base.CleanModel() as model:
        await model.add_ssh_key('admin', SSH_KEY)
        keys_after_add = await fetch_key_result(model)
        assert SSH_KEY in keys_after_add

        await model.remove_ssh_key('admin', SSH_KEY)
        keys_after_remove = await fetch_key_result(model)
        assert keys_after_remove is None
async def test_get_machines(event_loop):
    """Check that ``model.get_machines()`` on a clean model returns a list.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    async with base.CleanModel() as model:
        machines = await model.get_machines()
        assert isinstance(machines, list)
async def test_watcher_reconnect(event_loop):
    """Close the model's websocket and check the connection is open again.

    The websocket under ``model.connection`` is closed, the test sleeps for
    0.1 seconds, and then asserts ``model.connection.is_open``.

    :param event_loop: not referenced in the body; presumably the
        pytest-asyncio loop fixture.
    """
    async with base.CleanModel() as model:
        websocket = model.connection.ws
        await websocket.close()

        await asyncio.sleep(0.1)

        # Re-read model.connection here instead of caching it before the
        # close, so the check sees whatever connection the model holds now.
        assert model.connection.is_open
225 # @pytest.mark.asyncio
226 # async def test_grant(event_loop)
227 # async with base.CleanController() as controller:
228 # await controller.add_user('test-model-grant')
229 # await controller.grant('test-model-grant', 'superuser')
230 # async with base.CleanModel() as model:
231 # await model.grant('test-model-grant', 'admin')
232 # assert model.get_user('test-model-grant')['access'] == 'admin'
233 # await model.grant('test-model-grant', 'login')
234 # assert model.get_user('test-model-grant')['access'] == 'login'