Bump version and update changelog for release
[osm/N2VC.git] / tests / integration / test_model.py
1 import asyncio
2 from concurrent.futures import ThreadPoolExecutor
3 from pathlib import Path
4 import pytest
5
6 from .. import base
7 from juju.model import Model
8
# Size units used by the machine tests in this module. MB is the base unit,
# so expressions such as ``256 * MB`` and ``10 * GB`` are counted in MB.
MB = 1
GB = 1024 * MB

# Public SSH key that test_ssh_key adds to a model and then removes again.
SSH_KEY = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCsYMJGNGG74HAJha3n2CFmWYsOOaORnJK6VqNy86pj0MIpvRXBzFzVy09uPQ66GOQhTEoJHEqE77VMui7+62AcMXT+GG7cFHcnU8XVQsGM6UirCcNyWNysfiEMoAdZScJf/GvoY87tMEszhZIUV37z8PUBx6twIqMdr31W1J0IaPa+sV6FEDadeLaNTvancDcHK1zuKsL39jzAg7+LYjKJfEfrsQP+lj/EQcjtKqlhVS5kzsJVfx8ZEd0xhW5G7N6bCdKNalS8mKCMaBXJpijNQ82AiyqCIDCRrre2To0/i7pTjRiL0U9f9mV3S4NJaQaokR050w/ZLySFf6F7joJT mathijs@Qrama-Mathijs'
12
@base.bootstrapped
@pytest.mark.asyncio
async def test_deploy_local_bundle(event_loop):
    """Deploy a bundle from a local directory into a clean model.

    The bundle is read from the ``bundle`` directory that sits next to the
    directory containing this file. Once ``deploy`` returns, both the
    ``wordpress`` and ``mysql`` applications must be present in the model.
    """
    # Path comes from the module-level import; re-importing it inside the
    # function was redundant.
    tests_dir = Path(__file__).absolute().parent.parent
    bundle_path = tests_dir / 'bundle'

    async with base.CleanModel() as model:
        # deploy() is handed the directory as a plain string
        await model.deploy(str(bundle_path))

        for app in ('wordpress', 'mysql'):
            assert app in model.applications
25
26
@base.bootstrapped
@pytest.mark.asyncio
async def test_deploy_bundle(event_loop):
    """Deploy ``bundle/wiki-simple`` and check that its applications exist."""
    expected_apps = ('wiki', 'mysql')

    async with base.CleanModel() as clean_model:
        await clean_model.deploy('bundle/wiki-simple')

        # Every application named above must show up in the model
        for name in expected_apps:
            assert name in clean_model.applications
35
36
@base.bootstrapped
@pytest.mark.asyncio
async def test_deploy_channels_revs(event_loop):
    """Deploy one charm three ways and compare the resulting charm URLs.

    The charm is deployed without a channel, from the ``edge`` channel, and
    with an explicit ``-2`` revision suffix; the ``charm_url`` of each
    resulting application is compared against the expected list.
    """
    charm = 'cs:~johnsca/libjuju-test'
    expected_urls = [
        'cs:~johnsca/libjuju-test-1',
        'cs:~johnsca/libjuju-test-2',
        'cs:~johnsca/libjuju-test-2',
    ]

    async with base.CleanModel() as clean_model:
        stable = await clean_model.deploy(charm, 'a1')
        edge = await clean_model.deploy(charm, 'a2', channel='edge')
        rev = await clean_model.deploy(charm + '-2', 'a3')

        actual_urls = [stable.charm_url, edge.charm_url, rev.charm_url]
        assert actual_urls == expected_urls
51
52
@base.bootstrapped
@pytest.mark.asyncio
async def test_add_machine(event_loop):
    """Add three machines to a clean model, then destroy them again.

    Creates a default machine, a machine with constraints/disks/series, and
    an lxd container placed on the second machine. All three must be
    ``Machine`` instances, and the model's machine count must go from 3
    back to 0 once they have been force-destroyed.
    """
    from juju.machine import Machine

    mem_constraints = {'mem': 256 * MB}
    disk_specs = [{
        'pool': 'rootfs',
        'size': 10 * GB,
        'count': 1,
    }]

    async with base.CleanModel() as clean_model:
        # a new default machine
        default_machine = await clean_model.add_machine()

        # a machine with constraints, disks, and series
        sized_machine = await clean_model.add_machine(
            constraints=mem_constraints,
            disks=disk_specs,
            series='xenial',
        )

        # an lxd container hosted on the machine created just above
        container_spec = 'lxd:{}'.format(sized_machine.id)
        container = await clean_model.add_machine(container_spec)

        created = (default_machine, sized_machine, container)
        for machine in created:
            assert isinstance(machine, Machine)

        assert len(clean_model.machines) == 3

        # Tear down in reverse creation order: container first
        await container.destroy(force=True)
        await sized_machine.destroy(force=True)
        last_result = await default_machine.destroy(force=True)

        assert last_result is None
        assert len(clean_model.machines) == 0
90
91
@base.bootstrapped
@pytest.mark.asyncio
async def test_relate(event_loop):
    """Deploy ``ubuntu`` and ``nrpe`` and relate them.

    The value returned by ``add_relation`` must be a ``Relation`` instance.
    """
    from juju.relation import Relation

    # Options shared by both deployments
    shared_options = {'series': 'trusty', 'channel': 'stable'}

    async with base.CleanModel() as clean_model:
        await clean_model.deploy(
            'ubuntu',
            application_name='ubuntu',
            **shared_options
        )
        await clean_model.deploy(
            'nrpe',
            application_name='nrpe',
            # subordinates must be deployed without units
            num_units=0,
            **shared_options
        )

        relation = await clean_model.add_relation('ubuntu', 'nrpe')

        assert isinstance(relation, Relation)
118
119
async def _deploy_in_loop(new_loop, model_name):
    """Deploy ``cs:xenial/ubuntu`` through a Model created with ``new_loop``.

    A fresh ``Model`` is constructed with ``new_loop``, connected to the
    model called ``model_name``, and used for the deploy. The model is
    disconnected again whether or not the deploy and assertion succeed.
    """
    loop_model = Model(new_loop)
    await loop_model.connect_model(model_name)
    try:
        await loop_model.deploy('cs:xenial/ubuntu')
        assert 'ubuntu' in loop_model.applications
    finally:
        # Always drop the extra connection, even on failure
        await loop_model.disconnect()
128
129
@base.bootstrapped
@pytest.mark.asyncio
async def test_explicit_loop(event_loop):
    """Deploy through a second, explicitly created event loop.

    A new event loop is created and driven to completion with
    ``_deploy_in_loop``; the model opened by this test must then see the
    ``ubuntu`` application that was deployed through the other loop.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        try:
            new_loop.run_until_complete(
                _deploy_in_loop(new_loop, model_name))
        finally:
            # The extra loop is ours alone; close it so its resources are
            # released instead of leaking until interpreter shutdown.
            new_loop.close()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
140
141
@base.bootstrapped
@pytest.mark.asyncio
async def test_explicit_loop_threaded(event_loop):
    """Deploy through a second event loop that runs in a worker thread.

    A new event loop is created here and ``run_until_complete`` is submitted
    to a single-worker thread pool with the ``_deploy_in_loop`` coroutine.
    The model opened by this test must then see the ``ubuntu`` application.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        try:
            with ThreadPoolExecutor(1) as executor:
                f = executor.submit(
                    new_loop.run_until_complete,
                    _deploy_in_loop(new_loop, model_name))
                # Re-raises here anything that failed in the worker thread
                f.result()
        finally:
            # The executor has shut down by now, so the loop is no longer
            # running; close it so its resources are not leaked.
            new_loop.close()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
155
156
@base.bootstrapped
@pytest.mark.asyncio
async def test_store_resources_charm(event_loop):
    """Deploy ``cs:ghost-19`` and require its first unit to become active.

    The test waits until the first unit's workload status is one of
    ``active``, ``error`` or ``blocked`` and then asserts it is ``active``.
    """
    settled_statuses = ('active', 'error', 'blocked')

    async with base.CleanModel() as clean_model:
        ghost = await clean_model.deploy('cs:ghost-19')
        assert 'ghost' in clean_model.applications

        def first_unit_settled():
            # False until a unit exists and has reached a settled status
            units = ghost.units
            if len(units) == 0:
                return False
            return units[0].workload_status in settled_statuses

        await clean_model.block_until(first_unit_settled)

        # ghost will go in to blocked (or error, for older
        # charm revs) if the resource is missing
        assert ghost.units[0].workload_status == 'active'
172
173
@base.bootstrapped
@pytest.mark.asyncio
async def test_store_resources_bundle(event_loop):
    """Deploy the bundle beside this file and require ghost to become active.

    The bundle is read from the ``bundle`` directory in this file's own
    directory. The test waits until the first ghost unit's workload status
    is one of ``active``, ``error`` or ``blocked`` and then asserts it is
    ``active``.
    """
    settled_statuses = ('active', 'error', 'blocked')
    bundle_dir = Path(__file__).parent / 'bundle'

    async with base.CleanModel() as clean_model:
        await clean_model.deploy(str(bundle_dir))
        assert 'ghost' in clean_model.applications
        ghost = clean_model.applications['ghost']

        def first_unit_settled():
            # False until a unit exists and has reached a settled status
            units = ghost.units
            if len(units) == 0:
                return False
            return units[0].workload_status in settled_statuses

        await clean_model.block_until(first_unit_settled)

        # ghost will go in to blocked (or error, for older
        # charm revs) if the resource is missing
        assert ghost.units[0].workload_status == 'active'
191
192
@base.bootstrapped
@pytest.mark.asyncio
async def test_ssh_key(event_loop):
    """Add an SSH key for ``admin``, check it is listed, then remove it.

    After the add, ``SSH_KEY`` must be contained in the first result of
    ``get_ssh_key``; after the remove, that first result must be ``None``.
    """
    async def first_key_result(target_model):
        # Unwrap the 'result' field of the first entry in the response
        response = await target_model.get_ssh_key(True)
        first_entry = response.serialize()['results'][0]
        return first_entry.serialize()['result']

    async with base.CleanModel() as clean_model:
        await clean_model.add_ssh_key('admin', SSH_KEY)
        keys_after_add = await first_key_result(clean_model)
        assert SSH_KEY in keys_after_add

        await clean_model.remove_ssh_key('admin', SSH_KEY)
        keys_after_remove = await first_key_result(clean_model)
        assert keys_after_remove is None
205
206
@base.bootstrapped
@pytest.mark.asyncio
async def test_get_machines(event_loop):
    """Check that ``get_machines`` on a clean model returns a list."""
    async with base.CleanModel() as clean_model:
        machines = await clean_model.get_machines()
        assert isinstance(machines, list)
213
214
@base.bootstrapped
@pytest.mark.asyncio
async def test_watcher_reconnect(event_loop):
    """Close the connection's websocket and expect it to be open again.

    After the websocket is closed the test pauses for 0.1 seconds and then
    asserts that the model's connection reports itself as open.
    """
    async with base.CleanModel() as clean_model:
        websocket = clean_model.connection.ws
        await websocket.close()

        # Brief pause before checking the connection state again
        await asyncio.sleep(0.1)

        assert clean_model.connection.is_open
222
223
224 # @base.bootstrapped
225 # @pytest.mark.asyncio
226 # async def test_grant(event_loop):
227 # async with base.CleanController() as controller:
228 # await controller.add_user('test-model-grant')
229 # await controller.grant('test-model-grant', 'superuser')
230 # async with base.CleanModel() as model:
231 # await model.grant('test-model-grant', 'admin')
232 # assert model.get_user('test-model-grant')['access'] == 'admin'
233 # await model.grant('test-model-grant', 'login')
234 # assert model.get_user('test-model-grant')['access'] == 'login'