+import asyncio
+from concurrent.futures import ThreadPoolExecutor
import pytest
from .. import base
+from juju.model import Model
MB = 1
GB = 1024
)
assert isinstance(my_relation, Relation)
+
+
+async def _deploy_in_loop(new_loop, model_name):
+ new_model = Model(new_loop)
+ await new_model.connect_model(model_name)
+ try:
+ await new_model.deploy('cs:xenial/ubuntu')
+ assert 'ubuntu' in new_model.applications
+ finally:
+ await new_model.disconnect()
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop(event_loop):
+ async with base.CleanModel() as model:
+ model_name = model.info.name
+ new_loop = asyncio.new_event_loop()
+ new_loop.run_until_complete(
+ _deploy_in_loop(new_loop, model_name))
+ await model._wait_for_new('application', 'ubuntu')
+ assert 'ubuntu' in model.applications
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop_threaded(event_loop):
+ async with base.CleanModel() as model:
+ model_name = model.info.name
+ new_loop = asyncio.new_event_loop()
+ with ThreadPoolExecutor(1) as executor:
+ f = executor.submit(
+ new_loop.run_until_complete,
+ _deploy_in_loop(new_loop, model_name))
+ f.result()
+ await model._wait_for_new('application', 'ubuntu')
+ assert 'ubuntu' in model.applications