+
+
+async def _deploy_in_loop(new_loop, model_name):
+ new_model = Model(new_loop)
+ await new_model.connect_model(model_name)
+ try:
+ await new_model.deploy('cs:xenial/ubuntu')
+ assert 'ubuntu' in new_model.applications
+ finally:
+ await new_model.disconnect()
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop(event_loop):
+ async with base.CleanModel() as model:
+ model_name = model.info.name
+ new_loop = asyncio.new_event_loop()
+ new_loop.run_until_complete(
+ _deploy_in_loop(new_loop, model_name))
+ await model._wait_for_new('application', 'ubuntu')
+ assert 'ubuntu' in model.applications
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop_threaded(event_loop):
+ async with base.CleanModel() as model:
+ model_name = model.info.name
+ new_loop = asyncio.new_event_loop()
+ with ThreadPoolExecutor(1) as executor:
+ f = executor.submit(
+ new_loop.run_until_complete,
+ _deploy_in_loop(new_loop, model_name))
+ f.result()
+ await model._wait_for_new('application', 'ubuntu')
+ assert 'ubuntu' in model.applications
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_store_resources_charm(event_loop):
+ async with base.CleanModel() as model:
+ ghost = await model.deploy('cs:ghost-19')
+ assert 'ghost' in model.applications
+ terminal_statuses = ('active', 'error', 'blocked')
+ await model.block_until(
+ lambda: (
+ len(ghost.units) > 0 and
+ ghost.units[0].workload_status in terminal_statuses)
+ )
+ # ghost will go in to blocked (or error, for older
+ # charm revs) if the resource is missing
+ assert ghost.units[0].workload_status == 'active'
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_store_resources_bundle(event_loop):
+ async with base.CleanModel() as model:
+ bundle = str(Path(__file__).parent / 'bundle')
+ await model.deploy(bundle)
+ assert 'ghost' in model.applications
+ ghost = model.applications['ghost']
+ terminal_statuses = ('active', 'error', 'blocked')
+ await model.block_until(
+ lambda: (
+ len(ghost.units) > 0 and
+ ghost.units[0].workload_status in terminal_statuses)
+ )
+ # ghost will go in to blocked (or error, for older
+ # charm revs) if the resource is missing
+ assert ghost.units[0].workload_status == 'active'
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_ssh_key(event_loop):
+ async with base.CleanModel() as model:
+ await model.add_ssh_key('admin', SSH_KEY)
+ result = await model.get_ssh_key(True)
+ result = result.serialize()['results'][0].serialize()['result']
+ assert SSH_KEY in result
+ await model.remove_ssh_key('admin', SSH_KEY)
+ result = await model.get_ssh_key(True)
+ result = result.serialize()['results'][0].serialize()['result']
+ assert result is None
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_get_machines(event_loop):
+ async with base.CleanModel() as model:
+ result = await model.get_machines()
+ assert isinstance(result, list)
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_watcher_reconnect(event_loop):
+ async with base.CleanModel() as model:
+ await model.connection.ws.close()
+ await asyncio.sleep(0.1)
+ assert model.connection.is_open
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_config(event_loop):
+ async with base.CleanModel() as model:
+ await model.set_config({
+ 'extra-info': 'booyah',
+ 'test-mode': ConfigValue(value=True),
+ })
+ result = await model.get_config()
+ assert 'extra-info' in result
+ assert result['extra-info'].source == 'model'
+ assert result['extra-info'].value == 'booyah'
+
+# @base.bootstrapped
+# @pytest.mark.asyncio
+# async def test_grant(event_loop)
+# async with base.CleanController() as controller:
+# await controller.add_user('test-model-grant')
+# await controller.grant('test-model-grant', 'superuser')
+# async with base.CleanModel() as model:
+# await model.grant('test-model-grant', 'admin')
+# assert model.get_user('test-model-grant')['access'] == 'admin'
+# await model.grant('test-model-grant', 'login')
+# assert model.get_user('test-model-grant')['access'] == 'login'