import asyncio
from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
import pytest
from .. import base
f.result()
await model._wait_for_new('application', 'ubuntu')
assert 'ubuntu' in model.applications
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_store_resources_charm(event_loop):
+ async with base.CleanModel() as model:
+ ghost = await model.deploy('cs:ghost-18')
+ assert 'ghost' in model.applications
+ terminal_statuses = ('active', 'error', 'blocked')
+ await model.block_until(
+ lambda: (
+ len(ghost.units) > 0 and
+ ghost.units[0].workload_status in terminal_statuses)
+ )
+ # ghost will go in to blocked (or error, for older
+ # charm revs) if the resource is missing
+ assert ghost.units[0].workload_status == 'active'
+
+
+@base.bootstrapped
+@pytest.mark.asyncio
+async def test_store_resources_bundle(event_loop):
+ async with base.CleanModel() as model:
+ bundle = str(Path(__file__).parent / 'bundle')
+ await model.deploy(bundle)
+ assert 'ghost' in model.applications
+ ghost = model.applications['ghost']
+ terminal_statuses = ('active', 'error', 'blocked')
+ await model.block_until(
+ lambda: (
+ len(ghost.units) > 0 and
+ ghost.units[0].workload_status in terminal_statuses)
+ )
+ # ghost will go in to blocked (or error, for older
+ # charm revs) if the resource is missing
+ assert ghost.units[0].workload_status == 'active'