| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 1 | import asyncio |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 2 | import base64 |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 3 | import json |
| 4 | import logging |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 5 | import ssl |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 6 | import urllib.request |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 7 | import weakref |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 8 | from concurrent.futures import CancelledError |
| 9 | from http.client import HTTPSConnection |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 10 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 11 | import macaroonbakery.httpbakery as httpbakery |
| 12 | import macaroonbakery.bakery as bakery |
| 13 | import websockets |
| 14 | from juju import errors, tag, utils |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 15 | from juju.client import client |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 16 | from juju.utils import IdQueue |
| 17 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 18 | log = logging.getLogger('juju.client.connection') |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 19 | |
| 20 | |
| 21 | class Monitor: |
| 22 | """ |
| 23 | Monitor helper class for our Connection class. |
| 24 | |
| 25 | Contains a reference to an instantiated Connection, along with a |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 26 | reference to the Connection.receiver Future. Upon inspection of |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 27 | these objects, this class determines whether the connection is in |
| 28 | an 'error', 'connected' or 'disconnected' state. |
| 29 | |
| 30 | Use this class to stay up to date on the health of a connection, |
| 31 | and take appropriate action if the connection errors out due to |
| 32 | network issues or other unexpected circumstances. |
| 33 | |
| 34 | """ |
| 35 | ERROR = 'error' |
| 36 | CONNECTED = 'connected' |
| 37 | DISCONNECTING = 'disconnecting' |
| 38 | DISCONNECTED = 'disconnected' |
| 39 | |
| 40 | def __init__(self, connection): |
| 41 | self.connection = weakref.ref(connection) |
| 42 | self.reconnecting = asyncio.Lock(loop=connection.loop) |
| 43 | self.close_called = asyncio.Event(loop=connection.loop) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 44 | |
| 45 | @property |
| 46 | def status(self): |
| 47 | """ |
| 48 | Determine the status of the connection and receiver, and return |
| 49 | ERROR, CONNECTED, or DISCONNECTED as appropriate. |
| 50 | |
| 51 | For simplicity, we only consider ourselves to be connected |
| 52 | after the Connection class has setup a receiver task. This |
| 53 | only happens after the websocket is open, and the connection |
| 54 | isn't usable until that receiver has been started. |
| 55 | |
| 56 | """ |
| 57 | connection = self.connection() |
| 58 | |
| 59 | # the connection instance was destroyed but someone kept |
| 60 | # a separate reference to the monitor for some reason |
| 61 | if not connection: |
| 62 | return self.DISCONNECTED |
| 63 | |
| 64 | # connection cleanly disconnected or not yet opened |
| 65 | if not connection.ws: |
| 66 | return self.DISCONNECTED |
| 67 | |
| 68 | # close called but not yet complete |
| 69 | if self.close_called.is_set(): |
| 70 | return self.DISCONNECTING |
| 71 | |
| 72 | # connection closed uncleanly (we didn't call connection.close) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 73 | stopped = connection._receiver_task.stopped.is_set() |
| 74 | if stopped or not connection.ws.open: |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 75 | return self.ERROR |
| 76 | |
| 77 | # everything is fine! |
| 78 | return self.CONNECTED |
| 79 | |
| 80 | |
| 81 | class Connection: |
| 82 | """ |
| 83 | Usage:: |
| 84 | |
| 85 | # Connect to an arbitrary api server |
| 86 | client = await Connection.connect( |
| 87 | api_endpoint, model_uuid, username, password, cacert) |
| 88 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 89 | Note: Any connection method or constructor can accept an optional `loop` |
| 90 | argument to override the default event loop from `asyncio.get_event_loop`. |
| 91 | """ |
| 92 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 93 | MAX_FRAME_SIZE = 2**22 |
| 94 | "Maximum size for a single frame. Defaults to 4MB." |
| 95 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 96 | @classmethod |
| 97 | async def connect( |
| 98 | cls, |
| 99 | endpoint=None, |
| 100 | uuid=None, |
| 101 | username=None, |
| 102 | password=None, |
| 103 | cacert=None, |
| 104 | bakery_client=None, |
| 105 | loop=None, |
| 106 | max_frame_size=None, |
| 107 | retries=3, |
| 108 | retry_backoff=10, |
| 109 | ): |
| 110 | """Connect to the websocket. |
| 111 | |
| 112 | If uuid is None, the connection will be to the controller. Otherwise it |
| 113 | will be to the model. |
| 114 | |
| 115 | :param str endpoint: The hostname:port of the controller to connect to. |
| 116 | :param str uuid: The model UUID to connect to (None for a |
| 117 | controller-only connection). |
| 118 | :param str username: The username for controller-local users (or None |
| 119 | to use macaroon-based login.) |
| 120 | :param str password: The password for controller-local users. |
| 121 | :param str cacert: The CA certificate of the controller |
| 122 | (PEM formatted). |
| 123 | :param httpbakery.Client bakery_client: The macaroon bakery client to |
| 124 | to use when performing macaroon-based login. Macaroon tokens |
| 125 | acquired when logging will be saved to bakery_client.cookies. |
| 126 | If this is None, a default bakery_client will be used. |
| 127 | :param asyncio.BaseEventLoop loop: The event loop to use for async |
| 128 | operations. |
| 129 | :param int max_frame_size: The maximum websocket frame size to allow. |
| 130 | :param int retries: When connecting or reconnecting, and all endpoints |
| 131 | fail, how many times to retry the connection before giving up. |
| 132 | :param int retry_backoff: Number of seconds to increase the wait |
| 133 | between connection retry attempts (a backoff of 10 with 3 retries |
| 134 | would wait 10s, 20s, and 30s). |
| 135 | """ |
| 136 | self = cls() |
| 137 | if endpoint is None: |
| 138 | raise ValueError('no endpoint provided') |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 139 | self.uuid = uuid |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 140 | if bakery_client is None: |
| 141 | bakery_client = httpbakery.Client() |
| 142 | self.bakery_client = bakery_client |
| 143 | if username and '@' in username and not username.endswith('@local'): |
| 144 | # We're trying to log in as an external user - we need to use |
| 145 | # macaroon authentication with no username or password. |
| 146 | if password is not None: |
| 147 | raise errors.JujuAuthError('cannot log in as external ' |
| 148 | 'user with a password') |
| 149 | username = None |
| 150 | self.usertag = tag.user(username) |
| 151 | self.password = password |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 152 | self.loop = loop or asyncio.get_event_loop() |
| 153 | |
| 154 | self.__request_id__ = 0 |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 155 | |
| 156 | # The following instance variables are initialized by the |
| 157 | # _connect_with_redirect method, but create them here |
| 158 | # as a reminder that they will exist. |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 159 | self.addr = None |
| 160 | self.ws = None |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 161 | self.endpoint = None |
| 162 | self.cacert = None |
| 163 | self.info = None |
| 164 | |
| 165 | # Create that _Task objects but don't start the tasks yet. |
| 166 | self._pinger_task = _Task(self._pinger, self.loop) |
| 167 | self._receiver_task = _Task(self._receiver, self.loop) |
| 168 | |
| 169 | self._retries = retries |
| 170 | self._retry_backoff = retry_backoff |
| 171 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 172 | self.facades = {} |
| 173 | self.messages = IdQueue(loop=self.loop) |
| 174 | self.monitor = Monitor(connection=self) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 175 | if max_frame_size is None: |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 176 | max_frame_size = self.MAX_FRAME_SIZE |
| 177 | self.max_frame_size = max_frame_size |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 178 | await self._connect_with_redirect([(endpoint, cacert)]) |
| 179 | return self |
| 180 | |
| 181 | @property |
| 182 | def username(self): |
| 183 | if not self.usertag: |
| 184 | return None |
| 185 | return self.usertag[len('user-'):] |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 186 | |
| 187 | @property |
| 188 | def is_open(self): |
| 189 | return self.monitor.status == Monitor.CONNECTED |
| 190 | |
| 191 | def _get_ssl(self, cert=None): |
| 192 | return ssl.create_default_context( |
| 193 | purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert) |
| 194 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 195 | async def _open(self, endpoint, cacert): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 196 | if self.uuid: |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 197 | url = "wss://{}/model/{}/api".format(endpoint, self.uuid) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 198 | else: |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 199 | url = "wss://{}/api".format(endpoint) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 200 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 201 | return (await websockets.connect( |
| 202 | url, |
| 203 | ssl=self._get_ssl(cacert), |
| 204 | loop=self.loop, |
| 205 | max_size=self.max_frame_size, |
| 206 | ), url, endpoint, cacert) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 207 | |
| 208 | async def close(self): |
| 209 | if not self.ws: |
| 210 | return |
| 211 | self.monitor.close_called.set() |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 212 | await self._pinger_task.stopped.wait() |
| 213 | await self._receiver_task.stopped.wait() |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 214 | await self.ws.close() |
| 215 | self.ws = None |
| 216 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 217 | async def _recv(self, request_id): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 218 | if not self.is_open: |
| 219 | raise websockets.exceptions.ConnectionClosed(0, 'websocket closed') |
| 220 | return await self.messages.get(request_id) |
| 221 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 222 | async def _receiver(self): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 223 | try: |
| 224 | while self.is_open: |
| 225 | result = await utils.run_with_interrupt( |
| 226 | self.ws.recv(), |
| 227 | self.monitor.close_called, |
| 228 | loop=self.loop) |
| 229 | if self.monitor.close_called.is_set(): |
| 230 | break |
| 231 | if result is not None: |
| 232 | result = json.loads(result) |
| 233 | await self.messages.put(result['request-id'], result) |
| 234 | except CancelledError: |
| 235 | pass |
| 236 | except websockets.ConnectionClosed as e: |
| 237 | log.warning('Receiver: Connection closed, reconnecting') |
| 238 | await self.messages.put_all(e) |
| 239 | # the reconnect has to be done as a task because the receiver will |
| 240 | # be cancelled by the reconnect and we don't want the reconnect |
| 241 | # to be aborted half-way through |
| 242 | self.loop.create_task(self.reconnect()) |
| 243 | return |
| 244 | except Exception as e: |
| 245 | log.exception("Error in receiver") |
| 246 | # make pending listeners aware of the error |
| 247 | await self.messages.put_all(e) |
| 248 | raise |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 249 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 250 | async def _pinger(self): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 251 | ''' |
| 252 | A Controller can time us out if we are silent for too long. This |
| 253 | is especially true in JaaS, which has a fairly strict timeout. |
| 254 | |
| 255 | To prevent timing out, we send a ping every ten seconds. |
| 256 | |
| 257 | ''' |
| 258 | async def _do_ping(): |
| 259 | try: |
| 260 | await pinger_facade.Ping() |
| 261 | await asyncio.sleep(10, loop=self.loop) |
| 262 | except CancelledError: |
| 263 | pass |
| 264 | |
| 265 | pinger_facade = client.PingerFacade.from_connection(self) |
| 266 | try: |
| 267 | while True: |
| 268 | await utils.run_with_interrupt( |
| 269 | _do_ping(), |
| 270 | self.monitor.close_called, |
| 271 | loop=self.loop) |
| 272 | if self.monitor.close_called.is_set(): |
| 273 | break |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 274 | except websockets.exceptions.ConnectionClosed: |
| 275 | # The connection has closed - we can't do anything |
| 276 | # more until the connection is restarted. |
| 277 | log.debug('ping failed because of closed connection') |
| 278 | pass |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 279 | |
| 280 | async def rpc(self, msg, encoder=None): |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 281 | '''Make an RPC to the API. The message is encoded as JSON |
| 282 | using the given encoder if any. |
| 283 | :param msg: Parameters for the call (will be encoded as JSON). |
| 284 | :param encoder: Encoder to be used when encoding the message. |
| 285 | :return: The result of the call. |
| 286 | :raises JujuAPIError: When there's an error returned. |
| 287 | :raises JujuError: |
| 288 | ''' |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 289 | self.__request_id__ += 1 |
| 290 | msg['request-id'] = self.__request_id__ |
| 291 | if'params' not in msg: |
| 292 | msg['params'] = {} |
| 293 | if "version" not in msg: |
| 294 | msg['version'] = self.facades[msg['type']] |
| 295 | outgoing = json.dumps(msg, indent=2, cls=encoder) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 296 | log.debug('connection {} -> {}'.format(id(self), outgoing)) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 297 | for attempt in range(3): |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 298 | if self.monitor.status == Monitor.DISCONNECTED: |
| 299 | # closed cleanly; shouldn't try to reconnect |
| 300 | raise websockets.exceptions.ConnectionClosed( |
| 301 | 0, 'websocket closed') |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 302 | try: |
| 303 | await self.ws.send(outgoing) |
| 304 | break |
| 305 | except websockets.ConnectionClosed: |
| 306 | if attempt == 2: |
| 307 | raise |
| 308 | log.warning('RPC: Connection closed, reconnecting') |
| 309 | # the reconnect has to be done in a separate task because, |
| 310 | # if it is triggered by the pinger, then this RPC call will |
| 311 | # be cancelled when the pinger is cancelled by the reconnect, |
| 312 | # and we don't want the reconnect to be aborted halfway through |
| 313 | await asyncio.wait([self.reconnect()], loop=self.loop) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 314 | if self.monitor.status != Monitor.CONNECTED: |
| 315 | # reconnect failed; abort and shutdown |
| 316 | log.error('RPC: Automatic reconnect failed') |
| 317 | raise |
| 318 | result = await self._recv(msg['request-id']) |
| 319 | log.debug('connection {} <- {}'.format(id(self), result)) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 320 | |
| 321 | if not result: |
| 322 | return result |
| 323 | |
| 324 | if 'error' in result: |
| 325 | # API Error Response |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 326 | raise errors.JujuAPIError(result) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 327 | |
| 328 | if 'response' not in result: |
| 329 | # This may never happen |
| 330 | return result |
| 331 | |
| 332 | if 'results' in result['response']: |
| 333 | # Check for errors in a result list. |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 334 | # TODO This loses the results that might have succeeded. |
| 335 | # Perhaps JujuError should return all the results including |
| 336 | # errors, or perhaps a keyword parameter to the rpc method |
| 337 | # could be added to trigger this behaviour. |
| 338 | err_results = [] |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 339 | for res in result['response']['results']: |
| 340 | if res.get('error', {}).get('message'): |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 341 | err_results.append(res['error']['message']) |
| 342 | if err_results: |
| 343 | raise errors.JujuError(err_results) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 344 | |
| 345 | elif result['response'].get('error', {}).get('message'): |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 346 | raise errors.JujuError(result['response']['error']['message']) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 347 | |
| 348 | return result |
| 349 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 350 | def _http_headers(self): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 351 | """Return dictionary of http headers necessary for making an http |
| 352 | connection to the endpoint of this Connection. |
| 353 | |
| 354 | :return: Dictionary of headers |
| 355 | |
| 356 | """ |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 357 | if not self.usertag: |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 358 | return {} |
| 359 | |
| 360 | creds = u'{}:{}'.format( |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 361 | self.usertag, |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 362 | self.password or '' |
| 363 | ) |
| 364 | token = base64.b64encode(creds.encode()) |
| 365 | return { |
| 366 | 'Authorization': 'Basic {}'.format(token.decode()) |
| 367 | } |
| 368 | |
| 369 | def https_connection(self): |
| 370 | """Return an https connection to this Connection's endpoint. |
| 371 | |
| 372 | Returns a 3-tuple containing:: |
| 373 | |
| 374 | 1. The :class:`HTTPSConnection` instance |
| 375 | 2. Dictionary of auth headers to be used with the connection |
| 376 | 3. The root url path (str) to be used for requests. |
| 377 | |
| 378 | """ |
| 379 | endpoint = self.endpoint |
| 380 | host, remainder = endpoint.split(':', 1) |
| 381 | port = remainder |
| 382 | if '/' in remainder: |
| 383 | port, _ = remainder.split('/', 1) |
| 384 | |
| 385 | conn = HTTPSConnection( |
| 386 | host, int(port), |
| 387 | context=self._get_ssl(self.cacert), |
| 388 | ) |
| 389 | |
| 390 | path = ( |
| 391 | "/model/{}".format(self.uuid) |
| 392 | if self.uuid else "" |
| 393 | ) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 394 | return conn, self._http_headers(), path |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 395 | |
| 396 | async def clone(self): |
| 397 | """Return a new Connection, connected to the same websocket endpoint |
| 398 | as this one. |
| 399 | |
| 400 | """ |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 401 | return await Connection.connect(**self.connect_params()) |
| 402 | |
| 403 | def connect_params(self): |
| 404 | """Return a tuple of parameters suitable for passing to |
| 405 | Connection.connect that can be used to make a new connection |
| 406 | to the same controller (and model if specified. The first |
| 407 | element in the returned tuple holds the endpoint argument; |
| 408 | the other holds a dict of the keyword args. |
| 409 | """ |
| 410 | return { |
| 411 | 'endpoint': self.endpoint, |
| 412 | 'uuid': self.uuid, |
| 413 | 'username': self.username, |
| 414 | 'password': self.password, |
| 415 | 'cacert': self.cacert, |
| 416 | 'bakery_client': self.bakery_client, |
| 417 | 'loop': self.loop, |
| 418 | 'max_frame_size': self.max_frame_size, |
| 419 | } |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 420 | |
| 421 | async def controller(self): |
| 422 | """Return a Connection to the controller at self.endpoint |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 423 | """ |
| 424 | return await Connection.connect( |
| 425 | self.endpoint, |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 426 | username=self.username, |
| 427 | password=self.password, |
| 428 | cacert=self.cacert, |
| 429 | bakery_client=self.bakery_client, |
| 430 | loop=self.loop, |
| 431 | max_frame_size=self.max_frame_size, |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 432 | ) |
| 433 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 434 | async def reconnect(self): |
| 435 | """ Force a reconnection. |
| 436 | """ |
| 437 | monitor = self.monitor |
| 438 | if monitor.reconnecting.locked() or monitor.close_called.is_set(): |
| 439 | return |
| 440 | async with monitor.reconnecting: |
| 441 | await self.close() |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 442 | await self._connect_with_login([(self.endpoint, self.cacert)]) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 443 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 444 | async def _connect(self, endpoints): |
| 445 | if len(endpoints) == 0: |
| 446 | raise errors.JujuConnectionError('no endpoints to connect to') |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 447 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 448 | async def _try_endpoint(endpoint, cacert, delay): |
| 449 | if delay: |
| 450 | await asyncio.sleep(delay) |
| 451 | return await self._open(endpoint, cacert) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 452 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 453 | # Try all endpoints in parallel, with slight increasing delay (+100ms |
| 454 | # for each subsequent endpoint); the delay allows us to prefer the |
| 455 | # earlier endpoints over the latter. Use first successful connection. |
| 456 | tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert, |
| 457 | 0.1 * i)) |
| 458 | for i, (endpoint, cacert) in enumerate(endpoints)] |
| 459 | for attempt in range(self._retries + 1): |
| 460 | for task in asyncio.as_completed(tasks, loop=self.loop): |
| 461 | try: |
| 462 | result = await task |
| 463 | break |
| 464 | except ConnectionError: |
| 465 | continue # ignore; try another endpoint |
| 466 | else: |
| 467 | _endpoints_str = ', '.join([endpoint |
| 468 | for endpoint, cacert in endpoints]) |
| 469 | if attempt < self._retries: |
| 470 | log.debug('Retrying connection to endpoints: {}; ' |
| 471 | 'attempt {} of {}'.format(_endpoints_str, |
| 472 | attempt + 1, |
| 473 | self._retries + 1)) |
| 474 | await asyncio.sleep((attempt + 1) * self._retry_backoff) |
| 475 | continue |
| 476 | else: |
| 477 | raise errors.JujuConnectionError( |
| 478 | 'Unable to connect to any endpoint: ' |
| 479 | '{}'.format(_endpoints_str)) |
| 480 | # only executed if inner loop's else did not continue |
| 481 | # (i.e., inner loop did break due to successful connection) |
| 482 | break |
| 483 | for task in tasks: |
| 484 | task.cancel() |
| 485 | self.ws = result[0] |
| 486 | self.addr = result[1] |
| 487 | self.endpoint = result[2] |
| 488 | self.cacert = result[3] |
| 489 | self._receiver_task.start() |
| 490 | log.debug("Driver connected to juju %s", self.addr) |
| 491 | self.monitor.close_called.clear() |
| 492 | |
| 493 | async def _connect_with_login(self, endpoints): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 494 | """Connect to the websocket. |
| 495 | |
| 496 | If uuid is None, the connection will be to the controller. Otherwise it |
| 497 | will be to the model. |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 498 | :return: The response field of login response JSON object. |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 499 | """ |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 500 | success = False |
| 501 | try: |
| 502 | await self._connect(endpoints) |
| 503 | # It's possible that we may get several discharge-required errors, |
| 504 | # corresponding to different levels of authentication, so retry |
| 505 | # a few times. |
| 506 | for i in range(0, 2): |
| 507 | result = (await self.login())['response'] |
| 508 | macaroonJSON = result.get('discharge-required') |
| 509 | if macaroonJSON is None: |
| 510 | self.info = result |
| 511 | success = True |
| 512 | return result |
| 513 | macaroon = bakery.Macaroon.from_dict(macaroonJSON) |
| 514 | self.bakery_client.handle_error( |
| 515 | httpbakery.Error( |
| 516 | code=httpbakery.ERR_DISCHARGE_REQUIRED, |
| 517 | message=result.get('discharge-required-error'), |
| 518 | version=macaroon.version, |
| 519 | info=httpbakery.ErrorInfo( |
| 520 | macaroon=macaroon, |
| 521 | macaroon_path=result.get('macaroon-path'), |
| 522 | ), |
| 523 | ), |
| 524 | # note: remove the port number. |
| 525 | 'https://' + self.endpoint + '/', |
| 526 | ) |
| 527 | raise errors.JujuAuthError('failed to authenticate ' |
| 528 | 'after several attempts') |
| 529 | finally: |
| 530 | if not success: |
| 531 | await self.close() |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 532 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 533 | async def _connect_with_redirect(self, endpoints): |
| 534 | try: |
| 535 | login_result = await self._connect_with_login(endpoints) |
| 536 | except errors.JujuRedirectException as e: |
| 537 | login_result = await self._connect_with_login(e.endpoints) |
| 538 | self._build_facades(login_result.get('facades', {})) |
| 539 | self._pinger_task.start() |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 540 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 541 | def _build_facades(self, facades): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 542 | self.facades.clear() |
| 543 | for facade in facades: |
| 544 | self.facades[facade['name']] = facade['versions'][-1] |
| 545 | |
| 546 | async def login(self): |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 547 | params = {} |
| 548 | params['auth-tag'] = self.usertag |
| 549 | if self.password: |
| 550 | params['credentials'] = self.password |
| 551 | else: |
| 552 | macaroons = _macaroons_for_domain(self.bakery_client.cookies, |
| 553 | self.endpoint) |
| 554 | params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms] |
| 555 | for ms in macaroons] |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 556 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 557 | try: |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 558 | return await self.rpc({ |
| 559 | "type": "Admin", |
| 560 | "request": "Login", |
| 561 | "version": 3, |
| 562 | "params": params, |
| 563 | }) |
| 564 | except errors.JujuAPIError as e: |
| 565 | if e.error_code != 'redirection required': |
| 566 | raise |
| 567 | log.info('Controller requested redirect') |
| 568 | # Fetch additional redirection information now so that |
| 569 | # we can safely close the connection after login |
| 570 | # fails. |
| 571 | redirect_info = (await self.rpc({ |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 572 | "type": "Admin", |
| 573 | "request": "RedirectInfo", |
| 574 | "version": 3, |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 575 | }))['response'] |
| 576 | raise errors.JujuRedirectException(redirect_info) from e |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 577 | |
| 578 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 579 | class _Task: |
| 580 | def __init__(self, task, loop): |
| 581 | self.stopped = asyncio.Event(loop=loop) |
| 582 | self.stopped.set() |
| 583 | self.task = task |
| 584 | self.loop = loop |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 585 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 586 | def start(self): |
| 587 | async def run(): |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 588 | try: |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 589 | return await self.task() |
| 590 | finally: |
| 591 | self.stopped.set() |
| 592 | self.stopped.clear() |
| 593 | self.loop.create_task(run()) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 594 | |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 595 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 596 | def _macaroons_for_domain(cookies, domain): |
| 597 | '''Return any macaroons from the given cookie jar that |
| 598 | apply to the given domain name.''' |
| 599 | req = urllib.request.Request('https://' + domain + '/') |
| 600 | cookies.add_cookie_header(req) |
| 601 | return httpbakery.extract_macaroons(req) |