3 from concurrent
.futures
import ThreadPoolExecutor
4 from pathlib
import Path
6 from juju
.client
.client
import ConfigValue
, ApplicationFacade
7 from juju
.model
import Model
, ModelObserver
8 from juju
.utils
import block_until
, run_with_interrupt
17 SSH_KEY
= 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT mathijs@Qrama-Mathijs' # noqa
22 async def test_deploy_local_bundle(event_loop
):
23 from pathlib
import Path
24 tests_dir
= Path(__file__
).absolute().parent
.parent
25 bundle_path
= tests_dir
/ 'bundle'
26 mini_bundle_file_path
= bundle_path
/ 'mini-bundle.yaml'
28 async with base
.CleanModel() as model
:
29 await model
.deploy(str(bundle_path
))
30 await model
.deploy(str(mini_bundle_file_path
))
32 for app
in ('wordpress', 'mysql', 'myapp'):
33 assert app
in model
.applications
38 async def test_deploy_local_charm(event_loop
):
39 from pathlib
import Path
40 tests_dir
= Path(__file__
).absolute().parent
.parent
41 charm_path
= tests_dir
/ 'charm'
43 async with base
.CleanModel() as model
:
44 await model
.deploy(str(charm_path
))
45 assert 'charm' in model
.applications
50 async def test_deploy_bundle(event_loop
):
51 async with base
.CleanModel() as model
:
52 await model
.deploy('bundle/wiki-simple')
54 for app
in ('wiki', 'mysql'):
55 assert app
in model
.applications
60 async def test_deploy_channels_revs(event_loop
):
61 async with base
.CleanModel() as model
:
62 charm
= 'cs:~johnsca/libjuju-test'
63 stable
= await model
.deploy(charm
, 'a1')
64 edge
= await model
.deploy(charm
, 'a2', channel
='edge')
65 rev
= await model
.deploy(charm
+ '-2', 'a3')
67 assert [a
.charm_url
for a
in (stable
, edge
, rev
)] == [
68 'cs:~johnsca/libjuju-test-1',
69 'cs:~johnsca/libjuju-test-2',
70 'cs:~johnsca/libjuju-test-2',
76 async def test_add_machine(event_loop
):
77 from juju
.machine
import Machine
79 async with base
.CleanModel() as model
:
80 # add a new default machine
81 machine1
= await model
.add_machine()
83 # add a machine with constraints, disks, and series
84 machine2
= await model
.add_machine(
96 # add a lxd container to machine2
97 machine3
= await model
.add_machine(
98 'lxd:{}'.format(machine2
.id))
100 for m
in (machine1
, machine2
, machine3
):
101 assert isinstance(m
, Machine
)
103 assert len(model
.machines
) == 3
105 await machine3
.destroy(force
=True)
106 await machine2
.destroy(force
=True)
107 res
= await machine1
.destroy(force
=True)
110 assert len(model
.machines
) == 0
115 async def test_relate(event_loop
):
116 from juju
.relation
import Relation
118 async with base
.CleanModel() as model
:
121 application_name
='ubuntu',
127 application_name
='nrpe',
130 # subordinates must be deployed without units
134 relation_added
= asyncio
.Event()
135 timeout
= asyncio
.Event()
137 class TestObserver(ModelObserver
):
138 async def on_relation_add(self
, delta
, old
, new
, model
):
139 if set(new
.key
.split()) == {'nrpe:general-info',
142 event_loop
.call_later(2, timeout
.set)
144 model
.add_observer(TestObserver())
146 real_app_facade
= ApplicationFacade
.from_connection(model
.connection())
147 mock_app_facade
= mock
.MagicMock()
149 async def mock_AddRelation(*args
):
150 # force response delay from AddRelation to test race condition
151 # (see https://github.com/juju/python-libjuju/issues/191)
152 result
= await real_app_facade
.AddRelation(*args
)
153 await relation_added
.wait()
156 mock_app_facade
.AddRelation
= mock_AddRelation
158 with mock
.patch
.object(ApplicationFacade
, 'from_connection',
159 return_value
=mock_app_facade
):
160 my_relation
= await run_with_interrupt(model
.add_relation(
163 ), timeout
, event_loop
)
165 assert isinstance(my_relation
, Relation
)
168 async def _deploy_in_loop(new_loop
, model_name
, jujudata
):
169 new_model
= Model(new_loop
, jujudata
=jujudata
)
170 await new_model
.connect(model_name
)
172 await new_model
.deploy('cs:xenial/ubuntu')
173 assert 'ubuntu' in new_model
.applications
175 await new_model
.disconnect()
180 async def test_explicit_loop_threaded(event_loop
):
181 async with base
.CleanModel() as model
:
182 model_name
= model
.info
.name
183 new_loop
= asyncio
.new_event_loop()
184 with
ThreadPoolExecutor(1) as executor
:
186 new_loop
.run_until_complete
,
187 _deploy_in_loop(new_loop
, model_name
, model
._connector
.jujudata
))
189 await model
._wait
_for
_new
('application', 'ubuntu')
190 assert 'ubuntu' in model
.applications
195 async def test_store_resources_charm(event_loop
):
196 async with base
.CleanModel() as model
:
197 ghost
= await model
.deploy('cs:ghost-19')
198 assert 'ghost' in model
.applications
199 terminal_statuses
= ('active', 'error', 'blocked')
200 await model
.block_until(
202 len(ghost
.units
) > 0 and
203 ghost
.units
[0].workload_status
in terminal_statuses
)
205 # ghost will go in to blocked (or error, for older
206 # charm revs) if the resource is missing
207 assert ghost
.units
[0].workload_status
== 'active'
212 async def test_store_resources_bundle(event_loop
):
213 async with base
.CleanModel() as model
:
214 bundle
= str(Path(__file__
).parent
/ 'bundle')
215 await model
.deploy(bundle
)
216 assert 'ghost' in model
.applications
217 ghost
= model
.applications
['ghost']
218 terminal_statuses
= ('active', 'error', 'blocked')
219 await model
.block_until(
221 len(ghost
.units
) > 0 and
222 ghost
.units
[0].workload_status
in terminal_statuses
)
224 # ghost will go in to blocked (or error, for older
225 # charm revs) if the resource is missing
226 assert ghost
.units
[0].workload_status
== 'active'
231 async def test_ssh_key(event_loop
):
232 async with base
.CleanModel() as model
:
233 await model
.add_ssh_key('admin', SSH_KEY
)
234 result
= await model
.get_ssh_key(True)
235 result
= result
.serialize()['results'][0].serialize()['result']
236 assert SSH_KEY
in result
237 await model
.remove_ssh_key('admin', SSH_KEY
)
238 result
= await model
.get_ssh_key(True)
239 result
= result
.serialize()['results'][0].serialize()['result']
240 assert result
is None
245 async def test_get_machines(event_loop
):
246 async with base
.CleanModel() as model
:
247 result
= await model
.get_machines()
248 assert isinstance(result
, list)
253 async def test_watcher_reconnect(event_loop
):
254 async with base
.CleanModel() as model
:
255 await model
.connection().ws
.close()
256 await block_until(model
.is_connected
, timeout
=3)
261 async def test_config(event_loop
):
262 async with base
.CleanModel() as model
:
263 await model
.set_config({
264 'extra-info': 'booyah',
265 'test-mode': ConfigValue(value
=True),
267 result
= await model
.get_config()
268 assert 'extra-info' in result
269 assert result
['extra-info'].source
== 'model'
270 assert result
['extra-info'].value
== 'booyah'
273 # @pytest.mark.asyncio
274 # async def test_grant(event_loop)
275 # async with base.CleanController() as controller:
276 # await controller.add_user('test-model-grant')
277 # await controller.grant('test-model-grant', 'superuser')
278 # async with base.CleanModel() as model:
279 # await model.grant('test-model-grant', 'admin')
280 # assert model.get_user('test-model-grant')['access'] == 'admin'
281 # await model.grant('test-model-grant', 'login')
282 # assert model.get_user('test-model-grant')['access'] == 'login'