blob: f2150b7dc44481b6e43b0ed844dd2c9b1ea16a91 [file] [log] [blame]
Adam Israelb8a82812019-03-27 14:50:11 -04001import asyncio
Adam Israeldcdf82b2017-08-15 15:26:43 -04002import base64
Adam Israeldcdf82b2017-08-15 15:26:43 -04003import json
4import logging
Adam Israeldcdf82b2017-08-15 15:26:43 -04005import ssl
Adam Israelb8a82812019-03-27 14:50:11 -04006import urllib.request
Adam Israeldcdf82b2017-08-15 15:26:43 -04007import weakref
Adam Israeldcdf82b2017-08-15 15:26:43 -04008from concurrent.futures import CancelledError
9from http.client import HTTPSConnection
Adam Israeldcdf82b2017-08-15 15:26:43 -040010
Adam Israelb8a82812019-03-27 14:50:11 -040011import macaroonbakery.httpbakery as httpbakery
12import macaroonbakery.bakery as bakery
13import websockets
14from juju import errors, tag, utils
Adam Israeldcdf82b2017-08-15 15:26:43 -040015from juju.client import client
Adam Israeldcdf82b2017-08-15 15:26:43 -040016from juju.utils import IdQueue
17
Adam Israelb8a82812019-03-27 14:50:11 -040018log = logging.getLogger('juju.client.connection')
Adam Israeldcdf82b2017-08-15 15:26:43 -040019
20
class Monitor:
    """
    Monitor helper class for our Connection class.

    Contains a weak reference to an instantiated Connection. By inspecting
    that connection's websocket and its receiver task, this class
    determines whether the connection is in an 'error', 'connected',
    'disconnecting' or 'disconnected' state.

    Use this class to stay up to date on the health of a connection,
    and take appropriate action if the connection errors out due to
    network issues or other unexpected circumstances.

    """
    ERROR = 'error'
    CONNECTED = 'connected'
    DISCONNECTING = 'disconnecting'
    DISCONNECTED = 'disconnected'

    def __init__(self, connection):
        """Create a monitor for ``connection``.

        :param connection: The Connection to watch. Only a weak reference
            is kept, so the monitor does not keep the connection alive.
        """
        self.connection = weakref.ref(connection)
        # The explicit ``loop`` argument of asyncio primitives was removed
        # in Python 3.10 (passing it raises TypeError), so the primitives
        # are created without it and bind to the loop they are used on.
        # Held for the duration of a reconnect.
        self.reconnecting = asyncio.Lock()
        # Set while a close has been requested; cleared again on connect.
        self.close_called = asyncio.Event()

    @property
    def status(self):
        """
        Determine the status of the connection and receiver, and return
        ERROR, CONNECTED, DISCONNECTING or DISCONNECTED as appropriate.

        For simplicity, we only consider ourselves to be connected
        after the Connection class has setup a receiver task.  This
        only happens after the websocket is open, and the connection
        isn't usable until that receiver has been started.

        """
        connection = self.connection()

        # the connection instance was destroyed but someone kept
        # a separate reference to the monitor for some reason
        if not connection:
            return self.DISCONNECTED

        # connection cleanly disconnected or not yet opened
        if not connection.ws:
            return self.DISCONNECTED

        # close called but not yet complete
        if self.close_called.is_set():
            return self.DISCONNECTING

        # connection closed uncleanly (we didn't call connection.close)
        stopped = connection._receiver_task.stopped.is_set()
        if stopped or not connection.ws.open:
            return self.ERROR

        # everything is fine!
        return self.CONNECTED
79
80
class Connection:
    """
    A websocket connection to the API of a Juju controller or model.

    Usage::

        # Connect to an arbitrary api server
        client = await Connection.connect(
            api_endpoint, model_uuid, username, password, cacert)

    Note: Any connection method or constructor can accept an optional `loop`
    argument to override the default event loop from `asyncio.get_event_loop`.
    """

    # Maximum size for a single frame.  Defaults to 4MB.
    MAX_FRAME_SIZE = 2**22

    @classmethod
    async def connect(
            cls,
            endpoint=None,
            uuid=None,
            username=None,
            password=None,
            cacert=None,
            bakery_client=None,
            loop=None,
            max_frame_size=None,
            retries=3,
            retry_backoff=10,
    ):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        :param str endpoint: The hostname:port of the controller to connect to.
        :param str uuid: The model UUID to connect to (None for a
            controller-only connection).
        :param str username: The username for controller-local users (or None
            to use macaroon-based login.)
        :param str password: The password for controller-local users.
        :param str cacert: The CA certificate of the controller
            (PEM formatted).
        :param httpbakery.Client bakery_client: The macaroon bakery client to
            to use when performing macaroon-based login. Macaroon tokens
            acquired when logging will be saved to bakery_client.cookies.
            If this is None, a default bakery_client will be used.
        :param asyncio.BaseEventLoop loop: The event loop to use for async
            operations.
        :param int max_frame_size: The maximum websocket frame size to allow.
        :param int retries: When connecting or reconnecting, and all endpoints
            fail, how many times to retry the connection before giving up.
        :param int retry_backoff: Number of seconds to increase the wait
            between connection retry attempts (a backoff of 10 with 3 retries
            would wait 10s, 20s, and 30s).
        :return: The connected (and logged in) Connection.
        :raises ValueError: If no endpoint is given.
        :raises JujuAuthError: If a password is supplied for an external
            user.
        """
        self = cls()
        if endpoint is None:
            raise ValueError('no endpoint provided')
        self.uuid = uuid
        if bakery_client is None:
            bakery_client = httpbakery.Client()
        self.bakery_client = bakery_client
        if username and '@' in username and not username.endswith('@local'):
            # We're trying to log in as an external user - we need to use
            # macaroon authentication with no username or password.
            if password is not None:
                raise errors.JujuAuthError('cannot log in as external '
                                           'user with a password')
            username = None
        self.usertag = tag.user(username)
        self.password = password
        self.loop = loop or asyncio.get_event_loop()

        self.__request_id__ = 0

        # The following instance variables are initialized by the
        # _connect_with_redirect method, but create them here
        # as a reminder that they will exist.
        self.addr = None
        self.ws = None
        self.endpoint = None
        self.cacert = None
        self.info = None

        # Create that _Task objects but don't start the tasks yet.
        self._pinger_task = _Task(self._pinger, self.loop)
        self._receiver_task = _Task(self._receiver, self.loop)

        self._retries = retries
        self._retry_backoff = retry_backoff

        self.facades = {}
        self.messages = IdQueue(loop=self.loop)
        self.monitor = Monitor(connection=self)
        if max_frame_size is None:
            max_frame_size = self.MAX_FRAME_SIZE
        self.max_frame_size = max_frame_size
        await self._connect_with_redirect([(endpoint, cacert)])
        return self

    @property
    def username(self):
        """The user name with the ``user-`` tag prefix removed, or None
        when there is no user tag."""
        if not self.usertag:
            return None
        return self.usertag[len('user-'):]

    @property
    def is_open(self):
        """True when the monitor reports the connection as CONNECTED."""
        return self.monitor.status == Monitor.CONNECTED

    def _get_ssl(self, cert=None):
        """Return the SSL context used for connections to the endpoint.

        :param str cert: CA certificate data passed as ``cadata`` (PEM
            formatted according to ``connect``), or None.
        """
        # NOTE(review): Purpose.CLIENT_AUTH is documented as the purpose
        # for *server-side* sockets; a context created with it does not
        # verify the peer certificate or hostname.  Left unchanged because
        # switching to SERVER_AUTH alters behaviour for existing callers
        # (e.g. connections made without a cacert) - confirm and fix
        # deliberately.
        return ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)

    async def _open(self, endpoint, cacert):
        """Open a websocket to ``endpoint``.

        :return: A 4-tuple of (websocket, url, endpoint, cacert).
        """
        if self.uuid:
            url = "wss://{}/model/{}/api".format(endpoint, self.uuid)
        else:
            url = "wss://{}/api".format(endpoint)

        # NOTE(review): ``loop`` is forwarded to the third-party
        # websockets library unchanged; whether the installed version
        # still accepts it cannot be determined from this file.
        return (await websockets.connect(
            url,
            ssl=self._get_ssl(cacert),
            loop=self.loop,
            max_size=self.max_frame_size,
        ), url, endpoint, cacert)

    async def close(self):
        """Close the websocket, if any, after the pinger and receiver
        tasks have stopped."""
        if not self.ws:
            return
        # Signals the pinger and receiver loops to exit.
        self.monitor.close_called.set()
        await self._pinger_task.stopped.wait()
        await self._receiver_task.stopped.wait()
        await self.ws.close()
        self.ws = None

    async def _recv(self, request_id):
        """Wait for the message queued under ``request_id``.

        :raises websockets.exceptions.ConnectionClosed: If the connection
            is not open.
        """
        if not self.is_open:
            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
        return await self.messages.get(request_id)

    async def _receiver(self):
        """Read messages from the websocket and queue each one under its
        ``request-id`` until the connection is closed."""
        try:
            while self.is_open:
                result = await utils.run_with_interrupt(
                    self.ws.recv(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
                if result is not None:
                    result = json.loads(result)
                    await self.messages.put(result['request-id'], result)
        except (CancelledError, asyncio.CancelledError):
            # Since Python 3.8 asyncio.CancelledError is a different class
            # from concurrent.futures.CancelledError, so both are listed.
            pass
        except websockets.ConnectionClosed as e:
            log.warning('Receiver: Connection closed, reconnecting')
            await self.messages.put_all(e)
            # the reconnect has to be done as a task because the receiver will
            # be cancelled by the reconnect and we don't want the reconnect
            # to be aborted half-way through
            self.loop.create_task(self.reconnect())
            return
        except Exception as e:
            log.exception("Error in receiver")
            # make pending listeners aware of the error
            await self.messages.put_all(e)
            raise

    async def _pinger(self):
        '''
        A Controller can time us out if we are silent for too long. This
        is especially true in JaaS, which has a fairly strict timeout.

        To prevent timing out, we send a ping every ten seconds.

        '''
        async def _do_ping():
            try:
                await pinger_facade.Ping()
                # asyncio.sleep() no longer accepts ``loop`` (removed in
                # Python 3.10); it always uses the running loop.
                await asyncio.sleep(10)
            except (CancelledError, asyncio.CancelledError):
                pass

        pinger_facade = client.PingerFacade.from_connection(self)
        try:
            while True:
                await utils.run_with_interrupt(
                    _do_ping(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
        except websockets.exceptions.ConnectionClosed:
            # The connection has closed - we can't do anything
            # more until the connection is restarted.
            log.debug('ping failed because of closed connection')

    async def rpc(self, msg, encoder=None):
        '''Make an RPC to the API. The message is encoded as JSON
        using the given encoder if any.

        ``msg`` is modified in place: ``request-id`` is always set, and
        ``params`` / ``version`` are filled in when missing.

        :param msg: Parameters for the call (will be encoded as JSON).
        :param encoder: Encoder to be used when encoding the message.
        :return: The result of the call.
        :raises JujuAPIError: When there's an error returned.
        :raises JujuError: When the response (or any entry of its result
            list) carries an error message.
        :raises websockets.exceptions.ConnectionClosed: When the
            connection is closed and could not be re-established.
        '''
        self.__request_id__ += 1
        msg['request-id'] = self.__request_id__
        if 'params' not in msg:
            msg['params'] = {}
        if "version" not in msg:
            msg['version'] = self.facades[msg['type']]
        outgoing = json.dumps(msg, indent=2, cls=encoder)
        log.debug('connection {} -> {}'.format(id(self), outgoing))
        for attempt in range(3):
            if self.monitor.status == Monitor.DISCONNECTED:
                # closed cleanly; shouldn't try to reconnect
                raise websockets.exceptions.ConnectionClosed(
                    0, 'websocket closed')
            try:
                await self.ws.send(outgoing)
                break
            except websockets.ConnectionClosed:
                if attempt == 2:
                    raise
                log.warning('RPC: Connection closed, reconnecting')
                # the reconnect has to be done in a separate task because,
                # if it is triggered by the pinger, then this RPC call will
                # be cancelled when the pinger is cancelled by the reconnect,
                # and we don't want the reconnect to be aborted halfway through
                #
                # asyncio.wait() no longer accepts ``loop`` (Python 3.10)
                # nor bare coroutines (Python 3.11), so the task is created
                # explicitly on our loop.
                await asyncio.wait([self.loop.create_task(self.reconnect())])
                if self.monitor.status != Monitor.CONNECTED:
                    # reconnect failed; abort and shutdown
                    log.error('RPC: Automatic reconnect failed')
                    raise
        result = await self._recv(msg['request-id'])
        log.debug('connection {} <- {}'.format(id(self), result))

        if not result:
            return result

        if 'error' in result:
            # API Error Response
            raise errors.JujuAPIError(result)

        if 'response' not in result:
            # This may never happen
            return result

        if 'results' in result['response']:
            # Check for errors in a result list.
            # TODO This loses the results that might have succeeded.
            # Perhaps JujuError should return all the results including
            # errors, or perhaps a keyword parameter to the rpc method
            # could be added to trigger this behaviour.
            err_results = []
            for res in result['response']['results']:
                # ``or {}`` tolerates an explicit null "error" entry.
                message = (res.get('error') or {}).get('message')
                if message:
                    err_results.append(message)
            if err_results:
                raise errors.JujuError(err_results)

        else:
            message = (result['response'].get('error') or {}).get('message')
            if message:
                raise errors.JujuError(message)

        return result

    def _http_headers(self):
        """Return dictionary of http headers necessary for making an http
        connection to the endpoint of this Connection.

        :return: Dictionary of headers (empty when there is no user tag)

        """
        if not self.usertag:
            return {}

        creds = u'{}:{}'.format(
            self.usertag,
            self.password or ''
        )
        token = base64.b64encode(creds.encode())
        return {
            'Authorization': 'Basic {}'.format(token.decode())
        }

    def https_connection(self):
        """Return an https connection to this Connection's endpoint.

        Returns a 3-tuple containing::

            1. The :class:`HTTPSConnection` instance
            2. Dictionary of auth headers to be used with the connection
            3. The root url path (str) to be used for requests.

        The endpoint is expected to look like ``host:port`` optionally
        followed by ``/path``; the host may be a bracketed IPv6 literal
        such as ``[::1]:17070``.

        :raises ValueError: If the endpoint has no ``:port`` part or the
            port is not an integer.
        """
        # Drop any trailing path, then split on the *last* colon so that
        # the colons inside an IPv6 literal are not mistaken for the
        # host/port separator.
        authority = self.endpoint.split('/', 1)[0]
        host, sep, port = authority.rpartition(':')
        if not sep:
            raise ValueError(
                'endpoint {!r} is not of the form host:port'.format(
                    self.endpoint))
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]

        conn = HTTPSConnection(
            host, int(port),
            context=self._get_ssl(self.cacert),
        )

        path = (
            "/model/{}".format(self.uuid)
            if self.uuid else ""
        )
        return conn, self._http_headers(), path

    async def clone(self):
        """Return a new Connection, connected to the same websocket endpoint
        as this one.

        """
        return await Connection.connect(**self.connect_params())

    def connect_params(self):
        """Return a dict of keyword arguments suitable for passing to
        Connection.connect that can be used to make a new connection
        to the same controller (and model if specified).

        The retry settings (``retries`` / ``retry_backoff``) are not part
        of the returned dict.
        """
        return {
            'endpoint': self.endpoint,
            'uuid': self.uuid,
            'username': self.username,
            'password': self.password,
            'cacert': self.cacert,
            'bakery_client': self.bakery_client,
            'loop': self.loop,
            'max_frame_size': self.max_frame_size,
        }

    async def controller(self):
        """Return a Connection to the controller at self.endpoint
        """
        return await Connection.connect(
            self.endpoint,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
            bakery_client=self.bakery_client,
            loop=self.loop,
            max_frame_size=self.max_frame_size,
        )

    async def reconnect(self):
        """ Force a reconnection.

        Does nothing when a reconnect is already in progress or when a
        close has been requested.
        """
        monitor = self.monitor
        if monitor.reconnecting.locked() or monitor.close_called.is_set():
            return
        async with monitor.reconnecting:
            await self.close()
            await self._connect_with_login([(self.endpoint, self.cacert)])
            # close() stops the pinger and _connect only restarts the
            # receiver, so the keep-alive pinger has to be started again
            # here or the reconnected session would go silent.
            if self._pinger_task.stopped.is_set():
                self._pinger_task.start()

    async def _connect(self, endpoints):
        """Open a websocket to the first endpoint that accepts a connection
        and start the receiver task.

        :param endpoints: Non-empty list of ``(endpoint, cacert)`` pairs.
        :raises JujuConnectionError: If there are no endpoints, or if no
            endpoint could be reached after all retries.
        """
        if not endpoints:
            raise errors.JujuConnectionError('no endpoints to connect to')

        async def _try_endpoint(endpoint, cacert, delay):
            if delay:
                await asyncio.sleep(delay)
            return await self._open(endpoint, cacert)

        result = None
        for attempt in range(max(self._retries, 0) + 1):
            # Try all endpoints in parallel, with slight increasing delay
            # (+100ms for each subsequent endpoint); the delay allows us to
            # prefer the earlier endpoints over the latter. Use first
            # successful connection.
            #
            # The tasks are created anew for every attempt: a finished
            # task only replays its stored exception when awaited again,
            # so reusing the failed tasks would never really retry.
            tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
                                                         0.1 * i))
                     for i, (endpoint, cacert) in enumerate(endpoints)]
            try:
                for task in asyncio.as_completed(tasks):
                    try:
                        result = await task
                        break
                    except ConnectionError:
                        continue  # ignore; try another endpoint
            finally:
                # Also runs when an unexpected exception escapes, so no
                # connection attempt is left running in the background.
                for task in tasks:
                    task.cancel()
            if result is not None:
                break

            _endpoints_str = ', '.join([endpoint
                                        for endpoint, cacert in endpoints])
            if attempt >= self._retries:
                raise errors.JujuConnectionError(
                    'Unable to connect to any endpoint: '
                    '{}'.format(_endpoints_str))
            log.debug('Retrying connection to endpoints: {}; '
                      'attempt {} of {}'.format(_endpoints_str,
                                                attempt + 1,
                                                self._retries + 1))
            await asyncio.sleep((attempt + 1) * self._retry_backoff)

        self.ws = result[0]
        self.addr = result[1]
        self.endpoint = result[2]
        self.cacert = result[3]
        self._receiver_task.start()
        log.debug("Driver connected to juju %s", self.addr)
        self.monitor.close_called.clear()

    async def _connect_with_login(self, endpoints):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        The connection is closed again if the login does not succeed.

        :return: The response field of login response JSON object.
        :raises JujuAuthError: If authentication is still incomplete after
            the discharge attempts.
        """
        success = False
        try:
            await self._connect(endpoints)
            # It's possible that we may get several discharge-required errors,
            # corresponding to different levels of authentication, so retry
            # a few times.
            for i in range(0, 2):
                result = (await self.login())['response']
                macaroonJSON = result.get('discharge-required')
                if macaroonJSON is None:
                    self.info = result
                    success = True
                    return result
                macaroon = bakery.Macaroon.from_dict(macaroonJSON)
                self.bakery_client.handle_error(
                    httpbakery.Error(
                        code=httpbakery.ERR_DISCHARGE_REQUIRED,
                        message=result.get('discharge-required-error'),
                        version=macaroon.version,
                        info=httpbakery.ErrorInfo(
                            macaroon=macaroon,
                            macaroon_path=result.get('macaroon-path'),
                        ),
                    ),
                    # The endpoint is passed as-is, i.e. including its
                    # port number.
                    'https://' + self.endpoint + '/',
                )
            raise errors.JujuAuthError('failed to authenticate '
                                       'after several attempts')
        finally:
            if not success:
                await self.close()

    async def _connect_with_redirect(self, endpoints):
        """Connect and log in, following a single controller redirect,
        then record the available facades and start the pinger."""
        try:
            login_result = await self._connect_with_login(endpoints)
        except errors.JujuRedirectException as e:
            login_result = await self._connect_with_login(e.endpoints)
        self._build_facades(login_result.get('facades', {}))
        self._pinger_task.start()

    def _build_facades(self, facades):
        """Replace ``self.facades`` with a mapping of each facade name to
        the last entry of its ``versions`` list."""
        self.facades.clear()
        for facade in facades:
            self.facades[facade['name']] = facade['versions'][-1]

    async def login(self):
        """Send the Admin.Login request.

        The password is used as credentials when one is set; otherwise the
        macaroons found in the bakery client's cookies for the endpoint
        are sent.

        :return: The raw result of the login RPC.
        :raises JujuRedirectException: When the controller answers with
            'redirection required'.
        :raises JujuAPIError: For any other API error.
        """
        params = {}
        params['auth-tag'] = self.usertag
        if self.password:
            params['credentials'] = self.password
        else:
            macaroons = _macaroons_for_domain(self.bakery_client.cookies,
                                              self.endpoint)
            params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms]
                                   for ms in macaroons]

        try:
            return await self.rpc({
                "type": "Admin",
                "request": "Login",
                "version": 3,
                "params": params,
            })
        except errors.JujuAPIError as e:
            if e.error_code != 'redirection required':
                raise
            log.info('Controller requested redirect')
            # Fetch additional redirection information now so that
            # we can safely close the connection after login
            # fails.
            redirect_info = (await self.rpc({
                "type": "Admin",
                "request": "RedirectInfo",
                "version": 3,
            }))['response']
            raise errors.JujuRedirectException(redirect_info) from e
Adam Israeldcdf82b2017-08-15 15:26:43 -0400577
578
class _Task:
    """Restartable background task with a ``stopped`` event.

    ``stopped`` is set while the task is not running, which lets other
    code wait for the task to finish.
    """

    def __init__(self, task, loop):
        """
        :param task: Zero-argument callable returning the awaitable to run.
        :param loop: Event loop on which the task is scheduled by ``start``.
        """
        # asyncio.Event() no longer accepts ``loop`` (removed in
        # Python 3.10, where passing it raises TypeError).
        self.stopped = asyncio.Event()
        # Not running yet, so the task counts as stopped.
        self.stopped.set()
        self.task = task
        self.loop = loop
        self._running = None

    def start(self):
        """Schedule the task on the loop and clear ``stopped`` until the
        task finishes (normally, with an error, or by cancellation)."""
        async def run():
            try:
                return await self.task()
            finally:
                self.stopped.set()
        self.stopped.clear()
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage collected while
        # it is still running.
        self._running = self.loop.create_task(run())
Adam Israeldcdf82b2017-08-15 15:26:43 -0400594
Adam Israeldcdf82b2017-08-15 15:26:43 -0400595
def _macaroons_for_domain(cookies, domain):
    '''Collect the macaroons stored in a cookie jar for a domain.

    A throw-away HTTPS request for the root of ``domain`` is built, the
    jar adds its matching Cookie header to it, and the macaroons are
    then extracted from that request.

    :param cookies: Cookie jar providing ``add_cookie_header``.
    :param str domain: Domain name the macaroons must apply to.
    :return: Whatever ``httpbakery.extract_macaroons`` returns for the
        request.
    '''
    url = 'https://' + domain + '/'
    probe = urllib.request.Request(url)
    cookies.add_cookie_header(probe)
    return httpbakery.extract_macaroons(probe)