4 from juju
.client
.connection
import Connection
5 from juju
.client
import client
async def test_connect_current(event_loop):
    """Check that ``Connection.connect_current()`` yields a ``Connection``.

    The connection is opened inside a ``base.CleanModel()`` context and is
    closed again before the test returns.
    """
    async with base.CleanModel():
        conn = await Connection.connect_current()
        try:
            assert isinstance(conn, Connection)
        finally:
            # Close the connection even when the assertion fails so the test
            # does not leave it open behind itself.
            await conn.close()
async def test_monitor(event_loop):
    """Check the monitor status before and after the connection is closed.

    A freshly opened connection must report ``'connected'``; once it has
    been closed the monitor must report ``'disconnected'``.
    """
    async with base.CleanModel():
        conn = await Connection.connect_current()
        assert conn.monitor.status == 'connected'

        # The second assertion can only hold after an explicit close; without
        # this step both assertions would inspect the same open connection.
        await conn.close()
        assert conn.monitor.status == 'disconnected'
async def test_monitor_catches_error(event_loop):
    """Check that the monitor reports ``'error'`` after an unclean drop.

    The connection starts out ``'connected'``; the underlying websocket is
    then closed without going through ``Connection.close()``, after which the
    monitor is expected to report ``'error'``.
    """
    async with base.CleanModel():
        conn = await Connection.connect_current()
        try:
            assert conn.monitor.status == 'connected'

            # Drop the transport behind the connection's back so the monitor
            # has an error to detect.
            # NOTE(review): assumes the raw websocket is exposed as
            # ``conn.ws`` -- confirm against the Connection class.
            await conn.ws.close()

            assert conn.monitor.status == 'error'
        finally:
            # Always release the connection, whatever the assertions did.
            await conn.close()
async def test_full_status(event_loop):
    """Request a full status through a client facade built on a clean model.

    The facade is created from the model's own connection and
    ``FullStatus`` is awaited with ``None`` as its only argument.
    """
    async with base.CleanModel() as model:
        # NOTE(review): the trailing comma in the original makes this a
        # one-element tuple, and the name is never read afterwards. It looks
        # like the remnant of a keyword argument to a call that is not
        # visible here -- confirm against the complete file.
        application_name = ('ubuntu',)

        model_connection = model.connection
        facade = client.ClientFacade.from_connection(model_connection)
        await facade.FullStatus(None)
async def test_reconnect(event_loop):
    """Check that a dropped connection is re-established on its own.

    A second connection is opened with the same parameters as the clean
    model's connection. Its websocket is then closed directly, and the test
    waits up to three seconds for ``is_open`` to become true again.
    """
    async with base.CleanModel() as model:
        source = model.connection
        conn = await Connection.connect(
            source.endpoint,
            source.uuid,
            source.username,
            source.password,
            source.cacert,
            source.macaroons,
            source.loop,
            source.max_frame_size,
        )
        try:
            # Give the freshly opened connection a moment to settle.
            await asyncio.sleep(0.1)
            assert conn.is_open

            # Drop the transport without calling Connection.close(), so that
            # the "not open" assertion below has something to observe.
            # NOTE(review): assumes the raw websocket is exposed as
            # ``conn.ws`` -- confirm against the Connection class.
            await conn.ws.close()
            assert not conn.is_open

            # The connection is expected to come back by itself.
            await model.block_until(lambda: conn.is_open, timeout=3)
        finally:
            # Always release the extra connection opened by this test.
            await conn.close()