Revert "Remove vendored libjuju"
[osm/N2VC.git] / modules / libjuju / tests / integration / test_controller.py
index f3840cc..6423a98 100644 (file)
@@ -1,20 +1,35 @@
-import pytest
+import asyncio
+import subprocess
 import uuid
 
-from .. import base
+from juju.client.connection import Connection
+from juju.client.jujudata import FileJujuData
 from juju.controller import Controller
 from juju.errors import JujuAPIError
 
+import pytest
+
+from .. import base
+
 
 @base.bootstrapped
 @pytest.mark.asyncio
-async def test_add_user(event_loop):
+async def test_add_remove_user(event_loop):
     async with base.CleanController() as controller:
         username = 'test{}'.format(uuid.uuid4())
-        await controller.add_user(username)
-        result = await controller.get_user(username)
-        res_ser = result.serialize()['results'][0].serialize()
-        assert res_ser['result'] is not None
+        user = await controller.get_user(username)
+        assert user is None
+        user = await controller.add_user(username)
+        assert user is not None
+        assert user.secret_key is not None
+        assert user.username == username
+        users = await controller.get_users()
+        assert any(u.username == username for u in users)
+        await controller.remove_user(username)
+        user = await controller.get_user(username)
+        assert user is None
+        users = await controller.get_users()
+        assert not any(u.username == username for u in users)
 
 
 @base.bootstrapped
@@ -22,15 +37,23 @@ async def test_add_user(event_loop):
 async def test_disable_enable_user(event_loop):
     async with base.CleanController() as controller:
         username = 'test-disable{}'.format(uuid.uuid4())
-        await controller.add_user(username)
-        await controller.disable_user(username)
-        result = await controller.get_user(username)
-        res_ser = result.serialize()['results'][0].serialize()
-        assert res_ser['result'].serialize()['disabled'] is True
-        await controller.enable_user(username)
-        result = await controller.get_user(username)
-        res_ser = result.serialize()['results'][0].serialize()
-        assert res_ser['result'].serialize()['disabled'] is False
+        user = await controller.add_user(username)
+
+        await user.disable()
+        assert not user.enabled
+        assert user.disabled
+
+        fresh = await controller.get_user(username)  # fetch fresh copy
+        assert not fresh.enabled
+        assert fresh.disabled
+
+        await user.enable()
+        assert user.enabled
+        assert not user.disabled
+
+        fresh = await controller.get_user(username)  # fetch fresh copy
+        assert fresh.enabled
+        assert not fresh.disabled
 
 
 @base.bootstrapped
@@ -38,40 +61,168 @@ async def test_disable_enable_user(event_loop):
 async def test_change_user_password(event_loop):
     async with base.CleanController() as controller:
         username = 'test-password{}'.format(uuid.uuid4())
-        await controller.add_user(username)
-        await controller.change_user_password(username, 'password')
+        user = await controller.add_user(username)
+        await user.set_password('password')
+        # Check that we can connect with the new password.
+        new_connection = None
+        try:
+            kwargs = controller.connection().connect_params()
+            kwargs['username'] = username
+            kwargs['password'] = 'password'
+            new_connection = await Connection.connect(**kwargs)
+        except JujuAPIError:
+            raise AssertionError('Unable to connect with new password')
+        finally:
+            if new_connection:
+                await new_connection.close()
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_reset_user_password(event_loop):
+    async with base.CleanController() as controller:
+        username = 'test{}'.format(uuid.uuid4())
+        user = await controller.add_user(username)
+        origin_secret_key = user.secret_key
+        await user.set_password('password')
+        await controller.reset_user_password(username)
+        user = await controller.get_user(username)
+        new_secret_key = user.secret_key
+        # Check secret key is different after the reset.
+        assert origin_secret_key != new_secret_key
+        # Check that we can't connect with the old password.
+        new_connection = None
         try:
-            new_controller = Controller()
-            await new_controller.connect(
-                controller.connection.endpoint, username, 'password')
-            result = True
-            await new_controller.disconnect()
+            kwargs = controller.connection().connect_params()
+            kwargs['username'] = username
+            kwargs['password'] = 'password'
+            new_connection = await Connection.connect(**kwargs)
         except JujuAPIError:
-            result = False
-        assert result is True
+            pass
+        finally:
+            # No connection with old password
+            assert new_connection is None
 
 
 @base.bootstrapped
 @pytest.mark.asyncio
-async def test_grant(event_loop):
+async def test_grant_revoke(event_loop):
     async with base.CleanController() as controller:
         username = 'test-grant{}'.format(uuid.uuid4())
-        await controller.add_user(username)
-        await controller.grant(username, 'superuser')
-        result = await controller.get_user(username)
-        result = result.serialize()['results'][0].serialize()['result']\
-            .serialize()
-        assert result['access'] == 'superuser'
-        await controller.grant(username, 'login')
-        result = await controller.get_user(username)
-        result = result.serialize()['results'][0].serialize()['result']\
-            .serialize()
-        assert result['access'] == 'login'
+        user = await controller.add_user(username)
+        await user.grant('superuser')
+        assert user.access == 'superuser'
+        fresh = await controller.get_user(username)  # fetch fresh copy
+        assert fresh.access == 'superuser'
+        await user.grant('login')  # already has 'superuser', so no-op
+        assert user.access == 'superuser'
+        fresh = await controller.get_user(username)  # fetch fresh copy
+        assert fresh.access == 'superuser'
+        await user.revoke()
+        assert user.access == ''
+        fresh = await controller.get_user(username)  # fetch fresh copy
+        assert fresh.access == ''
 
 
 @base.bootstrapped
 @pytest.mark.asyncio
-async def test_get_models(event_loop):
+async def test_list_models(event_loop):
     async with base.CleanController() as controller:
-        result = await controller.get_models()
-        assert isinstance(result.serialize()['user-models'], list)
+        async with base.CleanModel() as model:
+            result = await controller.list_models()
+            assert model.info.name in result
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_get_model(event_loop):
+    async with base.CleanController() as controller:
+        by_name, by_uuid = None, None
+        model_name = 'test-{}'.format(uuid.uuid4())
+        model = await controller.add_model(model_name)
+        model_uuid = model.info.uuid
+        await model.disconnect()
+        try:
+            by_name = await controller.get_model(model_name)
+            by_uuid = await controller.get_model(model_uuid)
+            assert by_name.info.name == model_name
+            assert by_name.info.uuid == model_uuid
+            assert by_uuid.info.name == model_name
+            assert by_uuid.info.uuid == model_uuid
+        finally:
+            if by_name:
+                await by_name.disconnect()
+            if by_uuid:
+                await by_uuid.disconnect()
+            await controller.destroy_model(model_name)
+
+
+async def _wait_for_model(controller, model_name):
+    while model_name not in await controller.list_models():
+        await asyncio.sleep(0.5, loop=controller.loop)
+
+
+async def _wait_for_model_gone(controller, model_name):
+    while model_name in await controller.list_models():
+        await asyncio.sleep(0.5, loop=controller.loop)
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_destroy_model_by_name(event_loop):
+    async with base.CleanController() as controller:
+        model_name = 'test-{}'.format(uuid.uuid4())
+        model = await controller.add_model(model_name)
+        await model.disconnect()
+        await asyncio.wait_for(_wait_for_model(controller,
+                                               model_name),
+                               timeout=60)
+        await controller.destroy_model(model_name)
+        await asyncio.wait_for(_wait_for_model_gone(controller,
+                                                    model_name),
+                               timeout=60)
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_add_destroy_model_by_uuid(event_loop):
+    async with base.CleanController() as controller:
+        model_name = 'test-{}'.format(uuid.uuid4())
+        model = await controller.add_model(model_name)
+        model_uuid = model.info.uuid
+        await model.disconnect()
+        await asyncio.wait_for(_wait_for_model(controller,
+                                               model_name),
+                               timeout=60)
+        await controller.destroy_model(model_uuid)
+        await asyncio.wait_for(_wait_for_model_gone(controller,
+                                                    model_name),
+                               timeout=60)
+
+
+# this test must be run serially because it modifies the login password
+@pytest.mark.serial
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_macaroon_auth(event_loop):
+    jujudata = FileJujuData()
+    account = jujudata.accounts()[jujudata.current_controller()]
+    with base.patch_file('~/.local/share/juju/accounts.yaml'):
+        if 'password' in account:
+            # force macaroon auth by "changing" password to current password
+            result = subprocess.run(
+                ['juju', 'change-user-password'],
+                input='{0}\n{0}\n'.format(account['password']),
+                universal_newlines=True,
+                stderr=subprocess.PIPE)
+            assert result.returncode == 0, ('Failed to change password: '
+                                            '{}'.format(result.stderr))
+        controller = Controller()
+        try:
+            await controller.connect()
+            assert controller.is_connected()
+        finally:
+            if controller.is_connected():
+                await controller.disconnect()
+        async with base.CleanModel():
+            pass  # create and login to model works