8 from concurrent
.futures
import CancelledError
9 from http
.client
import HTTPSConnection
11 import macaroonbakery
.httpbakery
as httpbakery
12 import macaroonbakery
.bakery
as bakery
14 from juju
import errors
, tag
, utils
15 from juju
.client
import client
16 from juju
.utils
import IdQueue
18 log
= logging
.getLogger('juju.client.connection')
23 Monitor helper class for our Connection class.
25 Contains a reference to an instantiated Connection, along with a
26 reference to the Connection.receiver Future. Upon inspection of
27 these objects, this class determines whether the connection is in
28 an 'error', 'connected' or 'disconnected' state.
30 Use this class to stay up to date on the health of a connection,
31 and take appropriate action if the connection errors out due to
32 network issues or other unexpected circumstances.
36 CONNECTED
= 'connected'
37 DISCONNECTING
= 'disconnecting'
38 DISCONNECTED
= 'disconnected'
40 def __init__(self
, connection
):
41 self
.connection
= weakref
.ref(connection
)
42 self
.reconnecting
= asyncio
.Lock(loop
=connection
.loop
)
43 self
.close_called
= asyncio
.Event(loop
=connection
.loop
)
48 Determine the status of the connection and receiver, and return
49 ERROR, CONNECTED, or DISCONNECTED as appropriate.
51 For simplicity, we only consider ourselves to be connected
52 after the Connection class has setup a receiver task. This
53 only happens after the websocket is open, and the connection
54 isn't usable until that receiver has been started.
57 connection
= self
.connection()
59 # the connection instance was destroyed but someone kept
60 # a separate reference to the monitor for some reason
62 return self
.DISCONNECTED
64 # connection cleanly disconnected or not yet opened
66 return self
.DISCONNECTED
68 # close called but not yet complete
69 if self
.close_called
.is_set():
70 return self
.DISCONNECTING
72 # connection closed uncleanly (we didn't call connection.close)
73 stopped
= connection
._receiver
_task
.stopped
.is_set()
74 if stopped
or not connection
.ws
.open:
85 # Connect to an arbitrary api server
86 client = await Connection.connect(
87 api_endpoint, model_uuid, username, password, cacert)
89 Note: Any connection method or constructor can accept an optional `loop`
90 argument to override the default event loop from `asyncio.get_event_loop`.
93 MAX_FRAME_SIZE
= 2**22
94 "Maximum size for a single frame. Defaults to 4MB."
110 """Connect to the websocket.
112 If uuid is None, the connection will be to the controller. Otherwise it
113 will be to the model.
115 :param str endpoint: The hostname:port of the controller to connect to.
116 :param str uuid: The model UUID to connect to (None for a
117 controller-only connection).
118 :param str username: The username for controller-local users (or None
119 to use macaroon-based login.)
120 :param str password: The password for controller-local users.
121 :param str cacert: The CA certificate of the controller
123 :param httpbakery.Client bakery_client: The macaroon bakery client to
124 to use when performing macaroon-based login. Macaroon tokens
125 acquired when logging will be saved to bakery_client.cookies.
126 If this is None, a default bakery_client will be used.
127 :param asyncio.BaseEventLoop loop: The event loop to use for async
129 :param int max_frame_size: The maximum websocket frame size to allow.
130 :param int retries: When connecting or reconnecting, and all endpoints
131 fail, how many times to retry the connection before giving up.
132 :param int retry_backoff: Number of seconds to increase the wait
133 between connection retry attempts (a backoff of 10 with 3 retries
134 would wait 10s, 20s, and 30s).
138 raise ValueError('no endpoint provided')
140 if bakery_client
is None:
141 bakery_client
= httpbakery
.Client()
142 self
.bakery_client
= bakery_client
143 if username
and '@' in username
and not username
.endswith('@local'):
144 # We're trying to log in as an external user - we need to use
145 # macaroon authentication with no username or password.
146 if password
is not None:
147 raise errors
.JujuAuthError('cannot log in as external '
148 'user with a password')
150 self
.usertag
= tag
.user(username
)
151 self
.password
= password
152 self
.loop
= loop
or asyncio
.get_event_loop()
154 self
.__request
_id
__ = 0
156 # The following instance variables are initialized by the
157 # _connect_with_redirect method, but create them here
158 # as a reminder that they will exist.
165 # Create that _Task objects but don't start the tasks yet.
166 self
._pinger
_task
= _Task(self
._pinger
, self
.loop
)
167 self
._receiver
_task
= _Task(self
._receiver
, self
.loop
)
169 self
._retries
= retries
170 self
._retry
_backoff
= retry_backoff
173 self
.messages
= IdQueue(loop
=self
.loop
)
174 self
.monitor
= Monitor(connection
=self
)
175 if max_frame_size
is None:
176 max_frame_size
= self
.MAX_FRAME_SIZE
177 self
.max_frame_size
= max_frame_size
178 await self
._connect
_with
_redirect
([(endpoint
, cacert
)])
185 return self
.usertag
[len('user-'):]
189 return self
.monitor
.status
== Monitor
.CONNECTED
191 def _get_ssl(self
, cert
=None):
192 return ssl
.create_default_context(
193 purpose
=ssl
.Purpose
.CLIENT_AUTH
, cadata
=cert
)
195 async def _open(self
, endpoint
, cacert
):
197 url
= "wss://{}/model/{}/api".format(endpoint
, self
.uuid
)
199 url
= "wss://{}/api".format(endpoint
)
201 return (await websockets
.connect(
203 ssl
=self
._get
_ssl
(cacert
),
205 max_size
=self
.max_frame_size
,
206 ), url
, endpoint
, cacert
)
208 async def close(self
):
211 self
.monitor
.close_called
.set()
212 await self
._pinger
_task
.stopped
.wait()
213 await self
._receiver
_task
.stopped
.wait()
214 await self
.ws
.close()
217 async def _recv(self
, request_id
):
219 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
220 return await self
.messages
.get(request_id
)
222 async def _receiver(self
):
225 result
= await utils
.run_with_interrupt(
227 self
.monitor
.close_called
,
229 if self
.monitor
.close_called
.is_set():
231 if result
is not None:
232 result
= json
.loads(result
)
233 await self
.messages
.put(result
['request-id'], result
)
234 except CancelledError
:
236 except websockets
.ConnectionClosed
as e
:
237 log
.warning('Receiver: Connection closed, reconnecting')
238 await self
.messages
.put_all(e
)
239 # the reconnect has to be done as a task because the receiver will
240 # be cancelled by the reconnect and we don't want the reconnect
241 # to be aborted half-way through
242 self
.loop
.create_task(self
.reconnect())
244 except Exception as e
:
245 log
.exception("Error in receiver")
246 # make pending listeners aware of the error
247 await self
.messages
.put_all(e
)
250 async def _pinger(self
):
252 A Controller can time us out if we are silent for too long. This
253 is especially true in JaaS, which has a fairly strict timeout.
255 To prevent timing out, we send a ping every ten seconds.
258 async def _do_ping():
260 await pinger_facade
.Ping()
261 await asyncio
.sleep(10, loop
=self
.loop
)
262 except CancelledError
:
265 pinger_facade
= client
.PingerFacade
.from_connection(self
)
268 await utils
.run_with_interrupt(
270 self
.monitor
.close_called
,
272 if self
.monitor
.close_called
.is_set():
274 except websockets
.exceptions
.ConnectionClosed
:
275 # The connection has closed - we can't do anything
276 # more until the connection is restarted.
277 log
.debug('ping failed because of closed connection')
280 async def rpc(self
, msg
, encoder
=None):
281 '''Make an RPC to the API. The message is encoded as JSON
282 using the given encoder if any.
283 :param msg: Parameters for the call (will be encoded as JSON).
284 :param encoder: Encoder to be used when encoding the message.
285 :return: The result of the call.
286 :raises JujuAPIError: When there's an error returned.
289 self
.__request
_id
__ += 1
290 msg
['request-id'] = self
.__request
_id
__
291 if'params' not in msg
:
293 if "version" not in msg
:
294 msg
['version'] = self
.facades
[msg
['type']]
295 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
296 log
.debug('connection {} -> {}'.format(id(self
), outgoing
))
297 for attempt
in range(3):
298 if self
.monitor
.status
== Monitor
.DISCONNECTED
:
299 # closed cleanly; shouldn't try to reconnect
300 raise websockets
.exceptions
.ConnectionClosed(
301 0, 'websocket closed')
303 await self
.ws
.send(outgoing
)
305 except websockets
.ConnectionClosed
:
308 log
.warning('RPC: Connection closed, reconnecting')
309 # the reconnect has to be done in a separate task because,
310 # if it is triggered by the pinger, then this RPC call will
311 # be cancelled when the pinger is cancelled by the reconnect,
312 # and we don't want the reconnect to be aborted halfway through
313 await asyncio
.wait([self
.reconnect()], loop
=self
.loop
)
314 if self
.monitor
.status
!= Monitor
.CONNECTED
:
315 # reconnect failed; abort and shutdown
316 log
.error('RPC: Automatic reconnect failed')
318 result
= await self
._recv
(msg
['request-id'])
319 log
.debug('connection {} <- {}'.format(id(self
), result
))
324 if 'error' in result
:
326 raise errors
.JujuAPIError(result
)
328 if 'response' not in result
:
329 # This may never happen
332 if 'results' in result
['response']:
333 # Check for errors in a result list.
334 # TODO This loses the results that might have succeeded.
335 # Perhaps JujuError should return all the results including
336 # errors, or perhaps a keyword parameter to the rpc method
337 # could be added to trigger this behaviour.
339 for res
in result
['response']['results']:
340 if res
.get('error', {}).get('message'):
341 err_results
.append(res
['error']['message'])
343 raise errors
.JujuError(err_results
)
345 elif result
['response'].get('error', {}).get('message'):
346 raise errors
.JujuError(result
['response']['error']['message'])
350 def _http_headers(self
):
351 """Return dictionary of http headers necessary for making an http
352 connection to the endpoint of this Connection.
354 :return: Dictionary of headers
360 creds
= u
'{}:{}'.format(
364 token
= base64
.b64encode(creds
.encode())
366 'Authorization': 'Basic {}'.format(token
.decode())
369 def https_connection(self
):
370 """Return an https connection to this Connection's endpoint.
372 Returns a 3-tuple containing::
374 1. The :class:`HTTPSConnection` instance
375 2. Dictionary of auth headers to be used with the connection
376 3. The root url path (str) to be used for requests.
379 endpoint
= self
.endpoint
380 host
, remainder
= endpoint
.split(':', 1)
383 port
, _
= remainder
.split('/', 1)
385 conn
= HTTPSConnection(
387 context
=self
._get
_ssl
(self
.cacert
),
391 "/model/{}".format(self
.uuid
)
394 return conn
, self
._http
_headers
(), path
396 async def clone(self
):
397 """Return a new Connection, connected to the same websocket endpoint
401 return await Connection
.connect(**self
.connect_params())
403 def connect_params(self
):
404 """Return a tuple of parameters suitable for passing to
405 Connection.connect that can be used to make a new connection
406 to the same controller (and model if specified. The first
407 element in the returned tuple holds the endpoint argument;
408 the other holds a dict of the keyword args.
411 'endpoint': self
.endpoint
,
413 'username': self
.username
,
414 'password': self
.password
,
415 'cacert': self
.cacert
,
416 'bakery_client': self
.bakery_client
,
418 'max_frame_size': self
.max_frame_size
,
421 async def controller(self
):
422 """Return a Connection to the controller at self.endpoint
424 return await Connection
.connect(
426 username
=self
.username
,
427 password
=self
.password
,
429 bakery_client
=self
.bakery_client
,
431 max_frame_size
=self
.max_frame_size
,
434 async def reconnect(self
):
435 """ Force a reconnection.
437 monitor
= self
.monitor
438 if monitor
.reconnecting
.locked() or monitor
.close_called
.is_set():
440 async with monitor
.reconnecting
:
442 await self
._connect
_with
_login
([(self
.endpoint
, self
.cacert
)])
444 async def _connect(self
, endpoints
):
445 if len(endpoints
) == 0:
446 raise errors
.JujuConnectionError('no endpoints to connect to')
448 async def _try_endpoint(endpoint
, cacert
, delay
):
450 await asyncio
.sleep(delay
)
451 return await self
._open
(endpoint
, cacert
)
453 # Try all endpoints in parallel, with slight increasing delay (+100ms
454 # for each subsequent endpoint); the delay allows us to prefer the
455 # earlier endpoints over the latter. Use first successful connection.
456 tasks
= [self
.loop
.create_task(_try_endpoint(endpoint
, cacert
,
458 for i
, (endpoint
, cacert
) in enumerate(endpoints
)]
459 for attempt
in range(self
._retries
+ 1):
460 for task
in asyncio
.as_completed(tasks
, loop
=self
.loop
):
464 except ConnectionError
:
465 continue # ignore; try another endpoint
467 _endpoints_str
= ', '.join([endpoint
468 for endpoint
, cacert
in endpoints
])
469 if attempt
< self
._retries
:
470 log
.debug('Retrying connection to endpoints: {}; '
471 'attempt {} of {}'.format(_endpoints_str
,
474 await asyncio
.sleep((attempt
+ 1) * self
._retry
_backoff
)
477 raise errors
.JujuConnectionError(
478 'Unable to connect to any endpoint: '
479 '{}'.format(_endpoints_str
))
480 # only executed if inner loop's else did not continue
481 # (i.e., inner loop did break due to successful connection)
486 self
.addr
= result
[1]
487 self
.endpoint
= result
[2]
488 self
.cacert
= result
[3]
489 self
._receiver
_task
.start()
490 log
.debug("Driver connected to juju %s", self
.addr
)
491 self
.monitor
.close_called
.clear()
493 async def _connect_with_login(self
, endpoints
):
494 """Connect to the websocket.
496 If uuid is None, the connection will be to the controller. Otherwise it
497 will be to the model.
498 :return: The response field of login response JSON object.
502 await self
._connect
(endpoints
)
503 # It's possible that we may get several discharge-required errors,
504 # corresponding to different levels of authentication, so retry
506 for i
in range(0, 2):
507 result
= (await self
.login())['response']
508 macaroonJSON
= result
.get('discharge-required')
509 if macaroonJSON
is None:
513 macaroon
= bakery
.Macaroon
.from_dict(macaroonJSON
)
514 self
.bakery_client
.handle_error(
516 code
=httpbakery
.ERR_DISCHARGE_REQUIRED
,
517 message
=result
.get('discharge-required-error'),
518 version
=macaroon
.version
,
519 info
=httpbakery
.ErrorInfo(
521 macaroon_path
=result
.get('macaroon-path'),
524 # note: remove the port number.
525 'https://' + self
.endpoint
+ '/',
527 raise errors
.JujuAuthError('failed to authenticate '
528 'after several attempts')
533 async def _connect_with_redirect(self
, endpoints
):
535 login_result
= await self
._connect
_with
_login
(endpoints
)
536 except errors
.JujuRedirectException
as e
:
537 login_result
= await self
._connect
_with
_login
(e
.endpoints
)
538 self
._build
_facades
(login_result
.get('facades', {}))
539 self
._pinger
_task
.start()
541 def _build_facades(self
, facades
):
543 for facade
in facades
:
544 self
.facades
[facade
['name']] = facade
['versions'][-1]
546 async def login(self
):
548 params
['auth-tag'] = self
.usertag
550 params
['credentials'] = self
.password
552 macaroons
= _macaroons_for_domain(self
.bakery_client
.cookies
,
554 params
['macaroons'] = [[bakery
.macaroon_to_dict(m
) for m
in ms
]
558 return await self
.rpc({
564 except errors
.JujuAPIError
as e
:
565 if e
.error_code
!= 'redirection required':
567 log
.info('Controller requested redirect')
568 # Fetch additional redirection information now so that
569 # we can safely close the connection after login
571 redirect_info
= (await self
.rpc({
573 "request": "RedirectInfo",
576 raise errors
.JujuRedirectException(redirect_info
) from e
580 def __init__(self
, task
, loop
):
581 self
.stopped
= asyncio
.Event(loop
=loop
)
589 return await self
.task()
593 self
.loop
.create_task(run())
596 def _macaroons_for_domain(cookies
, domain
):
597 '''Return any macaroons from the given cookie jar that
598 apply to the given domain name.'''
599 req
= urllib
.request
.Request('https://' + domain
+ '/')
600 cookies
.add_cookie_header(req
)
601 return httpbakery
.extract_macaroons(req
)