X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Ftests%2Fintegration%2Ftest_connection.py;fp=modules%2Flibjuju%2Ftests%2Fintegration%2Ftest_connection.py;h=0000000000000000000000000000000000000000;hp=6647a03646d56f7c87485e8dfae09c35f427f6f7;hb=9d18c22a0dc9e295adda50601fc5e2f45d2c9b8a;hpb=19c5cfca317615597be6bf1051e9d2fa903adb97 diff --git a/modules/libjuju/tests/integration/test_connection.py b/modules/libjuju/tests/integration/test_connection.py deleted file mode 100644 index 6647a03..0000000 --- a/modules/libjuju/tests/integration/test_connection.py +++ /dev/null @@ -1,236 +0,0 @@ -import asyncio -import http -import logging -import socket -import ssl -from contextlib import closing -from pathlib import Path - -from juju.client import client -from juju.client.connection import Connection -from juju.controller import Controller -from juju.utils import run_with_interrupt - -import pytest -import websockets - -from .. import base - - -logger = logging.getLogger(__name__) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_monitor(event_loop): - async with base.CleanModel() as model: - conn = model.connection() - assert conn.monitor.status == 'connected' - await conn.close() - - assert conn.monitor.status == 'disconnected' - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_monitor_catches_error(event_loop): - - async with base.CleanModel() as model: - conn = model.connection() - - assert conn.monitor.status == 'connected' - try: - async with conn.monitor.reconnecting: - await conn.ws.close() - await asyncio.sleep(1) - assert conn.monitor.status == 'error' - finally: - await conn.close() - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_full_status(event_loop): - async with base.CleanModel() as model: - await model.deploy( - 'ubuntu-0', - application_name='ubuntu', - series='trusty', - channel='stable', - ) - - c = client.ClientFacade.from_connection(model.connection()) - - await c.FullStatus(None) - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_reconnect(event_loop): - async with base.CleanModel() as model: - kwargs = model.connection().connect_params() - conn = await Connection.connect(**kwargs) - try: - await asyncio.sleep(0.1) - assert conn.is_open - await conn.ws.close() - assert not conn.is_open - await model.block_until(lambda: conn.is_open, timeout=3) - finally: - await conn.close() - - -@base.bootstrapped -@pytest.mark.asyncio -async def test_redirect(event_loop): - controller = Controller() - await controller.connect() - kwargs = controller.connection().connect_params() - await controller.disconnect() - - # websockets.server.logger.setLevel(logging.DEBUG) - # websockets.client.logger.setLevel(logging.DEBUG) - # # websockets.protocol.logger.setLevel(logging.DEBUG) - # logger.setLevel(logging.DEBUG) - - destination = 'wss://{}/api'.format(kwargs['endpoint']) - redirect_statuses = [ - http.HTTPStatus.MOVED_PERMANENTLY, - http.HTTPStatus.FOUND, - http.HTTPStatus.SEE_OTHER, - http.HTTPStatus.TEMPORARY_REDIRECT, - http.HTTPStatus.PERMANENT_REDIRECT, - ] - test_server_cert = Path(__file__).with_name('cert.pem') - kwargs['cacert'] += '\n' + test_server_cert.read_text() - server = RedirectServer(destination, event_loop) - try: - for status in redirect_statuses: - logger.debug('test: starting {}'.format(status)) - server.start(status) - await run_with_interrupt(server.running.wait(), - server.terminated) - if server.exception: - raise server.exception - assert not server.terminated.is_set() - logger.debug('test: started') - kwargs_copy = dict(kwargs, - endpoint='localhost:{}'.format(server.port)) - logger.debug('test: connecting') - conn = await Connection.connect(**kwargs_copy) - logger.debug('test: connected') - await conn.close() - logger.debug('test: stopping') - server.stop() - await server.stopped.wait() - logger.debug('test: stopped') - finally: - server.terminate() - await server.terminated.wait() - - -class RedirectServer: - def __init__(self, destination, loop): - self.destination = destination - self.loop = loop - self._start = asyncio.Event() - self._stop = asyncio.Event() - self._terminate = asyncio.Event() - self.running = asyncio.Event() - self.stopped = asyncio.Event() - self.terminated = asyncio.Event() - if hasattr(ssl, 'PROTOCOL_TLS_SERVER'): - # python 3.6+ - protocol = ssl.PROTOCOL_TLS_SERVER - elif hasattr(ssl, 'PROTOCOL_TLS'): - # python 3.5.3+ - protocol = ssl.PROTOCOL_TLS - else: - # python 3.5.2 - protocol = ssl.PROTOCOL_TLSv1_2 - self.ssl_context = ssl.SSLContext(protocol) - crt_file = Path(__file__).with_name('cert.pem') - key_file = Path(__file__).with_name('key.pem') - self.ssl_context.load_cert_chain(str(crt_file), str(key_file)) - self.status = None - self.port = None - self._task = self.loop.create_task(self.run()) - - def start(self, status): - self.status = status - self.port = self._find_free_port() - self._start.set() - - def stop(self): - self._stop.set() - - def terminate(self): - self._terminate.set() - self.stop() - - @property - def exception(self): - try: - return self._task.exception() - except (asyncio.CancelledError, asyncio.InvalidStateError): - return None - - async def run(self): - logger.debug('server: active') - - async def hello(websocket, path): - await websocket.send('hello') - - async def redirect(path, request_headers): - return self.status, {'Location': self.destination}, b"" - - try: - while not self._terminate.is_set(): - await run_with_interrupt(self._start.wait(), - self._terminate, - loop=self.loop) - if self._terminate.is_set(): - break - self._start.clear() - logger.debug('server: starting {}'.format(self.status)) - try: - async with websockets.serve(ws_handler=hello, - process_request=redirect, - host='localhost', - port=self.port, - ssl=self.ssl_context, - loop=self.loop): - self.stopped.clear() - self.running.set() - logger.debug('server: started') - while not self._stop.is_set(): - await run_with_interrupt( - asyncio.sleep(1, loop=self.loop), - self._stop, - loop=self.loop) - logger.debug('server: tick') - logger.debug('server: stopping') - except asyncio.CancelledError: - break - finally: - self.stopped.set() - self._stop.clear() - self.running.clear() - logger.debug('server: stopped') - logger.debug('server: terminating') - except asyncio.CancelledError: - pass - finally: - self._start.clear() - self._stop.clear() - self._terminate.clear() - self.stopped.set() - self.running.clear() - self.terminated.set() - logger.debug('server: terminated') - - def _find_free_port(self): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(('', 0)) - return s.getsockname()[1]