Added integration test for explicit loop and threaded functionality
[osm/N2VC.git] / tests / integration / test_model.py
index 2fe97d0..4d45a1c 100644 (file)
@@ -1,6 +1,9 @@
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
 import pytest
 
 from .. import base
+from juju.model import Model
 
 MB = 1
 GB = 1024
@@ -95,3 +98,40 @@ async def test_relate(event_loop):
         )
 
         assert isinstance(my_relation, Relation)
+
+
+async def _deploy_in_loop(new_loop, model_name):
+    new_model = Model(new_loop)
+    await new_model.connect_model(model_name)
+    try:
+        await new_model.deploy('cs:xenial/ubuntu')
+        assert 'ubuntu' in new_model.applications
+    finally:
+        await new_model.disconnect()
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop(event_loop):
+    async with base.CleanModel() as model:
+        model_name = model.info.name
+        new_loop = asyncio.new_event_loop()
+        new_loop.run_until_complete(
+            _deploy_in_loop(new_loop, model_name))
+        await model._wait_for_new('application', 'ubuntu')
+        assert 'ubuntu' in model.applications
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_explicit_loop_threaded(event_loop):
+    async with base.CleanModel() as model:
+        model_name = model.info.name
+        new_loop = asyncio.new_event_loop()
+        with ThreadPoolExecutor(1) as executor:
+            f = executor.submit(
+                new_loop.run_until_complete,
+                _deploy_in_loop(new_loop, model_name))
+            f.result()
+        await model._wait_for_new('application', 'ubuntu')
+        assert 'ubuntu' in model.applications