import asyncio
from concurrent.futures import ThreadPoolExecutor
import pytest
+import uuid
from .. import base
from juju.controller import Controller
@pytest.mark.asyncio
async def test_add_user(event_loop):
async with base.CleanController() as controller:
- await controller.add_user('test')
- result = await controller.get_user('test')
+ username = 'test{}'.format(uuid.uuid4())
+ await controller.add_user(username)
+ result = await controller.get_user(username)
res_ser = result.serialize()['results'][0].serialize()
assert res_ser['result'] is not None
@pytest.mark.asyncio
async def test_disable_enable_user(event_loop):
async with base.CleanController() as controller:
- await controller.add_user('test-disable')
- await controller.disable_user('test-disable')
- result = await controller.get_user('test-disable')
+ username = 'test-disable{}'.format(uuid.uuid4())
+ await controller.add_user(username)
+ await controller.disable_user(username)
+ result = await controller.get_user(username)
res_ser = result.serialize()['results'][0].serialize()
assert res_ser['result'].serialize()['disabled'] is True
- await controller.enable_user('test-disable')
- result = await controller.get_user('test-disable')
+ await controller.enable_user(username)
+ result = await controller.get_user(username)
res_ser = result.serialize()['results'][0].serialize()
assert res_ser['result'].serialize()['disabled'] is False
@pytest.mark.asyncio
async def test_change_user_password(event_loop):
async with base.CleanController() as controller:
- await controller.add_user('test-password')
- await controller.change_user_password('test-password', 'password')
+ username = 'test-password{}'.format(uuid.uuid4())
+ await controller.add_user(username)
+ await controller.change_user_password(username, 'password')
try:
- con = await controller.connect(controller.connection.endpoint, 'test-password', 'password')
+ con = await controller.connect(
+ controller.connection.endpoint, username, 'password')
result = True
except JujuAPIError:
result = False
@pytest.mark.asyncio
async def test_grant(event_loop):
async with base.CleanController() as controller:
- await controller.add_user('test-grant')
- await controller.grant('test-grant', 'superuser')
- result = await controller.get_user('test-grant')
+ username = 'test-grant{}'.format(uuid.uuid4())
+ await controller.add_user(username)
+ await controller.grant(username, 'superuser')
+ result = await controller.get_user(username)
result = result.serialize()['results'][0].serialize()['result'].serialize()
assert result['access'] == 'superuser'
- await controller.grant('test-grant', 'login')
- result = await controller.get_user('test-grant')
+ await controller.grant(username, 'login')
+ result = await controller.get_user(username)
result = result.serialize()['results'][0].serialize()['result'].serialize()
assert result['access'] == 'login'