5 from juju
.client
.connection
import Connection
6 from juju
.client
.jujudata
import FileJujuData
7 from juju
.controller
import Controller
8 from juju
.errors
import JujuAPIError
async def test_add_remove_user(event_loop):
    """Exercise the add-user / remove-user round trip on a clean controller.

    A uniquely named user is looked up before creation, created, located in
    the controller's user list, removed, and finally confirmed to be gone
    both by direct lookup and by listing.
    """
    async with base.CleanController() as controller:
        username = 'test{}'.format(uuid.uuid4())

        # A user that has not been created yet must not be found.
        # NOTE(review): assumes get_user() yields None for an unknown
        # user rather than raising -- confirm against Controller.get_user.
        user = await controller.get_user(username)
        assert user is None

        user = await controller.add_user(username)
        assert user is not None
        assert user.username == username
        users = await controller.get_users()
        assert any(u.username == username for u in users)

        await controller.remove_user(username)

        # After removal the user must vanish from both lookup paths.
        user = await controller.get_user(username)
        assert user is None
        users = await controller.get_users()
        assert not any(u.username == username for u in users)
async def test_disable_enable_user(event_loop):
    """Check that disabling and re-enabling a user toggles its state.

    The state is verified both on the local ``user`` object and on a copy
    freshly fetched from the controller, after each transition.
    """
    async with base.CleanController() as controller:
        username = 'test-disable{}'.format(uuid.uuid4())
        user = await controller.add_user(username)

        # Without an explicit state change the "not enabled" assertions
        # below would be checked against a freshly created user.
        await user.disable()
        assert not user.enabled
        assert user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert not fresh.enabled
        assert fresh.disabled

        # Flip the state back before asserting "not disabled".
        await user.enable()
        assert user.enabled
        assert not user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.enabled
        assert not fresh.disabled
async def test_change_user_password(event_loop):
    """Set a new password for a user and verify it can be used to log in.

    A second connection is opened with the controller's own connection
    parameters, overriding only the username and password.
    """
    async with base.CleanController() as controller:
        username = 'test-password{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.set_password('password')

        # Check that we can connect with the new password.
        new_connection = None
        try:
            kwargs = controller.connection().connect_params()
            kwargs['username'] = username
            kwargs['password'] = 'password'
            new_connection = await Connection.connect(**kwargs)
        except JujuAPIError:
            # Only an API-level rejection counts as a failed login; turn it
            # into a test failure with a readable message.
            raise AssertionError('Unable to connect with new password')
        finally:
            # Close the extra connection only if it was actually opened.
            if new_connection is not None:
                await new_connection.close()
async def test_grant_revoke(event_loop):
    """Grant controller access to a user and then revoke it again.

    After every change the access level is checked on the local ``user``
    object and on a copy freshly fetched from the controller.
    """
    async with base.CleanController() as controller:
        username = 'test-grant{}'.format(uuid.uuid4())
        user = await controller.add_user(username)

        await user.grant('superuser')
        assert user.access == 'superuser'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'superuser'

        await user.grant('login')  # already has 'superuser', so no-op
        assert user.access == 'superuser'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'superuser'

        # Access can only drop to '' after an explicit revoke.
        await user.revoke()
        # Compare by value: identity checks against a str literal depend on
        # interning and trigger a SyntaxWarning on modern Python.
        assert user.access == ''
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == ''
async def test_list_models(event_loop):
    """Verify that a clean model's name shows up in ``list_models()``."""
    # Enter the controller first, then the model, exactly as nested
    # ``async with`` statements would.
    async with base.CleanController() as controller, \
            base.CleanModel() as model:
        listed_names = await controller.list_models()
        assert model.info.name in listed_names
async def test_get_model(event_loop):
    """Fetch a newly added model by name and by UUID and compare both.

    The model is created, disconnected, and then looked up twice through
    ``controller.get_model``; both results must report the same name and
    UUID. Connections and the model itself are cleaned up even when an
    assertion fails.
    """
    async with base.CleanController() as controller:
        by_name, by_uuid = None, None
        model_name = 'test-{}'.format(uuid.uuid4())
        model = await controller.add_model(model_name)
        model_uuid = model.info.uuid
        await model.disconnect()

        try:
            by_name = await controller.get_model(model_name)
            by_uuid = await controller.get_model(model_uuid)
            assert by_name.info.name == model_name
            assert by_name.info.uuid == model_uuid
            assert by_uuid.info.name == model_name
            assert by_uuid.info.uuid == model_uuid
        finally:
            # Either lookup may have failed before assignment, so only
            # disconnect what was actually obtained.
            if by_name is not None:
                await by_name.disconnect()
            if by_uuid is not None:
                await by_uuid.disconnect()
            await controller.destroy_model(model_name)
async def _wait_for_model(controller, model_name):
    """Poll ``controller.list_models()`` until ``model_name`` is listed.

    Returns immediately if the name is already present; otherwise sleeps
    half a second between polls. There is no built-in timeout, so callers
    should wrap this coroutine in ``asyncio.wait_for``.
    """
    while model_name not in await controller.list_models():
        # No explicit ``loop`` argument: it was removed from asyncio.sleep
        # in Python 3.10, and the running loop is used by default.
        await asyncio.sleep(0.5)
async def _wait_for_model_gone(controller, model_name):
    """Poll ``controller.list_models()`` until ``model_name`` disappears.

    Returns immediately if the name is already absent; otherwise sleeps
    half a second between polls. There is no built-in timeout, so callers
    should wrap this coroutine in ``asyncio.wait_for``.
    """
    while model_name in await controller.list_models():
        # No explicit ``loop`` argument: it was removed from asyncio.sleep
        # in Python 3.10, and the running loop is used by default.
        await asyncio.sleep(0.5)
async def test_destroy_model_by_name(event_loop):
    """Add a model, then destroy it by name and wait for it to vanish.

    The test waits for the model to appear in ``list_models()`` before
    destroying it, and for it to disappear afterwards; both waits are
    bounded by a timeout so a stuck controller fails the test instead of
    hanging it.
    """
    # NOTE(review): the timeout value is not visible in this excerpt (the
    # wait_for calls were cut off after ``controller,``); 60 seconds is
    # assumed -- confirm against the original file.
    timeout = 60

    async with base.CleanController() as controller:
        model_name = 'test-{}'.format(uuid.uuid4())
        model = await controller.add_model(model_name)
        await model.disconnect()
        await asyncio.wait_for(
            _wait_for_model(controller, model_name),
            timeout=timeout)
        await controller.destroy_model(model_name)
        await asyncio.wait_for(
            _wait_for_model_gone(controller, model_name),
            timeout=timeout)
async def test_add_destroy_model_by_uuid(event_loop):
    """Add a model, then destroy it by UUID and wait for it to vanish.

    The model is destroyed via its UUID, but presence is tracked by name
    because the wait helpers compare against ``list_models()`` output.
    Both waits are bounded by a timeout so a stuck controller fails the
    test instead of hanging it.
    """
    # NOTE(review): the timeout value is not visible in this excerpt (the
    # wait_for calls were cut off after ``controller,``); 60 seconds is
    # assumed -- confirm against the original file.
    timeout = 60

    async with base.CleanController() as controller:
        model_name = 'test-{}'.format(uuid.uuid4())
        model = await controller.add_model(model_name)
        model_uuid = model.info.uuid
        await model.disconnect()
        await asyncio.wait_for(
            _wait_for_model(controller, model_name),
            timeout=timeout)
        await controller.destroy_model(model_uuid)
        await asyncio.wait_for(
            _wait_for_model_gone(controller, model_name),
            timeout=timeout)
175 # this test must be run serially because it modifies the login password
async def test_macaroon_auth(event_loop):
    """Verify that a controller and a model can be reached after the
    account password has been "changed" through the juju CLI.

    The account of the current controller is read from the local juju
    data. While ``accounts.yaml`` is patched, the password (if one is
    stored) is re-set to its current value via ``juju
    change-user-password``; then a ``Controller`` connection and a
    ``CleanModel`` are opened.
    """
    jujudata = FileJujuData()
    account = jujudata.accounts()[jujudata.current_controller()]
    with base.patch_file('~/.local/share/juju/accounts.yaml'):
        if 'password' in account:
            # force macaroon auth by "changing" password to current password
            result = subprocess.run(
                ['juju', 'change-user-password'],
                # The CLI prompts twice (password + confirmation).
                input='{0}\n{0}\n'.format(account['password']),
                universal_newlines=True,
                stderr=subprocess.PIPE)
            assert result.returncode == 0, ('Failed to change password: '
                                            '{}'.format(result.stderr))
        controller = Controller()
        try:
            await controller.connect()
            assert controller.is_connected()
        finally:
            # connect() may have failed, so only disconnect a live
            # connection; this also runs when the assertion fails.
            if controller.is_connected():
                await controller.disconnect()
        async with base.CleanModel():
            pass  # create and login to model works