2 from concurrent
.futures
import ThreadPoolExecutor
3 from pathlib
import Path
7 from juju
.model
import Model
# Public SSH key (ssh-rsa, with a trailing user@host comment) that
# test_ssh_key adds to and then removes from the model for user 'admin'.
11 SSH_KEY
= 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT mathijs@Qrama-Mathijs'
async def test_deploy_local_bundle(event_loop):
    """Deploy the local test bundle and check its applications show up.

    The bundle is read from the ``bundle`` directory located one level
    above the directory that contains this test module. After the deploy
    call returns, both ``wordpress`` and ``mysql`` must be present in
    ``model.applications``.
    """
    # ``Path`` is already imported at module level, so no function-local
    # re-import is needed here.
    bundle_path = Path(__file__).absolute().parent.parent / 'bundle'

    async with base.CleanModel() as model:
        await model.deploy(str(bundle_path))

        for app in ('wordpress', 'mysql'):
            assert app in model.applications
async def test_deploy_bundle(event_loop):
    """Deploy the ``bundle/wiki-simple`` bundle and verify its applications.

    Both ``wiki`` and ``mysql`` must be listed in ``model.applications``
    once the deploy call has returned.
    """
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy('bundle/wiki-simple')

        for expected in expected_apps:
            assert expected in model.applications
# Test adding machines to a clean model: a default machine, a machine with
# constraints/disks/series, and an lxd container placed on the second machine.
# Every returned object must be a Machine and the model must then report
# 3 machines. The machines are force-destroyed (container first), after
# which the model must report 0 machines.
39 async def test_add_machine(event_loop
):
40 from juju
.machine
import Machine
42 async with base
.CleanModel() as model
:
43 # add a new default machine
44 machine1
= await model
.add_machine()
46 # add a machine with constraints, disks, and series
47 machine2
= await model
.add_machine(
59 # add a lxd container to machine2
60 machine3
= await model
.add_machine(
61 'lxd:{}'.format(machine2
.id))
63 for m
in (machine1
, machine2
, machine3
):
64 assert isinstance(m
, Machine
)
66 assert len(model
.machines
) == 3
68 await machine3
.destroy(force
=True)
69 await machine2
.destroy(force
=True)
70 res
= await machine1
.destroy(force
=True)
73 assert len(model
.machines
) == 0
# Test creating a relation inside a clean model: the value returned by
# model.add_relation() must be a Relation instance. The visible keyword
# arguments name the applications 'ubuntu' and 'nrpe'.
78 async def test_relate(event_loop
):
79 from juju
.relation
import Relation
81 async with base
.CleanModel() as model
:
84 application_name
='ubuntu',
90 application_name
='nrpe',
93 # subordinates must be deployed without units
96 my_relation
= await model
.add_relation(
101 assert isinstance(my_relation
, Relation
)
async def _deploy_in_loop(new_loop, model_name):
    """Deploy ``cs:xenial/ubuntu`` through a Model bound to *new_loop*.

    A new ``Model`` is created with *new_loop*, connected to the model
    named *model_name*, and used to deploy the charm. The coroutine
    asserts that ``ubuntu`` then appears in that model's applications.

    :param new_loop: event loop handed to the ``Model`` constructor.
    :param model_name: name passed to ``Model.connect_model``.
    :raises AssertionError: if ``ubuntu`` is not listed after the deploy.
    """
    new_model = Model(new_loop)
    await new_model.connect_model(model_name)
    try:
        await new_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in new_model.applications
    finally:
        # Disconnect even when the deploy or the assertion fails, so a
        # failing run does not leave the connection open.
        await new_model.disconnect()
async def test_explicit_loop(event_loop):
    """Deploy through a second, explicitly created event loop.

    ``_deploy_in_loop`` is run to completion on a freshly created loop
    against the clean model's name; the test then waits for the
    ``ubuntu`` application to show up on the original model connection.
    """
    async with base.CleanModel() as model:
        target_name = model.info.name
        separate_loop = asyncio.new_event_loop()
        deploy_coro = _deploy_in_loop(separate_loop, target_name)
        separate_loop.run_until_complete(deploy_coro)
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
async def test_explicit_loop_threaded(event_loop):
    """Deploy through a second event loop that runs on a worker thread.

    A new event loop is created and its ``run_until_complete`` is handed
    to a single-worker ``ThreadPoolExecutor`` together with the
    ``_deploy_in_loop`` coroutine. The test then waits for the ``ubuntu``
    application to show up on the original model connection.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        with ThreadPoolExecutor(1) as executor:
            # Run the second loop to completion on the worker thread.
            future = executor.submit(
                new_loop.run_until_complete,
                _deploy_in_loop(new_loop, model_name))
            # result() re-raises anything raised in the worker, so a
            # failed deploy fails this test instead of passing silently.
            future.result()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
async def test_store_resources_charm(event_loop):
    """Deploy ``cs:ghost-18`` and expect its first unit to become active.

    The test waits until the application has at least one unit whose
    workload status is one of ``active``, ``error`` or ``blocked``, and
    then requires that status to be ``active``.
    """
    async with base.CleanModel() as model:
        ghost = await model.deploy('cs:ghost-18')
        assert 'ghost' in model.applications
        terminal_statuses = ('active', 'error', 'blocked')
        # Pass the condition as a callable so it is re-evaluated while
        # waiting, rather than as a value computed once before the call.
        await model.block_until(
            lambda: (
                len(ghost.units) > 0 and
                ghost.units[0].workload_status in terminal_statuses)
        )
        # ghost will go in to blocked (or error, for older
        # charm revs) if the resource is missing
        assert ghost.units[0].workload_status == 'active'
async def test_store_resources_bundle(event_loop):
    """Deploy the bundle next to this file and expect ghost to go active.

    The bundle is read from the ``bundle`` directory beside this test
    module. The test waits until the ``ghost`` application has at least
    one unit whose workload status is one of ``active``, ``error`` or
    ``blocked``, and then requires that status to be ``active``.
    """
    async with base.CleanModel() as model:
        bundle = str(Path(__file__).parent / 'bundle')
        await model.deploy(bundle)
        assert 'ghost' in model.applications
        ghost = model.applications['ghost']
        terminal_statuses = ('active', 'error', 'blocked')
        # Pass the condition as a callable so it is re-evaluated while
        # waiting, rather than as a value computed once before the call.
        await model.block_until(
            lambda: (
                len(ghost.units) > 0 and
                ghost.units[0].workload_status in terminal_statuses)
        )
        # ghost will go in to blocked (or error, for older
        # charm revs) if the resource is missing
        assert ghost.units[0].workload_status == 'active'
async def test_ssh_key(event_loop):
    """Add ``SSH_KEY`` for user ``admin``, check it is listed, remove it.

    After the add, the key must be contained in the first result returned
    by ``get_ssh_key(True)``; after the remove, that result must be
    ``None``.
    """
    async def _first_key_result(model):
        # Unwrap the reply down to the 'result' field of its first entry.
        reply = await model.get_ssh_key(True)
        first_entry = reply.serialize()['results'][0]
        return first_entry.serialize()['result']

    async with base.CleanModel() as model:
        await model.add_ssh_key('admin', SSH_KEY)
        keys_after_add = await _first_key_result(model)
        assert SSH_KEY in keys_after_add

        await model.remove_ssh_key('admin', SSH_KEY)
        keys_after_remove = await _first_key_result(model)
        assert keys_after_remove is None
async def test_get_machines(event_loop):
    """Check that ``model.get_machines()`` returns a list on a clean model."""
    async with base.CleanModel() as model:
        machines = await model.get_machines()
        assert isinstance(machines, list)
200 # @pytest.mark.asyncio
201 # async def test_grant(event_loop)
202 # async with base.CleanController() as controller:
203 # await controller.add_user('test-model-grant')
204 # await controller.grant('test-model-grant', 'superuser')
205 # async with base.CleanModel() as model:
206 # await model.grant('test-model-grant', 'admin')
207 # assert model.get_user('test-model-grant')['access'] == 'admin'
208 # await model.grant('test-model-grant', 'login')
209 # assert model.get_user('test-model-grant')['access'] == 'login'