5 from juju
.client
.connection
import Connection
6 from juju
.client
.jujudata
import FileJujuData
7 from juju
.controller
import Controller
8 from juju
.errors
import JujuAPIError
# Integration test: inside a base.CleanController() context, look up a
# uniquely named user, add it, check the returned user's secret_key and
# username, check it is present in get_users(), remove it, and check it is
# no longer present in get_users().
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 21 and 30, so a statement presumably follows each get_user() call
# below -- confirm against the original file.
17 async def test_add_remove_user(event_loop
):
18 async with base
.CleanController() as controller
:
# uuid4 suffix keeps the user name unique for each run.
19 username
= 'test{}'.format(uuid
.uuid4())
20 user
= await controller
.get_user(username
)
# NOTE(review): embedded numbering jumps from 20 to 22 here.
22 user
= await controller
.add_user(username
)
23 assert user
is not None
24 assert user
.secret_key
is not None
25 assert user
.username
== username
26 users
= await controller
.get_users()
27 assert any(u
.username
== username
for u
in users
)
28 await controller
.remove_user(username
)
29 user
= await controller
.get_user(username
)
# NOTE(review): embedded numbering jumps from 29 to 31 here.
31 users
= await controller
.get_users()
32 assert not any(u
.username
== username
for u
in users
)
# Integration test: checks the `enabled` / `disabled` attributes of a newly
# added user, both on the local user object and on a copy re-fetched with
# controller.get_user().
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 41-42, 44-45, 48-51, 53 and 55. The calls that change the user's
# state are not visible here (presumably a disable step before line 43 and
# an enable step before line 52) -- confirm against the original file.
37 async def test_disable_enable_user(event_loop
):
38 async with base
.CleanController() as controller
:
39 username
= 'test-disable{}'.format(uuid
.uuid4())
40 user
= await controller
.add_user(username
)
# First group of checks: the user is expected not to be enabled.
43 assert not user
.enabled
46 fresh
= await controller
.get_user(username
) # fetch fresh copy
47 assert not fresh
.enabled
# Second group of checks: the user is expected not to be disabled.
52 assert not user
.disabled
54 fresh
= await controller
.get_user(username
) # fetch fresh copy
56 assert not fresh
.disabled
# Integration test: add a user, set its password to 'password', then open a
# new Connection using the controller connection's connect_params() with
# the username and password overridden, and close that connection.
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 67-68, 73 and 75-76. The `raise AssertionError` below presumably
# sits in an exception handler and the close() in cleanup code -- confirm
# against the original file.
61 async def test_change_user_password(event_loop
):
62 async with base
.CleanController() as controller
:
63 username
= 'test-password{}'.format(uuid
.uuid4())
64 user
= await controller
.add_user(username
)
65 await user
.set_password('password')
66 # Check that we can connect with the new password.
# Start from the controller connection's own parameters and swap in the
# new user's credentials.
69 kwargs
= controller
.connection().connect_params()
70 kwargs
['username'] = username
71 kwargs
['password'] = 'password'
72 new_connection
= await Connection
.connect(**kwargs
)
74 raise AssertionError('Unable to connect with new password')
77 await new_connection
.close()
# Integration test: add a user, remember its secret_key, set a password,
# call controller.reset_user_password(), and check that the re-fetched
# user's secret_key differs from the remembered one. It then attempts a
# Connection with the old password and asserts `new_connection is None`.
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 94-95 and 100-102. The connect attempt is presumably wrapped in
# exception handling that leaves new_connection as None on failure --
# confirm against the original file.
82 async def test_reset_user_password(event_loop
):
83 async with base
.CleanController() as controller
:
84 username
= 'test{}'.format(uuid
.uuid4())
85 user
= await controller
.add_user(username
)
86 origin_secret_key
= user
.secret_key
87 await user
.set_password('password')
88 await controller
.reset_user_password(username
)
# Re-fetch the user so the secret key read below is the post-reset value.
89 user
= await controller
.get_user(username
)
90 new_secret_key
= user
.secret_key
91 # Check secret key is different after the reset.
92 assert origin_secret_key
!= new_secret_key
93 # Check that we can't connect with the old password.
96 kwargs
= controller
.connection().connect_params()
97 kwargs
['username'] = username
98 kwargs
['password'] = 'password'
99 new_connection
= await Connection
.connect(**kwargs
)
103 # No connection with old password
104 assert new_connection
is None
# Integration test: grant 'superuser' to a new user and check `access` on
# both the local object and a re-fetched copy; grant 'login' and check that
# access is still 'superuser'; finally check that access is '' on both.
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 121. The call that clears the access level before the final checks
# is not visible (presumably a revoke) -- confirm against the original file.
109 async def test_grant_revoke(event_loop
):
110 async with base
.CleanController() as controller
:
111 username
= 'test-grant{}'.format(uuid
.uuid4())
112 user
= await controller
.add_user(username
)
113 await user
.grant('superuser')
114 assert user
.access
== 'superuser'
115 fresh
= await controller
.get_user(username
) # fetch fresh copy
116 assert fresh
.access
== 'superuser'
117 await user
.grant('login') # already has 'superuser', so no-op
118 assert user
.access
== 'superuser'
119 fresh
= await controller
.get_user(username
) # fetch fresh copy
120 assert fresh
.access
== 'superuser'
# NOTE(review): embedded numbering jumps from 120 to 122 here.
122 assert user
.access
== ''
123 fresh
= await controller
.get_user(username
) # fetch fresh copy
124 assert fresh
.access
== ''
async def test_list_models(event_loop):
    """Check that a model from ``base.CleanModel`` is in ``list_models()``."""
    async with base.CleanController() as controller:
        async with base.CleanModel() as model:
            listed_names = await controller.list_models()
            expected_name = model.info.name
            assert expected_name in listed_names
# Integration test: add a model, disconnect from it, then fetch it again
# with controller.get_model() by name and by UUID, and check that both
# results report the same name and UUID. Both fetched models are then
# disconnected and the model is destroyed by name.
# NOTE(review): this listing is token-split and its embedded line numbers
# skip 145, 152, 153 and 155. The lookups are presumably wrapped in
# try/finally with guarded disconnects (by_name / by_uuid start as None)
# -- confirm against the original file.
138 async def test_get_model(event_loop
):
139 async with base
.CleanController() as controller
:
140 by_name
, by_uuid
= None, None
141 model_name
= 'test-{}'.format(uuid
.uuid4())
142 model
= await controller
.add_model(model_name
)
143 model_uuid
= model
.info
.uuid
144 await model
.disconnect()
# The same model is looked up twice, once per kind of identifier.
146 by_name
= await controller
.get_model(model_name
)
147 by_uuid
= await controller
.get_model(model_uuid
)
148 assert by_name
.info
.name
== model_name
149 assert by_name
.info
.uuid
== model_uuid
150 assert by_uuid
.info
.name
== model_name
151 assert by_uuid
.info
.uuid
== model_uuid
154 await by_name
.disconnect()
156 await by_uuid
.disconnect()
157 await controller
.destroy_model(model_name
)
async def _wait_for_model(controller, model_name, interval=0.5):
    """Poll the controller until ``model_name`` is among its models.

    Args:
        controller: Object with an awaitable ``list_models()`` whose result
            supports ``in`` checks against model names.
        model_name: Name of the model to wait for.
        interval: Seconds to sleep between polls.

    There is no internal deadline, so callers should bound the wait
    themselves, for example with ``asyncio.wait_for``.
    """
    while model_name not in await controller.list_models():
        # No ``loop=`` argument: asyncio.sleep() deprecated it in Python 3.8
        # and rejects it from 3.10 on. The running loop is used instead.
        await asyncio.sleep(interval)
async def _wait_for_model_gone(controller, model_name, interval=0.5):
    """Poll the controller until ``model_name`` is no longer among its models.

    Args:
        controller: Object with an awaitable ``list_models()`` whose result
            supports ``in`` checks against model names.
        model_name: Name of the model expected to disappear.
        interval: Seconds to sleep between polls.

    There is no internal deadline, so callers should bound the wait
    themselves, for example with ``asyncio.wait_for``.
    """
    while model_name in await controller.list_models():
        # No ``loop=`` argument: asyncio.sleep() deprecated it in Python 3.8
        # and rejects it from 3.10 on. The running loop is used instead.
        await asyncio.sleep(interval)
# Integration test: add a model, disconnect from it, wait (through
# asyncio.wait_for) for _wait_for_model, destroy the model by name, then
# wait for _wait_for_model_gone.
# NOTE(review): this listing is token-split and truncated. Both
# asyncio.wait_for( calls stop after `controller,` and the embedded
# numbering skips 178-179 and 182-183, so the remaining arguments
# (presumably the model name and a timeout) are not visible -- confirm
# against the original file.
172 async def test_destroy_model_by_name(event_loop
):
173 async with base
.CleanController() as controller
:
174 model_name
= 'test-{}'.format(uuid
.uuid4())
175 model
= await controller
.add_model(model_name
)
176 await model
.disconnect()
177 await asyncio
.wait_for(_wait_for_model(controller
,
180 await controller
.destroy_model(model_name
)
181 await asyncio
.wait_for(_wait_for_model_gone(controller
,
# Integration test: add a model, record its UUID, disconnect from it, wait
# (through asyncio.wait_for) for _wait_for_model, destroy the model by
# UUID, then wait for _wait_for_model_gone.
# NOTE(review): this listing is token-split and truncated. Both
# asyncio.wait_for( calls stop after `controller,` and the embedded
# numbering skips 195-196 and 199-200, so the remaining arguments
# (presumably the model name and a timeout) are not visible -- confirm
# against the original file.
188 async def test_add_destroy_model_by_uuid(event_loop
):
189 async with base
.CleanController() as controller
:
190 model_name
= 'test-{}'.format(uuid
.uuid4())
191 model
= await controller
.add_model(model_name
)
192 model_uuid
= model
.info
.uuid
193 await model
.disconnect()
194 await asyncio
.wait_for(_wait_for_model(controller
,
197 await controller
.destroy_model(model_uuid
)
198 await asyncio
.wait_for(_wait_for_model_gone(controller
,
203 # this test must be run serially because it modifies the login password
# Integration test: reads the current controller's account from
# FileJujuData and, with accounts.yaml wrapped by base.patch_file(),
# re-enters the current password through `juju change-user-password` when
# the account has one. It then checks that a fresh Controller() connects
# and that a base.CleanModel() context can be entered.
# NOTE(review): this listing is token-split, so the nesting under the
# `with` / `if` statements cannot be read from it, and the embedded
# numbering skips 221 and 224 (presumably try / finally around the
# connect) -- confirm against the original file.
207 async def test_macaroon_auth(event_loop
):
208 jujudata
= FileJujuData()
209 account
= jujudata
.accounts()[jujudata
.current_controller()]
210 with base
.patch_file('~/.local/share/juju/accounts.yaml'):
211 if 'password' in account
:
212 # force macaroon auth by "changing" password to current password
# The current password is fed twice on stdin, one per line.
213 result
= subprocess
.run(
214 ['juju', 'change-user-password'],
215 input='{0}\n{0}\n'.format(account
['password']),
216 universal_newlines
=True,
217 stderr
=subprocess
.PIPE
)
# stderr is captured above so it can be shown if the command fails.
218 assert result
.returncode
== 0, ('Failed to change password: '
219 '{}'.format(result
.stderr
))
220 controller
= Controller()
222 await controller
.connect()
223 assert controller
.is_connected()
225 if controller
.is_connected():
226 await controller
.disconnect()
227 async with base
.CleanModel():
228 pass # create and login to model works