import asyncio

from juju.client import client
from juju.client.connection import Connection
async def test_monitor(event_loop):
    """Check the monitor status reported by a clean model's connection.

    Asserts that ``conn.monitor.status`` is ``'connected'`` and, afterwards,
    ``'disconnected'``.

    ``event_loop`` is not referenced in the body; presumably it is a pytest
    fixture requested for its side effect -- confirm.

    NOTE(review): the line numbers that were fused into the extracted listing
    jump from 17 to 20 between the two assertions, so at least one statement
    is missing there (presumably the step that ends the connection). Restore
    it from the upstream file; as written, nothing changes the status between
    the two checks.
    """
    async with base.CleanModel() as model:
        conn = model.connection()
        assert conn.monitor.status == 'connected'

        # NOTE(review): elided statement(s) belong here -- see docstring.
        assert conn.monitor.status == 'disconnected'
async def test_monitor_catches_error(event_loop):
    """Check that the connection monitor ends up reporting ``'error'``.

    Asserts ``'connected'`` first, then enters ``conn.monitor.reconnecting``
    as an async context manager, sleeps for one second and asserts that the
    status has become ``'error'``.

    ``event_loop`` is not referenced in the body; presumably it is a pytest
    fixture requested for its side effect -- confirm.

    NOTE(review): the extracted listing lost its indentation and skips
    original lines 29, 31, 33 and 36-41. The nesting below is inferred from
    statement order, and whatever provokes the error (and any cleanup after
    the final assertion) is not visible here. Restore both from the upstream
    file.
    """
    async with base.CleanModel() as model:
        conn = model.connection()

        assert conn.monitor.status == 'connected'

        async with conn.monitor.reconnecting:
            # NOTE(review): an elided statement (original line 33) precedes
            # the sleep -- presumably the action that breaks the connection.
            await asyncio.sleep(1)
            assert conn.monitor.status == 'error'
async def test_full_status(event_loop):
    """Request a full status report through the client facade.

    Builds a ``client.ClientFacade`` from the clean model's connection and
    awaits ``FullStatus(None)``. The visible code makes no assertion on the
    result, so completing without an exception is the only check.

    ``event_loop`` is not referenced in the body; presumably it is a pytest
    fixture requested for its side effect -- confirm.

    NOTE(review): the extracted listing skips original lines 44-45 and 47-50.
    The only surviving token from that span is the keyword argument
    ``application_name='ubuntu',`` -- presumably part of a call that deploys
    an application before the status request. The call itself is not visible,
    so it is recorded here rather than reconstructed; restore it from the
    upstream file.
    """
    async with base.CleanModel() as model:
        # NOTE(review): elided setup call (carrying
        # ``application_name='ubuntu',``) belongs here -- see docstring.
        facade = client.ClientFacade.from_connection(model.connection())

        await facade.FullStatus(None)
async def test_reconnect(event_loop):
    """Check that a separately opened connection becomes open again.

    Reuses the connect parameters of the clean model's own connection to open
    a second ``Connection``, sleeps briefly, asserts that the new connection
    is not open, and then waits up to three seconds (via
    ``model.block_until``) for ``conn.is_open`` to become true.

    ``event_loop`` is not referenced in the body; presumably it is a pytest
    fixture requested for its side effect -- confirm.

    NOTE(review): the extracted listing skips original lines 62 and 64-65,
    and nothing after line 67 is visible. The step that makes the connection
    stop being open, and any cleanup of ``conn``, are therefore missing here.
    Restore them from the upstream file.
    """
    async with base.CleanModel() as model:
        connect_kwargs = model.connection().connect_params()
        conn = await Connection.connect(**connect_kwargs)

        await asyncio.sleep(0.1)
        # NOTE(review): elided statement(s) belong here -- see docstring.
        assert not conn.is_open

        await model.block_until(lambda: conn.is_open, timeout=3)