6 from contextlib
import closing
7 from pathlib
import Path
9 from juju
.client
import client
10 from juju
.client
.connection
import Connection
11 from juju
.controller
import Controller
12 from juju
.utils
import run_with_interrupt
# Module-level logger, named after this module via __name__.
20 logger
= logging
.getLogger(__name__
)
# Integration test: the monitor attached to a model's connection reports
# 'connected' first and 'disconnected' afterwards.
25 async def test_monitor(event_loop
):
26 async with base
.CleanModel() as model
:
27 conn
= model
.connection()
# While the model context is active its connection should be monitored as up.
28 assert conn
.monitor
.status
== 'connected'
# NOTE(review): the embedded line numbers jump from 28 to 31, so the step
# that takes the connection down (presumably closing it) is not visible in
# this excerpt -- confirm against the full file.
31 assert conn
.monitor
.status
== 'disconnected'
# Integration test: after something goes wrong on the connection, the monitor
# is expected to report the 'error' status.
# NOTE(review): original lines 40, 42, 44 and 47+ are not visible here, so the
# statement that provokes the error and any cleanup cannot be described.
36 async def test_monitor_catches_error(event_loop
):
38 async with base
.CleanModel() as model
:
39 conn
= model
.connection()
# Precondition: the monitor starts out reporting a live connection.
41 assert conn
.monitor
.status
== 'connected'
# The remaining visible steps run inside `async with conn.monitor.reconnecting`.
# NOTE(review): presumably this holds the monitor's reconnect lock so the
# connection cannot be re-established during the check -- confirm.
43 async with conn
.monitor
.reconnecting
:
# Pause for one second before inspecting the status (presumably to give the
# monitor time to notice the failure).
45 await asyncio
.sleep(1)
46 assert conn
.monitor
.status
== 'error'
# Integration test: builds a ClientFacade from the model's connection and
# awaits FullStatus(None). No assertion is visible, so the test appears to
# check only that the call completes without raising.
53 async def test_full_status(event_loop
):
54 async with base
.CleanModel() as model
:
# NOTE(review): this keyword argument belongs to a call whose opening line
# (original lines 55-56) is not visible -- presumably a deploy of an 'ubuntu'
# application; confirm against the full file.
57 application_name
='ubuntu',
62 c
= client
.ClientFacade
.from_connection(model
.connection())
# FullStatus is called with None as its only argument.
64 await c
.FullStatus(None)
# Integration test: opens a second Connection with the same connect
# parameters as the model's own connection, then checks that after it is no
# longer open it becomes open again within the block_until timeout.
# NOTE(review): original lines 73, 75-76 and 79+ are not visible here; the
# step that drops the connection and any cleanup are therefore not shown.
69 async def test_reconnect(event_loop
):
70 async with base
.CleanModel() as model
:
# Reuse the parameters of the model's existing connection for the new one.
71 kwargs
= model
.connection().connect_params()
72 conn
= await Connection
.connect(**kwargs
)
74 await asyncio
.sleep(0.1)
# At this point the connection is expected to have been dropped.
77 assert not conn
.is_open
# Wait until conn.is_open is true again; timeout=3 (presumably seconds).
78 await model
.block_until(lambda: conn
.is_open
, timeout
=3)
# Integration test: for each HTTP redirect status, waits for a local
# RedirectServer to be running and then connects to it on localhost, using
# connect parameters captured from a real controller.
# NOTE(review): several original lines are not visible in this excerpt (the
# list header at 97, the try/finally structure, and the calls that start,
# stop and terminate the server), so those steps are not described here.
85 async def test_redirect(event_loop
):
# Connect to the controller only long enough to capture its connect params.
86 controller
= Controller()
87 await controller
.connect()
88 kwargs
= controller
.connection().connect_params()
89 await controller
.disconnect()
91 # websockets.server.logger.setLevel(logging.DEBUG)
92 # websockets.client.logger.setLevel(logging.DEBUG)
93 # # websockets.protocol.logger.setLevel(logging.DEBUG)
94 # logger.setLevel(logging.DEBUG)
# URL handed to RedirectServer as its destination: the controller's own
# API endpoint.
96 destination
= 'wss://{}/api'.format(kwargs
['endpoint'])
# The redirect status codes under test (presumably the elements of the
# `redirect_statuses` list iterated below; its opening line is not visible).
98 http
.HTTPStatus
.MOVED_PERMANENTLY
,
99 http
.HTTPStatus
.FOUND
,
100 http
.HTTPStatus
.SEE_OTHER
,
101 http
.HTTPStatus
.TEMPORARY_REDIRECT
,
102 http
.HTTPStatus
.PERMANENT_REDIRECT
,
# Append the test server's certificate (cert.pem next to this file) to the
# CA certificate text, separated by a newline.
104 test_server_cert
= Path(__file__
).with_name('cert.pem')
105 kwargs
['cacert'] += '\n' + test_server_cert
.read_text()
106 server
= RedirectServer(destination
, event_loop
)
108 for status
in redirect_statuses
:
109 logger
.debug('test: starting {}'.format(status
))
# Wait for the server's `running` event.
# NOTE(review): the second argument to run_with_interrupt (original line 112)
# is not visible.
111 await run_with_interrupt(server
.running
.wait(),
# Re-raise the server's exception.
# NOTE(review): the guarding condition on original line 113 is not visible.
114 raise server
.exception
115 assert not server
.terminated
.is_set()
116 logger
.debug('test: started')
# Same connect params, but pointed at the local redirect server's port.
117 kwargs_copy
= dict(kwargs
,
118 endpoint
='localhost:{}'.format(server
.port
))
119 logger
.debug('test: connecting')
120 conn
= await Connection
.connect(**kwargs_copy
)
121 logger
.debug('test: connected')
123 logger
.debug('test: stopping')
# Wait for the server's `stopped` event before the next status is tried.
125 await server
.stopped
.wait()
126 logger
.debug('test: stopped')
# Wait for the server's `terminated` event before the test returns.
129 await server
.terminated
.wait()
# Test helper: a TLS websocket server that answers requests with an HTTP
# redirect (self.status plus a Location header pointing at self.destination).
# It runs as a task created on an event loop and is coordinated through
# asyncio.Event flags.
# NOTE(review): many original lines of this class are not visible in this
# excerpt (several method headers and bodies), so only the visible statements
# are described.
132 class RedirectServer
:
# destination: value sent back in the Location header of each redirect.
# loop: event loop argument.
# NOTE(review): presumably stored as self.loop on the elided original line
# 135, since self.loop is used below.
133 def __init__(self
, destination
, loop
):
134 self
.destination
= destination
# Underscore-prefixed events are waited on / checked by the server task
# (presumably set by the start/stop/terminate methods); `running`,
# `stopped` and `terminated` are awaited by the test code.
136 self
._start
= asyncio
.Event()
137 self
._stop
= asyncio
.Event()
138 self
._terminate
= asyncio
.Event()
139 self
.running
= asyncio
.Event()
140 self
.stopped
= asyncio
.Event()
141 self
.terminated
= asyncio
.Event()
# Pick the TLS protocol constant by availability in the ssl module:
# PROTOCOL_TLS_SERVER, then PROTOCOL_TLS, then PROTOCOL_TLSv1_2 as fallback.
142 if hasattr(ssl
, 'PROTOCOL_TLS_SERVER'):
144 protocol
= ssl
.PROTOCOL_TLS_SERVER
145 elif hasattr(ssl
, 'PROTOCOL_TLS'):
147 protocol
= ssl
.PROTOCOL_TLS
150 protocol
= ssl
.PROTOCOL_TLSv1_2
151 self
.ssl_context
= ssl
.SSLContext(protocol
)
# Certificate and key are read from cert.pem / key.pem next to this file.
152 crt_file
= Path(__file__
).with_name('cert.pem')
153 key_file
= Path(__file__
).with_name('key.pem')
154 self
.ssl_context
.load_cert_chain(str(crt_file
), str(key_file
))
# Schedule the server coroutine on the loop as soon as the object is created.
157 self
._task
= self
.loop
.create_task(self
.run())
# status: value later returned by the redirect handler as the response status.
# NOTE(review): presumably saved as self.status on the elided line 160.
159 def start(self
, status
):
# A new free port is chosen on every start.
161 self
.port
= self
._find
_free
_port
()
# Sets the terminate flag (the enclosing method header is not visible).
168 self
._terminate
.set()
# Returns the server task's exception. The enclosing definition and the
# body of the except branch are not visible.
174 return self
._task
.exception()
175 except (asyncio
.CancelledError
, asyncio
.InvalidStateError
):
179 logger
.debug('server: active')
# Websocket handler: sends 'hello' on the websocket it is given.
181 async def hello(websocket
, path
):
182 await websocket
.send('hello')
# process_request hook: answers with the configured status, a Location
# header pointing at the destination, and an empty body.
184 async def redirect(path
, request_headers
):
185 return self
.status
, {'Location': self
.destination
}, b
""
# Outer loop: wait for a start request until terminate is set.
188 while not self
._terminate
.is_set():
189 await run_with_interrupt(self
._start
.wait(),
192 if self
._terminate
.is_set():
195 logger
.debug('server: starting {}'.format(self
.status
))
197 async with websockets
.serve(ws_handler
=hello
,
198 process_request
=redirect
,
201 ssl
=self
.ssl_context
,
205 logger
.debug('server: started')
# Inner loop: repeat 1-second sleeps until the stop flag is set.
206 while not self
._stop
.is_set():
207 await run_with_interrupt(
# NOTE(review): the `loop` argument of asyncio.sleep was deprecated in
# Python 3.8 and removed in 3.10, so this call raises TypeError on 3.10+.
208 asyncio
.sleep(1, loop
=self
.loop
),
211 logger
.debug('server: tick')
212 logger
.debug('server: stopping')
213 except asyncio
.CancelledError
:
219 logger
.debug('server: stopped')
220 logger
.debug('server: terminating')
221 except asyncio
.CancelledError
:
226 self
._terminate
.clear()
# Set the `terminated` event, which the test awaits.
229 self
.terminated
.set()
230 logger
.debug('server: terminated')
# Returns a port number read from a temporary TCP/IPv4 socket, which
# `closing` closes on exit.
# NOTE(review): original line 235 (presumably the bind call) is not visible.
232 def _find_free_port(self
):
233 with
closing(socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)) as s
:
234 s
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
# getsockname() returns (host, port) for AF_INET sockets; index 1 is the port.
236 return s
.getsockname()[1]