X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Fclient%2Fconnection.py;fp=modules%2Flibjuju%2Fjuju%2Fclient%2Fconnection.py;h=0000000000000000000000000000000000000000;hp=f2150b7dc44481b6e43b0ed844dd2c9b1ea16a91;hb=9d18c22a0dc9e295adda50601fc5e2f45d2c9b8a;hpb=19c5cfca317615597be6bf1051e9d2fa903adb97 diff --git a/modules/libjuju/juju/client/connection.py b/modules/libjuju/juju/client/connection.py deleted file mode 100644 index f2150b7..0000000 --- a/modules/libjuju/juju/client/connection.py +++ /dev/null @@ -1,601 +0,0 @@ -import asyncio -import base64 -import json -import logging -import ssl -import urllib.request -import weakref -from concurrent.futures import CancelledError -from http.client import HTTPSConnection - -import macaroonbakery.httpbakery as httpbakery -import macaroonbakery.bakery as bakery -import websockets -from juju import errors, tag, utils -from juju.client import client -from juju.utils import IdQueue - -log = logging.getLogger('juju.client.connection') - - -class Monitor: - """ - Monitor helper class for our Connection class. - - Contains a reference to an instantiated Connection, along with a - reference to the Connection.receiver Future. Upon inspection of - these objects, this class determines whether the connection is in - an 'error', 'connected' or 'disconnected' state. - - Use this class to stay up to date on the health of a connection, - and take appropriate action if the connection errors out due to - network issues or other unexpected circumstances. - - """ - ERROR = 'error' - CONNECTED = 'connected' - DISCONNECTING = 'disconnecting' - DISCONNECTED = 'disconnected' - - def __init__(self, connection): - self.connection = weakref.ref(connection) - self.reconnecting = asyncio.Lock(loop=connection.loop) - self.close_called = asyncio.Event(loop=connection.loop) - - @property - def status(self): - """ - Determine the status of the connection and receiver, and return - ERROR, CONNECTED, or DISCONNECTED as appropriate. - - For simplicity, we only consider ourselves to be connected - after the Connection class has setup a receiver task. This - only happens after the websocket is open, and the connection - isn't usable until that receiver has been started. - - """ - connection = self.connection() - - # the connection instance was destroyed but someone kept - # a separate reference to the monitor for some reason - if not connection: - return self.DISCONNECTED - - # connection cleanly disconnected or not yet opened - if not connection.ws: - return self.DISCONNECTED - - # close called but not yet complete - if self.close_called.is_set(): - return self.DISCONNECTING - - # connection closed uncleanly (we didn't call connection.close) - stopped = connection._receiver_task.stopped.is_set() - if stopped or not connection.ws.open: - return self.ERROR - - # everything is fine! - return self.CONNECTED - - -class Connection: - """ - Usage:: - - # Connect to an arbitrary api server - client = await Connection.connect( - api_endpoint, model_uuid, username, password, cacert) - - Note: Any connection method or constructor can accept an optional `loop` - argument to override the default event loop from `asyncio.get_event_loop`. - """ - - MAX_FRAME_SIZE = 2**22 - "Maximum size for a single frame. Defaults to 4MB." - - @classmethod - async def connect( - cls, - endpoint=None, - uuid=None, - username=None, - password=None, - cacert=None, - bakery_client=None, - loop=None, - max_frame_size=None, - retries=3, - retry_backoff=10, - ): - """Connect to the websocket. - - If uuid is None, the connection will be to the controller. Otherwise it - will be to the model. - - :param str endpoint: The hostname:port of the controller to connect to. - :param str uuid: The model UUID to connect to (None for a - controller-only connection). - :param str username: The username for controller-local users (or None - to use macaroon-based login.) - :param str password: The password for controller-local users. - :param str cacert: The CA certificate of the controller - (PEM formatted). - :param httpbakery.Client bakery_client: The macaroon bakery client to - to use when performing macaroon-based login. Macaroon tokens - acquired when logging will be saved to bakery_client.cookies. - If this is None, a default bakery_client will be used. - :param asyncio.BaseEventLoop loop: The event loop to use for async - operations. - :param int max_frame_size: The maximum websocket frame size to allow. - :param int retries: When connecting or reconnecting, and all endpoints - fail, how many times to retry the connection before giving up. - :param int retry_backoff: Number of seconds to increase the wait - between connection retry attempts (a backoff of 10 with 3 retries - would wait 10s, 20s, and 30s). - """ - self = cls() - if endpoint is None: - raise ValueError('no endpoint provided') - self.uuid = uuid - if bakery_client is None: - bakery_client = httpbakery.Client() - self.bakery_client = bakery_client - if username and '@' in username and not username.endswith('@local'): - # We're trying to log in as an external user - we need to use - # macaroon authentication with no username or password. - if password is not None: - raise errors.JujuAuthError('cannot log in as external ' - 'user with a password') - username = None - self.usertag = tag.user(username) - self.password = password - self.loop = loop or asyncio.get_event_loop() - - self.__request_id__ = 0 - - # The following instance variables are initialized by the - # _connect_with_redirect method, but create them here - # as a reminder that they will exist. - self.addr = None - self.ws = None - self.endpoint = None - self.cacert = None - self.info = None - - # Create that _Task objects but don't start the tasks yet. - self._pinger_task = _Task(self._pinger, self.loop) - self._receiver_task = _Task(self._receiver, self.loop) - - self._retries = retries - self._retry_backoff = retry_backoff - - self.facades = {} - self.messages = IdQueue(loop=self.loop) - self.monitor = Monitor(connection=self) - if max_frame_size is None: - max_frame_size = self.MAX_FRAME_SIZE - self.max_frame_size = max_frame_size - await self._connect_with_redirect([(endpoint, cacert)]) - return self - - @property - def username(self): - if not self.usertag: - return None - return self.usertag[len('user-'):] - - @property - def is_open(self): - return self.monitor.status == Monitor.CONNECTED - - def _get_ssl(self, cert=None): - return ssl.create_default_context( - purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert) - - async def _open(self, endpoint, cacert): - if self.uuid: - url = "wss://{}/model/{}/api".format(endpoint, self.uuid) - else: - url = "wss://{}/api".format(endpoint) - - return (await websockets.connect( - url, - ssl=self._get_ssl(cacert), - loop=self.loop, - max_size=self.max_frame_size, - ), url, endpoint, cacert) - - async def close(self): - if not self.ws: - return - self.monitor.close_called.set() - await self._pinger_task.stopped.wait() - await self._receiver_task.stopped.wait() - await self.ws.close() - self.ws = None - - async def _recv(self, request_id): - if not self.is_open: - raise websockets.exceptions.ConnectionClosed(0, 'websocket closed') - return await self.messages.get(request_id) - - async def _receiver(self): - try: - while self.is_open: - result = await utils.run_with_interrupt( - self.ws.recv(), - self.monitor.close_called, - loop=self.loop) - if self.monitor.close_called.is_set(): - break - if result is not None: - result = json.loads(result) - await self.messages.put(result['request-id'], result) - except CancelledError: - pass - except websockets.ConnectionClosed as e: - log.warning('Receiver: Connection closed, reconnecting') - await self.messages.put_all(e) - # the reconnect has to be done as a task because the receiver will - # be cancelled by the reconnect and we don't want the reconnect - # to be aborted half-way through - self.loop.create_task(self.reconnect()) - return - except Exception as e: - log.exception("Error in receiver") - # make pending listeners aware of the error - await self.messages.put_all(e) - raise - - async def _pinger(self): - ''' - A Controller can time us out if we are silent for too long. This - is especially true in JaaS, which has a fairly strict timeout. - - To prevent timing out, we send a ping every ten seconds. - - ''' - async def _do_ping(): - try: - await pinger_facade.Ping() - await asyncio.sleep(10, loop=self.loop) - except CancelledError: - pass - - pinger_facade = client.PingerFacade.from_connection(self) - try: - while True: - await utils.run_with_interrupt( - _do_ping(), - self.monitor.close_called, - loop=self.loop) - if self.monitor.close_called.is_set(): - break - except websockets.exceptions.ConnectionClosed: - # The connection has closed - we can't do anything - # more until the connection is restarted. - log.debug('ping failed because of closed connection') - pass - - async def rpc(self, msg, encoder=None): - '''Make an RPC to the API. The message is encoded as JSON - using the given encoder if any. - :param msg: Parameters for the call (will be encoded as JSON). - :param encoder: Encoder to be used when encoding the message. - :return: The result of the call. - :raises JujuAPIError: When there's an error returned. - :raises JujuError: - ''' - self.__request_id__ += 1 - msg['request-id'] = self.__request_id__ - if'params' not in msg: - msg['params'] = {} - if "version" not in msg: - msg['version'] = self.facades[msg['type']] - outgoing = json.dumps(msg, indent=2, cls=encoder) - log.debug('connection {} -> {}'.format(id(self), outgoing)) - for attempt in range(3): - if self.monitor.status == Monitor.DISCONNECTED: - # closed cleanly; shouldn't try to reconnect - raise websockets.exceptions.ConnectionClosed( - 0, 'websocket closed') - try: - await self.ws.send(outgoing) - break - except websockets.ConnectionClosed: - if attempt == 2: - raise - log.warning('RPC: Connection closed, reconnecting') - # the reconnect has to be done in a separate task because, - # if it is triggered by the pinger, then this RPC call will - # be cancelled when the pinger is cancelled by the reconnect, - # and we don't want the reconnect to be aborted halfway through - await asyncio.wait([self.reconnect()], loop=self.loop) - if self.monitor.status != Monitor.CONNECTED: - # reconnect failed; abort and shutdown - log.error('RPC: Automatic reconnect failed') - raise - result = await self._recv(msg['request-id']) - log.debug('connection {} <- {}'.format(id(self), result)) - - if not result: - return result - - if 'error' in result: - # API Error Response - raise errors.JujuAPIError(result) - - if 'response' not in result: - # This may never happen - return result - - if 'results' in result['response']: - # Check for errors in a result list. - # TODO This loses the results that might have succeeded. - # Perhaps JujuError should return all the results including - # errors, or perhaps a keyword parameter to the rpc method - # could be added to trigger this behaviour. - err_results = [] - for res in result['response']['results']: - if res.get('error', {}).get('message'): - err_results.append(res['error']['message']) - if err_results: - raise errors.JujuError(err_results) - - elif result['response'].get('error', {}).get('message'): - raise errors.JujuError(result['response']['error']['message']) - - return result - - def _http_headers(self): - """Return dictionary of http headers necessary for making an http - connection to the endpoint of this Connection. - - :return: Dictionary of headers - - """ - if not self.usertag: - return {} - - creds = u'{}:{}'.format( - self.usertag, - self.password or '' - ) - token = base64.b64encode(creds.encode()) - return { - 'Authorization': 'Basic {}'.format(token.decode()) - } - - def https_connection(self): - """Return an https connection to this Connection's endpoint. - - Returns a 3-tuple containing:: - - 1. The :class:`HTTPSConnection` instance - 2. Dictionary of auth headers to be used with the connection - 3. The root url path (str) to be used for requests. - - """ - endpoint = self.endpoint - host, remainder = endpoint.split(':', 1) - port = remainder - if '/' in remainder: - port, _ = remainder.split('/', 1) - - conn = HTTPSConnection( - host, int(port), - context=self._get_ssl(self.cacert), - ) - - path = ( - "/model/{}".format(self.uuid) - if self.uuid else "" - ) - return conn, self._http_headers(), path - - async def clone(self): - """Return a new Connection, connected to the same websocket endpoint - as this one. - - """ - return await Connection.connect(**self.connect_params()) - - def connect_params(self): - """Return a tuple of parameters suitable for passing to - Connection.connect that can be used to make a new connection - to the same controller (and model if specified. The first - element in the returned tuple holds the endpoint argument; - the other holds a dict of the keyword args. - """ - return { - 'endpoint': self.endpoint, - 'uuid': self.uuid, - 'username': self.username, - 'password': self.password, - 'cacert': self.cacert, - 'bakery_client': self.bakery_client, - 'loop': self.loop, - 'max_frame_size': self.max_frame_size, - } - - async def controller(self): - """Return a Connection to the controller at self.endpoint - """ - return await Connection.connect( - self.endpoint, - username=self.username, - password=self.password, - cacert=self.cacert, - bakery_client=self.bakery_client, - loop=self.loop, - max_frame_size=self.max_frame_size, - ) - - async def reconnect(self): - """ Force a reconnection. - """ - monitor = self.monitor - if monitor.reconnecting.locked() or monitor.close_called.is_set(): - return - async with monitor.reconnecting: - await self.close() - await self._connect_with_login([(self.endpoint, self.cacert)]) - - async def _connect(self, endpoints): - if len(endpoints) == 0: - raise errors.JujuConnectionError('no endpoints to connect to') - - async def _try_endpoint(endpoint, cacert, delay): - if delay: - await asyncio.sleep(delay) - return await self._open(endpoint, cacert) - - # Try all endpoints in parallel, with slight increasing delay (+100ms - # for each subsequent endpoint); the delay allows us to prefer the - # earlier endpoints over the latter. Use first successful connection. - tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert, - 0.1 * i)) - for i, (endpoint, cacert) in enumerate(endpoints)] - for attempt in range(self._retries + 1): - for task in asyncio.as_completed(tasks, loop=self.loop): - try: - result = await task - break - except ConnectionError: - continue # ignore; try another endpoint - else: - _endpoints_str = ', '.join([endpoint - for endpoint, cacert in endpoints]) - if attempt < self._retries: - log.debug('Retrying connection to endpoints: {}; ' - 'attempt {} of {}'.format(_endpoints_str, - attempt + 1, - self._retries + 1)) - await asyncio.sleep((attempt + 1) * self._retry_backoff) - continue - else: - raise errors.JujuConnectionError( - 'Unable to connect to any endpoint: ' - '{}'.format(_endpoints_str)) - # only executed if inner loop's else did not continue - # (i.e., inner loop did break due to successful connection) - break - for task in tasks: - task.cancel() - self.ws = result[0] - self.addr = result[1] - self.endpoint = result[2] - self.cacert = result[3] - self._receiver_task.start() - log.debug("Driver connected to juju %s", self.addr) - self.monitor.close_called.clear() - - async def _connect_with_login(self, endpoints): - """Connect to the websocket. - - If uuid is None, the connection will be to the controller. Otherwise it - will be to the model. - :return: The response field of login response JSON object. - """ - success = False - try: - await self._connect(endpoints) - # It's possible that we may get several discharge-required errors, - # corresponding to different levels of authentication, so retry - # a few times. - for i in range(0, 2): - result = (await self.login())['response'] - macaroonJSON = result.get('discharge-required') - if macaroonJSON is None: - self.info = result - success = True - return result - macaroon = bakery.Macaroon.from_dict(macaroonJSON) - self.bakery_client.handle_error( - httpbakery.Error( - code=httpbakery.ERR_DISCHARGE_REQUIRED, - message=result.get('discharge-required-error'), - version=macaroon.version, - info=httpbakery.ErrorInfo( - macaroon=macaroon, - macaroon_path=result.get('macaroon-path'), - ), - ), - # note: remove the port number. - 'https://' + self.endpoint + '/', - ) - raise errors.JujuAuthError('failed to authenticate ' - 'after several attempts') - finally: - if not success: - await self.close() - - async def _connect_with_redirect(self, endpoints): - try: - login_result = await self._connect_with_login(endpoints) - except errors.JujuRedirectException as e: - login_result = await self._connect_with_login(e.endpoints) - self._build_facades(login_result.get('facades', {})) - self._pinger_task.start() - - def _build_facades(self, facades): - self.facades.clear() - for facade in facades: - self.facades[facade['name']] = facade['versions'][-1] - - async def login(self): - params = {} - params['auth-tag'] = self.usertag - if self.password: - params['credentials'] = self.password - else: - macaroons = _macaroons_for_domain(self.bakery_client.cookies, - self.endpoint) - params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms] - for ms in macaroons] - - try: - return await self.rpc({ - "type": "Admin", - "request": "Login", - "version": 3, - "params": params, - }) - except errors.JujuAPIError as e: - if e.error_code != 'redirection required': - raise - log.info('Controller requested redirect') - # Fetch additional redirection information now so that - # we can safely close the connection after login - # fails. - redirect_info = (await self.rpc({ - "type": "Admin", - "request": "RedirectInfo", - "version": 3, - }))['response'] - raise errors.JujuRedirectException(redirect_info) from e - - -class _Task: - def __init__(self, task, loop): - self.stopped = asyncio.Event(loop=loop) - self.stopped.set() - self.task = task - self.loop = loop - - def start(self): - async def run(): - try: - return await self.task() - finally: - self.stopped.set() - self.stopped.clear() - self.loop.create_task(run()) - - -def _macaroons_for_domain(cookies, domain): - '''Return any macaroons from the given cookie jar that - apply to the given domain name.''' - req = urllib.request.Request('https://' + domain + '/') - cookies.add_cookie_header(req) - return httpbakery.extract_macaroons(req)