blob: 13770a5343d841bbb8369034c03499eaf7d8bc5e [file] [log] [blame]
Adam Israelc3e6c2e2018-03-01 09:31:50 -05001import asyncio
Adam Israeldcdf82b2017-08-15 15:26:43 -04002import base64
Adam Israeldcdf82b2017-08-15 15:26:43 -04003import json
4import logging
Adam Israeldcdf82b2017-08-15 15:26:43 -04005import ssl
Adam Israelc3e6c2e2018-03-01 09:31:50 -05006import urllib.request
Adam Israeldcdf82b2017-08-15 15:26:43 -04007import weakref
Adam Israeldcdf82b2017-08-15 15:26:43 -04008from concurrent.futures import CancelledError
9from http.client import HTTPSConnection
Adam Israeldcdf82b2017-08-15 15:26:43 -040010
Adam Israelc3e6c2e2018-03-01 09:31:50 -050011import macaroonbakery.httpbakery as httpbakery
12import macaroonbakery.bakery as bakery
13import websockets
14from juju import errors, tag, utils
Adam Israeldcdf82b2017-08-15 15:26:43 -040015from juju.client import client
Adam Israeldcdf82b2017-08-15 15:26:43 -040016from juju.utils import IdQueue
17
Adam Israelc3e6c2e2018-03-01 09:31:50 -050018log = logging.getLogger('juju.client.connection')
Adam Israeldcdf82b2017-08-15 15:26:43 -040019
20
class Monitor:
    """
    Health monitor for a :class:`Connection`.

    Holds a weak reference to the Connection and inspects both its
    websocket and its receiver task to classify the connection as
    'error', 'connected', 'disconnecting' or 'disconnected'.

    Use this class to keep track of the health of a connection and to
    react when it fails because of network issues or other unexpected
    circumstances.

    """
    ERROR = 'error'
    CONNECTED = 'connected'
    DISCONNECTING = 'disconnecting'
    DISCONNECTED = 'disconnected'

    def __init__(self, connection):
        # Only a weak reference is kept so the monitor never keeps the
        # connection alive by itself.
        self.connection = weakref.ref(connection)
        event_loop = connection.loop
        # Held for the duration of a reconnect attempt.
        self.reconnecting = asyncio.Lock(loop=event_loop)
        # Set once Connection.close() has been requested.
        self.close_called = asyncio.Event(loop=event_loop)

    @property
    def status(self):
        """
        Classify the connection as ERROR, CONNECTED, DISCONNECTING or
        DISCONNECTED.

        For simplicity the connection only counts as connected once the
        Connection has set up its receiver task; that happens after the
        websocket is open, and the connection is not usable before then.

        """
        conn = self.connection()

        # Either the Connection instance is gone (someone kept a separate
        # reference to the monitor), or the websocket was cleanly closed
        # or has not been opened yet.
        if not conn or not conn.ws:
            return self.DISCONNECTED

        # close() was requested but has not finished yet.
        if self.close_called.is_set():
            return self.DISCONNECTING

        # A stopped receiver or a closed websocket without close() having
        # been called means the connection went away uncleanly.
        receiver_stopped = conn._receiver_task.stopped.is_set()
        healthy = not receiver_stopped and conn.ws.open
        return self.CONNECTED if healthy else self.ERROR
79
80
class Connection:
    """
    Websocket connection to a Juju controller or model API.

    Usage::

        # Connect to an arbitrary api server
        client = await Connection.connect(
            api_endpoint, model_uuid, username, password, cacert)

    Note: Any connection method or constructor can accept an optional `loop`
    argument to override the default event loop from `asyncio.get_event_loop`.
    """

    # Maximum size for a single frame.  Defaults to 4MB.
    MAX_FRAME_SIZE = 2**22

    @classmethod
    async def connect(
            cls,
            endpoint=None,
            uuid=None,
            username=None,
            password=None,
            cacert=None,
            bakery_client=None,
            loop=None,
            max_frame_size=None,
    ):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        :param str endpoint: The hostname:port of the controller to connect to.
        :param str uuid: The model UUID to connect to (None for a
            controller-only connection).
        :param str username: The username for controller-local users (or None
            to use macaroon-based login.)
        :param str password: The password for controller-local users.
        :param str cacert: The CA certificate of the controller
            (PEM formatted).
        :param httpbakery.Client bakery_client: The macaroon bakery client
            to use when performing macaroon-based login. Macaroon tokens
            acquired when logging in will be saved to bakery_client.cookies.
            If this is None, a default bakery_client will be used.
        :param asyncio.BaseEventLoop loop: The event loop to use for async
            operations.
        :param int max_frame_size: The maximum websocket frame size to allow.
        :return: The connected Connection instance.
        :raises ValueError: If no endpoint is given.
        :raises JujuAuthError: If an external user is combined with a
            password.
        """
        self = cls()
        if endpoint is None:
            raise ValueError('no endpoint provided')
        self.uuid = uuid
        if bakery_client is None:
            bakery_client = httpbakery.Client()
        self.bakery_client = bakery_client
        if username and '@' in username and not username.endswith('@local'):
            # We're trying to log in as an external user - we need to use
            # macaroon authentication with no username or password.
            if password is not None:
                raise errors.JujuAuthError('cannot log in as external '
                                           'user with a password')
            username = None
        self.usertag = tag.user(username)
        self.password = password
        self.loop = loop or asyncio.get_event_loop()

        self.__request_id__ = 0

        # The following instance variables are initialized by the
        # _connect_with_redirect method, but create them here
        # as a reminder that they will exist.
        self.addr = None
        self.ws = None
        self.endpoint = None
        self.cacert = None
        self.info = None

        # Create the _Task objects but don't start the tasks yet.
        self._pinger_task = _Task(self._pinger, self.loop)
        self._receiver_task = _Task(self._receiver, self.loop)

        self.facades = {}
        self.messages = IdQueue(loop=self.loop)
        self.monitor = Monitor(connection=self)
        if max_frame_size is None:
            max_frame_size = self.MAX_FRAME_SIZE
        self.max_frame_size = max_frame_size
        await self._connect_with_redirect([(endpoint, cacert)])
        return self

    @property
    def username(self):
        """The user name without its 'user-' tag prefix, or None when the
        connection has no user tag."""
        if not self.usertag:
            return None
        return self.usertag[len('user-'):]

    @property
    def is_open(self):
        """True when the monitor reports the connection as CONNECTED."""
        return self.monitor.status == Monitor.CONNECTED

    def _get_ssl(self, cert=None):
        """Build the SSL context used to talk to the controller.

        :param str cert: Optional PEM-formatted CA certificate to trust in
            addition to the system defaults.
        :return: An :class:`ssl.SSLContext` configured for client use.
        """
        # Purpose.SERVER_AUTH is the purpose for client-side sockets (we
        # authenticate the *server*).  The previously used CLIENT_AUTH
        # builds a server-side context, which does not verify the peer
        # certificate and cannot create client sockets on recent Pythons.
        context = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH, cadata=cert)
        if cert:
            # The chain is verified against the pinned CA certificate for
            # this endpoint; endpoints are commonly addressed by IP, so
            # hostname matching is skipped in that case only.
            context.check_hostname = False
        return context

    async def _open(self, endpoint, cacert):
        """Open a websocket to `endpoint`.

        :return: A 4-tuple (websocket, url, endpoint, cacert).
        """
        if self.uuid:
            url = "wss://{}/model/{}/api".format(endpoint, self.uuid)
        else:
            url = "wss://{}/api".format(endpoint)

        return (await websockets.connect(
            url,
            ssl=self._get_ssl(cacert),
            loop=self.loop,
            max_size=self.max_frame_size,
        ), url, endpoint, cacert)

    async def close(self):
        """Stop the background tasks and close the websocket.

        Does nothing when there is no open websocket.
        """
        if not self.ws:
            return
        # Setting the event interrupts both the pinger and the receiver;
        # wait for them to finish before tearing the socket down.
        self.monitor.close_called.set()
        await self._pinger_task.stopped.wait()
        await self._receiver_task.stopped.wait()
        await self.ws.close()
        self.ws = None

    async def _recv(self, request_id):
        """Wait for the response matching `request_id`.

        :raises websockets.exceptions.ConnectionClosed: If the connection
            is not open.
        """
        if not self.is_open:
            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
        return await self.messages.get(request_id)

    async def _receiver(self):
        """Read frames from the websocket and queue each decoded message
        under its 'request-id' until the connection closes."""
        try:
            while self.is_open:
                result = await utils.run_with_interrupt(
                    self.ws.recv(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
                if result is not None:
                    result = json.loads(result)
                    await self.messages.put(result['request-id'], result)
        except asyncio.CancelledError:
            # asyncio.CancelledError is what the event loop raises on
            # cancellation; since Python 3.8 it is no longer the same
            # class as concurrent.futures.CancelledError.
            pass
        except websockets.ConnectionClosed as e:
            log.warning('Receiver: Connection closed, reconnecting')
            await self.messages.put_all(e)
            # the reconnect has to be done as a task because the receiver will
            # be cancelled by the reconnect and we don't want the reconnect
            # to be aborted half-way through
            self.loop.create_task(self.reconnect())
            return
        except Exception as e:
            log.exception("Error in receiver")
            # make pending listeners aware of the error
            await self.messages.put_all(e)
            raise

    async def _pinger(self):
        '''
        A Controller can time us out if we are silent for too long. This
        is especially true in JaaS, which has a fairly strict timeout.

        To prevent timing out, we send a ping every ten seconds.

        '''
        async def _do_ping():
            try:
                await pinger_facade.Ping()
                await asyncio.sleep(10, loop=self.loop)
            except asyncio.CancelledError:
                # Interrupted by close(); nothing to clean up.
                pass

        pinger_facade = client.PingerFacade.from_connection(self)
        try:
            while True:
                await utils.run_with_interrupt(
                    _do_ping(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
        except websockets.exceptions.ConnectionClosed:
            # The connection has closed - we can't do anything
            # more until the connection is restarted.
            log.debug('ping failed because of closed connection')

    async def rpc(self, msg, encoder=None):
        '''Make an RPC to the API. The message is encoded as JSON
        using the given encoder if any.

        :param msg: Parameters for the call (will be encoded as JSON).
            The dict is updated in place with 'request-id' and, when
            missing, 'params' and 'version'.
        :param encoder: Encoder to be used when encoding the message.
        :return: The result of the call.
        :raises JujuAPIError: When there's an error returned.
        :raises JujuError: When the response, or any entry of its result
            list, carries an error message.
        :raises websockets.exceptions.ConnectionClosed: When the connection
            is closed and could not be re-established.
        '''
        self.__request_id__ += 1
        msg['request-id'] = self.__request_id__
        if 'params' not in msg:
            msg['params'] = {}
        if "version" not in msg:
            msg['version'] = self.facades[msg['type']]
        outgoing = json.dumps(msg, indent=2, cls=encoder)
        log.debug('connection {} -> {}'.format(id(self), outgoing))
        for attempt in range(3):
            if self.monitor.status == Monitor.DISCONNECTED:
                # closed cleanly; shouldn't try to reconnect
                raise websockets.exceptions.ConnectionClosed(
                    0, 'websocket closed')
            try:
                await self.ws.send(outgoing)
                break
            except websockets.ConnectionClosed:
                if attempt == 2:
                    raise
                log.warning('RPC: Connection closed, reconnecting')
                # the reconnect has to be done in a separate task because,
                # if it is triggered by the pinger, then this RPC call will
                # be cancelled when the pinger is cancelled by the reconnect,
                # and we don't want the reconnect to be aborted halfway through
                await asyncio.wait([self.reconnect()], loop=self.loop)
                if self.monitor.status != Monitor.CONNECTED:
                    # reconnect failed; abort and shutdown
                    log.error('RPC: Automatic reconnect failed')
                    raise
        result = await self._recv(msg['request-id'])
        log.debug('connection {} <- {}'.format(id(self), result))

        if not result:
            return result

        if 'error' in result:
            # API Error Response
            raise errors.JujuAPIError(result)

        if 'response' not in result:
            # This may never happen
            return result

        response = result['response']
        if 'results' in response:
            # Check for errors in a result list.
            # TODO This loses the results that might have succeeded.
            # Perhaps JujuError should return all the results including
            # errors, or perhaps a keyword parameter to the rpc method
            # could be added to trigger this behaviour.
            # `or {}` tolerates an explicit null "error" entry.
            err_results = [
                res['error']['message']
                for res in response['results']
                if (res.get('error') or {}).get('message')
            ]
            if err_results:
                raise errors.JujuError(err_results)

        elif (response.get('error') or {}).get('message'):
            raise errors.JujuError(response['error']['message'])

        return result

    def _http_headers(self):
        """Return dictionary of http headers necessary for making an http
        connection to the endpoint of this Connection.

        :return: Dictionary of headers; empty when there is no user tag,
            otherwise a single HTTP Basic 'Authorization' header.

        """
        if not self.usertag:
            return {}

        creds = u'{}:{}'.format(
            self.usertag,
            self.password or ''
        )
        token = base64.b64encode(creds.encode())
        return {
            'Authorization': 'Basic {}'.format(token.decode())
        }

    def https_connection(self):
        """Return an https connection to this Connection's endpoint.

        Returns a 3-tuple containing::

            1. The :class:`HTTPSConnection` instance
            2. Dictionary of auth headers to be used with the connection
            3. The root url path (str) to be used for requests.

        :raises ValueError: If the endpoint has no ``:port`` part or the
            port is not an integer.

        """
        # Drop any path suffix, then split on the *last* colon so that
        # bracketed IPv6 literals such as "[::1]:17070" parse correctly.
        authority = self.endpoint.split('/', 1)[0]
        host, sep, port = authority.rpartition(':')
        if not sep:
            raise ValueError(
                'endpoint {!r} is not of the form host:port'.format(
                    self.endpoint))
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]

        conn = HTTPSConnection(
            host, int(port),
            context=self._get_ssl(self.cacert),
        )

        path = (
            "/model/{}".format(self.uuid)
            if self.uuid else ""
        )
        return conn, self._http_headers(), path

    async def clone(self):
        """Return a new Connection, connected to the same websocket endpoint
        as this one.

        """
        return await Connection.connect(**self.connect_params())

    def connect_params(self):
        """Return a dict of keyword arguments suitable for passing to
        Connection.connect that can be used to make a new connection
        to the same controller (and model if specified).
        """
        return {
            'endpoint': self.endpoint,
            'uuid': self.uuid,
            'username': self.username,
            'password': self.password,
            'cacert': self.cacert,
            'bakery_client': self.bakery_client,
            'loop': self.loop,
            'max_frame_size': self.max_frame_size,
        }

    async def controller(self):
        """Return a Connection to the controller at self.endpoint
        """
        return await Connection.connect(
            self.endpoint,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
            bakery_client=self.bakery_client,
            loop=self.loop,
            max_frame_size=self.max_frame_size,
        )

    async def reconnect(self):
        """ Force a reconnection.

        Does nothing when a reconnect is already in progress or when
        close() has been requested.
        """
        monitor = self.monitor
        if monitor.reconnecting.locked() or monitor.close_called.is_set():
            return
        async with monitor.reconnecting:
            await self.close()
            await self._connect_with_login([(self.endpoint, self.cacert)])
            # close() stopped the pinger and _connect only restarts the
            # receiver, so bring the keep-alive back as well.  The guard
            # avoids a second pinger if one is already running.
            if self._pinger_task.stopped.is_set():
                self._pinger_task.start()

    async def _connect(self, endpoints):
        """Open a websocket to the first endpoint that accepts a connection
        and start the receiver task.

        :param endpoints: Sequence of (endpoint, cacert) pairs.
        :raises JujuConnectionError: If `endpoints` is empty or every
            attempt fails with a ConnectionError.
        """
        if not endpoints:
            raise errors.JujuConnectionError('no endpoints to connect to')

        async def _try_endpoint(endpoint, cacert, delay):
            if delay:
                await asyncio.sleep(delay)
            return await self._open(endpoint, cacert)

        # Try all endpoints in parallel, with slight increasing delay (+100ms
        # for each subsequent endpoint); the delay allows us to prefer the
        # earlier endpoints over the latter. Use first successful connection.
        tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
                                                     0.1 * i))
                 for i, (endpoint, cacert) in enumerate(endpoints)]
        try:
            for task in asyncio.as_completed(tasks, loop=self.loop):
                try:
                    result = await task
                    break
                except ConnectionError:
                    continue  # ignore; try another endpoint
            else:
                raise errors.JujuConnectionError(
                    'Unable to connect to any endpoint: {}'.format(', '.join([
                        endpoint for endpoint, cacert in endpoints])))
        finally:
            # Cancel the remaining attempts on every exit path, including
            # an unexpected (non-ConnectionError) failure of one attempt,
            # so no connection attempt keeps running in the background.
            for task in tasks:
                task.cancel()
        self.ws = result[0]
        self.addr = result[1]
        self.endpoint = result[2]
        self.cacert = result[3]
        self._receiver_task.start()
        log.info("Driver connected to juju %s", self.addr)
        self.monitor.close_called.clear()

    async def _connect_with_login(self, endpoints):
        """Connect to the websocket and log in.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        The connection is closed again if login does not succeed.

        :return: The response field of login response JSON object.
        :raises JujuAuthError: If login still requires a discharge after
            the retries.
        """
        success = False
        try:
            await self._connect(endpoints)
            # It's possible that we may get several discharge-required errors,
            # corresponding to different levels of authentication, so retry
            # a few times.
            for _ in range(2):
                result = (await self.login())['response']
                macaroonJSON = result.get('discharge-required')
                if macaroonJSON is None:
                    self.info = result
                    success = True
                    return result
                macaroon = bakery.Macaroon.from_dict(macaroonJSON)
                self.bakery_client.handle_error(
                    httpbakery.Error(
                        code=httpbakery.ERR_DISCHARGE_REQUIRED,
                        message=result.get('discharge-required-error'),
                        version=macaroon.version,
                        info=httpbakery.ErrorInfo(
                            macaroon=macaroon,
                            macaroon_path=result.get('macaroon-path'),
                        ),
                    ),
                    # NOTE(review): the original comment here read "remove
                    # the port number", but the endpoint is passed through
                    # unchanged -- presumably the port is dropped during
                    # cookie handling; confirm.
                    'https://' + self.endpoint + '/',
                )
            raise errors.JujuAuthError('failed to authenticate '
                                       'after several attempts')
        finally:
            if not success:
                await self.close()

    async def _connect_with_redirect(self, endpoints):
        """Connect and log in, following one controller redirect if asked
        to, then record the available facades and start the pinger."""
        try:
            login_result = await self._connect_with_login(endpoints)
        except errors.JujuRedirectException as e:
            login_result = await self._connect_with_login(e.endpoints)
        self._build_facades(login_result.get('facades', {}))
        # A reconnect during login may already have started the pinger.
        if self._pinger_task.stopped.is_set():
            self._pinger_task.start()

    def _build_facades(self, facades):
        """Replace self.facades with a mapping of facade name to the last
        entry of its 'versions' list."""
        self.facades.clear()
        for facade in facades:
            self.facades[facade['name']] = facade['versions'][-1]

    async def login(self):
        """Send the Admin Login request.

        Uses the password when one is set, otherwise the macaroons stored
        in the bakery client's cookies for this endpoint.

        :return: The full login RPC result.
        :raises JujuRedirectException: When the controller answers with
            'redirection required'.
        :raises JujuAPIError: For any other API error.
        """
        params = {}
        params['auth-tag'] = self.usertag
        if self.password:
            params['credentials'] = self.password
        else:
            macaroons = _macaroons_for_domain(self.bakery_client.cookies,
                                              self.endpoint)
            params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms]
                                   for ms in macaroons]

        try:
            return await self.rpc({
                "type": "Admin",
                "request": "Login",
                "version": 3,
                "params": params,
            })
        except errors.JujuAPIError as e:
            if e.error_code != 'redirection required':
                raise
            log.info('Controller requested redirect')
            # Fetch additional redirection information now so that
            # we can safely close the connection after login
            # fails.
            redirect_info = (await self.rpc({
                "type": "Admin",
                "request": "RedirectInfo",
                "version": 3,
            }))['response']
            raise errors.JujuRedirectException(redirect_info) from e
Adam Israeldcdf82b2017-08-15 15:26:43 -0400553
554
class _Task:
    """Restartable wrapper around a coroutine function.

    `stopped` is an event that is set whenever the wrapped coroutine is
    not running: initially, and again once a started run has finished,
    whether it returned or raised.
    """

    def __init__(self, task, loop):
        self.stopped = asyncio.Event(loop=loop)
        self.stopped.set()
        self.task = task
        self.loop = loop
        # Handle of the most recently started run.
        self._handle = None

    def start(self):
        """Schedule one run of the wrapped coroutine on the loop."""
        async def run():
            try:
                return await self.task()
            finally:
                self.stopped.set()
        self.stopped.clear()
        # Keep a strong reference: the event loop only holds weak
        # references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._handle = self.loop.create_task(run())
Adam Israeldcdf82b2017-08-15 15:26:43 -0400570
Adam Israeldcdf82b2017-08-15 15:26:43 -0400571
def _macaroons_for_domain(cookies, domain):
    '''Return any macaroons from the given cookie jar that
    apply to the given domain name.

    A throwaway HTTPS request for the domain root is built, the jar adds
    its matching Cookie header to it, and the macaroons are then
    extracted from that request.
    '''
    root_url = ''.join(('https://', domain, '/'))
    probe = urllib.request.Request(root_url)
    cookies.add_cookie_header(probe)
    return httpbakery.extract_macaroons(probe)
577 return httpbakery.extract_macaroons(req)