2 from concurrent
.futures
import ThreadPoolExecutor
3 from pathlib
import Path
7 from juju
.model
import Model
8 from juju
.client
.client
import ConfigValue
# Public key used by test_ssh_key; built from the usual three
# space-separated fields: key type, base64 blob, comment.
SSH_KEY = ' '.join((
    'ssh-rsa',
    'AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT',  # noqa
    'mathijs@Qrama-Mathijs',
))
async def test_deploy_local_bundle(event_loop):
    """Deploy the local ``bundle`` directory and check its applications.

    The bundle path is resolved relative to this file: the parent of the
    directory containing this file, then ``bundle``. It is deployed into a
    ``base.CleanModel`` and both expected application names must then be
    present in ``model.applications``.

    :param event_loop: not referenced directly in the body.
    """
    # Path comes from the module-level import; a second, function-local
    # import of the same name is unnecessary.
    tests_dir = Path(__file__).absolute().parent.parent
    bundle_path = tests_dir / 'bundle'

    async with base.CleanModel() as model:
        # deploy() is given a plain string rather than a Path object.
        await model.deploy(str(bundle_path))

        for app in ('wordpress', 'mysql'):
            assert app in model.applications
async def test_deploy_bundle(event_loop):
    """Deploy ``bundle/wiki-simple`` and check that its applications exist.

    :param event_loop: not referenced directly in the body.
    """
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as model:
        await model.deploy('bundle/wiki-simple')

        # Every expected name must be a key/member of model.applications.
        for name in expected_apps:
            assert name in model.applications
async def test_deploy_channels_revs(event_loop):
    """Deploy one charm three ways and compare the resulting charm URLs.

    The charm is deployed with defaults as ``a1``, with ``channel='edge'``
    as ``a2``, and with an explicit ``-2`` suffix as ``a3``.

    :param event_loop: not referenced directly in the body.
    """
    async with base.CleanModel() as model:
        charm = 'cs:~johnsca/libjuju-test'
        stable = await model.deploy(charm, 'a1')
        edge = await model.deploy(charm, 'a2', channel='edge')
        rev = await model.deploy(charm + '-2', 'a3')

        # Expected URLs, in the same order as the deployments above.
        expected_urls = [
            'cs:~johnsca/libjuju-test-1',
            'cs:~johnsca/libjuju-test-2',
            'cs:~johnsca/libjuju-test-2',
        ]
        actual_urls = [stable.charm_url, edge.charm_url, rev.charm_url]
        assert actual_urls == expected_urls
# test_add_machine: inside a base.CleanModel, adds three machines -- a
# default one, a second one with extra arguments, and an lxd container
# placed on the second ('lxd:<machine2.id>') -- checks that each result is
# a juju.machine.Machine and that model.machines has three entries, then
# force-destroys all three and expects model.machines to be empty.
# NOTE(review): the arguments of the second add_machine() call and the
# statements between the last destroy() and the final assert are not
# visible in this view -- confirm against the full file before editing.
57 async def test_add_machine(event_loop
):
58 from juju
.machine
import Machine
60 async with base
.CleanModel() as model
:
61 # add a new default machine
62 machine1
= await model
.add_machine()
64 # add a machine with constraints, disks, and series
65 machine2
= await model
.add_machine(
# NOTE(review): the keyword arguments for this call are missing here.
77 # add a lxd container to machine2
78 machine3
= await model
.add_machine(
79 'lxd:{}'.format(machine2
.id))
81 for m
in (machine1
, machine2
, machine3
):
82 assert isinstance(m
, Machine
)
84 assert len(model
.machines
) == 3
# Machines are destroyed in reverse order of creation (container first).
86 await machine3
.destroy(force
=True)
87 await machine2
.destroy(force
=True)
88 res
= await machine1
.destroy(force
=True)
# NOTE(review): how `res` is used is not visible in this view.
91 assert len(model
.machines
) == 0
# test_relate: inside a base.CleanModel, ends by calling
# model.add_relation(...) and asserting the result is a
# juju.relation.Relation.
# NOTE(review): most of this body is not visible in this view -- only the
# application_name='ubuntu' / application_name='nrpe' keyword fragments
# and the start of the add_relation() call remain. Presumably two
# applications are deployed before being related; confirm against the
# full file before editing.
96 async def test_relate(event_loop
):
97 from juju
.relation
import Relation
99 async with base
.CleanModel() as model
:
102 application_name
='ubuntu',
108 application_name
='nrpe',
111 # subordinates must be deployed without units
114 my_relation
= await model
.add_relation(
# NOTE(review): the arguments to add_relation() are missing here.
119 assert isinstance(my_relation
, Relation
)
async def _deploy_in_loop(new_loop, model_name):
    """Connect a separate ``Model`` on ``new_loop`` and deploy ubuntu with it.

    :param new_loop: event loop handed to the ``Model`` constructor.
    :param model_name: name passed to ``Model.connect_model``.
    :raises AssertionError: if ``'ubuntu'`` is not in the new model's
        applications after the deploy.
    """
    new_model = Model(new_loop)
    await new_model.connect_model(model_name)
    try:
        await new_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in new_model.applications
    finally:
        # Disconnect even when deploy() or the assertion fails, so a failing
        # run does not leave this extra connection open.
        await new_model.disconnect()
# test_explicit_loop_threaded: inside a base.CleanModel, creates a second
# event loop with asyncio.new_event_loop() and, within a
# ThreadPoolExecutor(1) block, passes new_loop.run_until_complete together
# with the _deploy_in_loop(new_loop, model_name) coroutine to a call.
# Afterwards it waits on the original model for the 'ubuntu' application
# and asserts that it is listed in model.applications.
# NOTE(review): the line that opens that call (presumably an executor
# submission) and the line after it are not visible in this view --
# confirm against the full file before editing.
134 async def test_explicit_loop_threaded(event_loop
):
135 async with base
.CleanModel() as model
:
136 model_name
= model
.info
.name
137 new_loop
= asyncio
.new_event_loop()
138 with
ThreadPoolExecutor(1) as executor
:
# NOTE(review): the opening of the call taking these two arguments is
# missing here.
140 new_loop
.run_until_complete
,
141 _deploy_in_loop(new_loop
, model_name
))
# The deploy happened through a different Model object, so wait for the
# outer model to learn about the new application before asserting.
143 await model
._wait
_for
_new
('application', 'ubuntu')
144 assert 'ubuntu' in model
.applications
# test_store_resources_charm: deploys 'cs:ghost-19' into a base.CleanModel,
# asserts 'ghost' is in model.applications, blocks until the first ghost
# unit's workload_status is one of ('active', 'error', 'blocked'), and
# then requires that status to be 'active'.
# NOTE(review): the line that opens the block_until() argument and the
# line closing the call are not visible in this view -- confirm against
# the full file before editing.
149 async def test_store_resources_charm(event_loop
):
150 async with base
.CleanModel() as model
:
151 ghost
= await model
.deploy('cs:ghost-19')
152 assert 'ghost' in model
.applications
153 terminal_statuses
= ('active', 'error', 'blocked')
154 await model
.block_until(
# The len() check guards the units[0] lookup on the next line.
156 len(ghost
.units
) > 0 and
157 ghost
.units
[0].workload_status
in terminal_statuses
)
159 # ghost will go in to blocked (or error, for older
160 # charm revs) if the resource is missing
161 assert ghost
.units
[0].workload_status
== 'active'
# test_store_resources_bundle: deploys the 'bundle' directory located next
# to this file into a base.CleanModel, asserts 'ghost' is in
# model.applications, blocks until the first ghost unit's workload_status
# is one of ('active', 'error', 'blocked'), and then requires that status
# to be 'active'.
# NOTE(review): the line that opens the block_until() argument and the
# line closing the call are not visible in this view -- confirm against
# the full file before editing.
166 async def test_store_resources_bundle(event_loop
):
167 async with base
.CleanModel() as model
:
168 bundle
= str(Path(__file__
).parent
/ 'bundle')
169 await model
.deploy(bundle
)
170 assert 'ghost' in model
.applications
171 ghost
= model
.applications
['ghost']
172 terminal_statuses
= ('active', 'error', 'blocked')
173 await model
.block_until(
# The len() check guards the units[0] lookup on the next line.
175 len(ghost
.units
) > 0 and
176 ghost
.units
[0].workload_status
in terminal_statuses
)
178 # ghost will go in to blocked (or error, for older
179 # charm revs) if the resource is missing
180 assert ghost
.units
[0].workload_status
== 'active'
async def test_ssh_key(event_loop):
    """Add ``SSH_KEY`` for ``admin``, confirm it is listed, then remove it.

    After removal the unwrapped ``get_ssh_key`` result must be ``None``.

    :param event_loop: not referenced directly in the body.
    """

    def unwrap(response):
        # Pull the 'result' field out of the first entry of the response.
        return response.serialize()['results'][0].serialize()['result']

    async with base.CleanModel() as model:
        await model.add_ssh_key('admin', SSH_KEY)
        keys_after_add = unwrap(await model.get_ssh_key(True))
        assert SSH_KEY in keys_after_add

        await model.remove_ssh_key('admin', SSH_KEY)
        keys_after_remove = unwrap(await model.get_ssh_key(True))
        assert keys_after_remove is None
async def test_get_machines(event_loop):
    """Check that ``model.get_machines()`` returns a ``list``.

    :param event_loop: not referenced directly in the body.
    """
    async with base.CleanModel() as model:
        machines = await model.get_machines()
        assert isinstance(machines, list)
async def test_watcher_reconnect(event_loop):
    """Close the model's websocket and expect the connection to be open.

    :param event_loop: not referenced directly in the body.
    """
    pause_seconds = 0.1

    async with base.CleanModel() as model:
        await model.connection.ws.close()
        # Give the event loop a short moment before re-checking; the
        # connection attribute is looked up again rather than cached.
        await asyncio.sleep(pause_seconds)
        assert model.connection.is_open
async def test_config(event_loop):
    """Set two model config keys and read ``extra-info`` back.

    ``extra-info`` is set from a plain string and ``test-mode`` from a
    ``ConfigValue``; only ``extra-info`` is verified afterwards.

    :param event_loop: not referenced directly in the body.
    """
    async with base.CleanModel() as model:
        new_settings = {
            'extra-info': 'booyah',
            'test-mode': ConfigValue(value=True),
        }
        await model.set_config(new_settings)

        config = await model.get_config()
        assert 'extra-info' in config
        extra_info = config['extra-info']
        assert extra_info.source == 'model'
        assert extra_info.value == 'booyah'
228 # @pytest.mark.asyncio
229 # async def test_grant(event_loop):
230 # async with base.CleanController() as controller:
231 # await controller.add_user('test-model-grant')
232 # await controller.grant('test-model-grant', 'superuser')
233 # async with base.CleanModel() as model:
234 # await model.grant('test-model-grant', 'admin')
235 # assert model.get_user('test-model-grant')['access'] == 'admin'
236 # await model.grant('test-model-grant', 'login')
237 # assert model.get_user('test-model-grant')['access'] == 'login'