8 from concurrent
.futures
import CancelledError
9 from http
.client
import HTTPSConnection
11 import macaroonbakery
.httpbakery
as httpbakery
12 import macaroonbakery
.bakery
as bakery
14 from juju
import errors
, tag
, utils
15 from juju
.client
import client
16 from juju
.utils
import IdQueue
18 log
= logging
.getLogger('juju.client.connection')
23 Monitor helper class for our Connection class.
25 Contains a reference to an instantiated Connection, along with a
26 reference to the Connection.receiver Future. Upon inspection of
27 these objects, this class determines whether the connection is in
28 an 'error', 'connected' or 'disconnected' state.
30 Use this class to stay up to date on the health of a connection,
31 and take appropriate action if the connection errors out due to
32 network issues or other unexpected circumstances.
36 CONNECTED
= 'connected'
37 DISCONNECTING
= 'disconnecting'
38 DISCONNECTED
= 'disconnected'
def __init__(self, connection):
    """Attach this monitor to ``connection``.

    :param connection: The connection to watch. It must expose a ``loop``
        attribute; on interpreters that still accept it, that loop is
        handed to the asyncio primitives created here.
    """
    # Local import so this method does not depend on the module's import
    # block already providing ``sys``.
    import sys

    # Hold the connection weakly so that a monitor kept alive elsewhere
    # does not keep the connection itself alive.
    self.connection = weakref.ref(connection)

    # The explicit ``loop`` argument of asyncio synchronization primitives
    # was removed in Python 3.10 and passing it raises TypeError there.
    # Older interpreters still need it so that a non-default loop supplied
    # by the caller is honoured.
    if sys.version_info >= (3, 10):
        primitive_kwargs = {}
    else:
        primitive_kwargs = {'loop': connection.loop}

    # Held while a reconnect is in progress.
    self.reconnecting = asyncio.Lock(**primitive_kwargs)
    # Set once close() has been requested on the connection.
    self.close_called = asyncio.Event(**primitive_kwargs)
48 Determine the status of the connection and receiver, and return
49 ERROR, CONNECTED, DISCONNECTING, or DISCONNECTED as appropriate.
51 For simplicity, we only consider ourselves to be connected
52 after the Connection class has setup a receiver task. This
53 only happens after the websocket is open, and the connection
54 isn't usable until that receiver has been started.
57 connection
= self
.connection()
59 # the connection instance was destroyed but someone kept
60 # a separate reference to the monitor for some reason
62 return self
.DISCONNECTED
64 # connection cleanly disconnected or not yet opened
66 return self
.DISCONNECTED
68 # close called but not yet complete
69 if self
.close_called
.is_set():
70 return self
.DISCONNECTING
72 # connection closed uncleanly (we didn't call connection.close)
73 stopped
= connection
._receiver
_task
.stopped
.is_set()
74 if stopped
or not connection
.ws
.open:
85 # Connect to an arbitrary api server
86 client = await Connection.connect(
87 api_endpoint, model_uuid, username, password, cacert)
89 Note: Any connection method or constructor can accept an optional `loop`
90 argument to override the default event loop from `asyncio.get_event_loop`.
93 MAX_FRAME_SIZE
= 2**22
94 "Maximum size for a single frame. Defaults to 4MB."
108 """Connect to the websocket.
110 If uuid is None, the connection will be to the controller. Otherwise it
111 will be to the model.
113 :param str endpoint: The hostname:port of the controller to connect to.
114 :param str uuid: The model UUID to connect to (None for a
115 controller-only connection).
116 :param str username: The username for controller-local users (or None
117 to use macaroon-based login.)
118 :param str password: The password for controller-local users.
119 :param str cacert: The CA certificate of the controller
121 :param httpbakery.Client bakery_client: The macaroon bakery client
122 to use when performing macaroon-based login. Macaroon tokens
123 acquired when logging in will be saved to bakery_client.cookies.
124 If this is None, a default bakery_client will be used.
125 :param asyncio.BaseEventLoop loop: The event loop to use for async
127 :param int max_frame_size: The maximum websocket frame size to allow.
131 raise ValueError('no endpoint provided')
133 if bakery_client
is None:
134 bakery_client
= httpbakery
.Client()
135 self
.bakery_client
= bakery_client
136 if username
and '@' in username
and not username
.endswith('@local'):
137 # We're trying to log in as an external user - we need to use
138 # macaroon authentication with no username or password.
139 if password
is not None:
140 raise errors
.JujuAuthError('cannot log in as external '
141 'user with a password')
143 self
.usertag
= tag
.user(username
)
144 self
.password
= password
145 self
.loop
= loop
or asyncio
.get_event_loop()
147 self
.__request
_id
__ = 0
149 # The following instance variables are initialized by the
150 # _connect_with_redirect method, but create them here
151 # as a reminder that they will exist.
158 # Create the _Task objects but don't start the tasks yet.
159 self
._pinger
_task
= _Task(self
._pinger
, self
.loop
)
160 self
._receiver
_task
= _Task(self
._receiver
, self
.loop
)
163 self
.messages
= IdQueue(loop
=self
.loop
)
164 self
.monitor
= Monitor(connection
=self
)
165 if max_frame_size
is None:
166 max_frame_size
= self
.MAX_FRAME_SIZE
167 self
.max_frame_size
= max_frame_size
168 await self
._connect
_with
_redirect
([(endpoint
, cacert
)])
175 return self
.usertag
[len('user-'):]
179 return self
.monitor
.status
== Monitor
.CONNECTED
def _get_ssl(self, cert=None):
    """Build the SSL context used for outgoing connections.

    :param str cert: Optional CA certificate data, passed to
        ``ssl.create_default_context`` as ``cadata``. When omitted, the
        system's default trust store is used.
    :return: An ``ssl.SSLContext`` configured for client-side sockets
        with server certificate verification enabled.
    """
    # SERVER_AUTH is the purpose for contexts that authenticate a server,
    # i.e. client-side sockets. CLIENT_AUTH is the server-side purpose: it
    # performs no certificate verification and cannot be used to open
    # client connections on recent Python versions.
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH, cadata=cert)
    if cert:
        # Disable hostname checking if and only if we were given a CA
        # certificate to verify the server with; the certificate chain is
        # still validated against that CA.
        context.check_hostname = False
    return context
185 async def _open(self
, endpoint
, cacert
):
187 url
= "wss://{}/model/{}/api".format(endpoint
, self
.uuid
)
189 url
= "wss://{}/api".format(endpoint
)
191 return (await websockets
.connect(
193 ssl
=self
._get
_ssl
(cacert
),
195 max_size
=self
.max_frame_size
,
196 ), url
, endpoint
, cacert
)
198 async def close(self
):
201 self
.monitor
.close_called
.set()
202 await self
._pinger
_task
.stopped
.wait()
203 await self
._receiver
_task
.stopped
.wait()
204 await self
.ws
.close()
207 async def _recv(self
, request_id
):
209 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
210 return await self
.messages
.get(request_id
)
212 async def _receiver(self
):
215 result
= await utils
.run_with_interrupt(
217 self
.monitor
.close_called
,
219 if self
.monitor
.close_called
.is_set():
221 if result
is not None:
222 result
= json
.loads(result
)
223 await self
.messages
.put(result
['request-id'], result
)
224 except CancelledError
:
226 except websockets
.ConnectionClosed
as e
:
227 log
.warning('Receiver: Connection closed, reconnecting')
228 await self
.messages
.put_all(e
)
229 # the reconnect has to be done as a task because the receiver will
230 # be cancelled by the reconnect and we don't want the reconnect
231 # to be aborted half-way through
232 self
.loop
.create_task(self
.reconnect())
234 except Exception as e
:
235 log
.exception("Error in receiver")
236 # make pending listeners aware of the error
237 await self
.messages
.put_all(e
)
240 async def _pinger(self
):
242 A Controller can time us out if we are silent for too long. This
243 is especially true in JaaS, which has a fairly strict timeout.
245 To prevent timing out, we send a ping every ten seconds.
248 async def _do_ping():
250 await pinger_facade
.Ping()
251 await asyncio
.sleep(10, loop
=self
.loop
)
252 except CancelledError
:
255 pinger_facade
= client
.PingerFacade
.from_connection(self
)
258 await utils
.run_with_interrupt(
260 self
.monitor
.close_called
,
262 if self
.monitor
.close_called
.is_set():
264 except websockets
.exceptions
.ConnectionClosed
:
265 # The connection has closed - we can't do anything
266 # more until the connection is restarted.
267 log
.debug('ping failed because of closed connection')
270 async def rpc(self
, msg
, encoder
=None):
271 '''Make an RPC to the API. The message is encoded as JSON
272 using the given encoder if any.
273 :param msg: Parameters for the call (will be encoded as JSON).
274 :param encoder: Encoder to be used when encoding the message.
275 :return: The result of the call.
276 :raises JujuAPIError: When there's an error returned.
279 self
.__request
_id
__ += 1
280 msg
['request-id'] = self
.__request
_id
__
281 if'params' not in msg
:
283 if "version" not in msg
:
284 msg
['version'] = self
.facades
[msg
['type']]
285 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
286 log
.debug('connection {} -> {}'.format(id(self
), outgoing
))
287 for attempt
in range(3):
288 if self
.monitor
.status
== Monitor
.DISCONNECTED
:
289 # closed cleanly; shouldn't try to reconnect
290 raise websockets
.exceptions
.ConnectionClosed(
291 0, 'websocket closed')
293 await self
.ws
.send(outgoing
)
295 except websockets
.ConnectionClosed
:
298 log
.warning('RPC: Connection closed, reconnecting')
299 # the reconnect has to be done in a separate task because,
300 # if it is triggered by the pinger, then this RPC call will
301 # be cancelled when the pinger is cancelled by the reconnect,
302 # and we don't want the reconnect to be aborted halfway through
303 await asyncio
.wait([self
.reconnect()], loop
=self
.loop
)
304 if self
.monitor
.status
!= Monitor
.CONNECTED
:
305 # reconnect failed; abort and shutdown
306 log
.error('RPC: Automatic reconnect failed')
308 result
= await self
._recv
(msg
['request-id'])
309 log
.debug('connection {} <- {}'.format(id(self
), result
))
314 if 'error' in result
:
316 raise errors
.JujuAPIError(result
)
318 if 'response' not in result
:
319 # This may never happen
322 if 'results' in result
['response']:
323 # Check for errors in a result list.
324 # TODO This loses the results that might have succeeded.
325 # Perhaps JujuError should return all the results including
326 # errors, or perhaps a keyword parameter to the rpc method
327 # could be added to trigger this behaviour.
329 for res
in result
['response']['results']:
330 if res
.get('error', {}).get('message'):
331 err_results
.append(res
['error']['message'])
333 raise errors
.JujuError(err_results
)
335 elif result
['response'].get('error', {}).get('message'):
336 raise errors
.JujuError(result
['response']['error']['message'])
340 def _http_headers(self
):
341 """Return dictionary of http headers necessary for making an http
342 connection to the endpoint of this Connection.
344 :return: Dictionary of headers
350 creds
= u
'{}:{}'.format(
354 token
= base64
.b64encode(creds
.encode())
356 'Authorization': 'Basic {}'.format(token
.decode())
359 def https_connection(self
):
360 """Return an https connection to this Connection's endpoint.
362 Returns a 3-tuple containing::
364 1. The :class:`HTTPSConnection` instance
365 2. Dictionary of auth headers to be used with the connection
366 3. The root url path (str) to be used for requests.
369 endpoint
= self
.endpoint
370 host
, remainder
= endpoint
.split(':', 1)
373 port
, _
= remainder
.split('/', 1)
375 conn
= HTTPSConnection(
377 context
=self
._get
_ssl
(self
.cacert
),
381 "/model/{}".format(self
.uuid
)
384 return conn
, self
._http
_headers
(), path
386 async def clone(self
):
387 """Return a new Connection, connected to the same websocket endpoint
391 return await Connection
.connect(**self
.connect_params())
393 def connect_params(self
):
394 """Return a dict of keyword arguments suitable for passing to
395 Connection.connect that can be used to make a new connection
396 to the same controller (and model, if specified). The dict
397 holds the endpoint argument together with the other keyword
398 args, so it can be expanded with ``**`` as clone() does.
401 'endpoint': self
.endpoint
,
403 'username': self
.username
,
404 'password': self
.password
,
405 'cacert': self
.cacert
,
406 'bakery_client': self
.bakery_client
,
408 'max_frame_size': self
.max_frame_size
,
411 async def controller(self
):
412 """Return a Connection to the controller at self.endpoint
414 return await Connection
.connect(
416 username
=self
.username
,
417 password
=self
.password
,
419 bakery_client
=self
.bakery_client
,
421 max_frame_size
=self
.max_frame_size
,
424 async def reconnect(self
):
425 """ Force a reconnection.
427 monitor
= self
.monitor
428 if monitor
.reconnecting
.locked() or monitor
.close_called
.is_set():
430 async with monitor
.reconnecting
:
432 await self
._connect
_with
_login
([(self
.endpoint
, self
.cacert
)])
434 async def _connect(self
, endpoints
):
435 if len(endpoints
) == 0:
436 raise errors
.JujuConnectionError('no endpoints to connect to')
438 async def _try_endpoint(endpoint
, cacert
, delay
):
440 await asyncio
.sleep(delay
)
441 return await self
._open
(endpoint
, cacert
)
443 # Try all endpoints in parallel, with slight increasing delay (+100ms
444 # for each subsequent endpoint); the delay allows us to prefer the
445 # earlier endpoints over the latter. Use first successful connection.
446 tasks
= [self
.loop
.create_task(_try_endpoint(endpoint
, cacert
,
448 for i
, (endpoint
, cacert
) in enumerate(endpoints
)]
449 for task
in asyncio
.as_completed(tasks
, loop
=self
.loop
):
453 except ConnectionError
:
454 continue # ignore; try another endpoint
456 raise errors
.JujuConnectionError(
457 'Unable to connect to any endpoint: {}'.format(', '.join([
458 endpoint
for endpoint
, cacert
in endpoints
])))
462 self
.addr
= result
[1]
463 self
.endpoint
= result
[2]
464 self
.cacert
= result
[3]
465 self
._receiver
_task
.start()
466 log
.info("Driver connected to juju %s", self
.addr
)
467 self
.monitor
.close_called
.clear()
469 async def _connect_with_login(self
, endpoints
):
470 """Connect to the websocket.
472 If uuid is None, the connection will be to the controller. Otherwise it
473 will be to the model.
474 :return: The response field of login response JSON object.
478 await self
._connect
(endpoints
)
479 # It's possible that we may get several discharge-required errors,
480 # corresponding to different levels of authentication, so retry
482 for i
in range(0, 2):
483 result
= (await self
.login())['response']
484 macaroonJSON
= result
.get('discharge-required')
485 if macaroonJSON
is None:
489 macaroon
= bakery
.Macaroon
.from_dict(macaroonJSON
)
490 self
.bakery_client
.handle_error(
492 code
=httpbakery
.ERR_DISCHARGE_REQUIRED
,
493 message
=result
.get('discharge-required-error'),
494 version
=macaroon
.version
,
495 info
=httpbakery
.ErrorInfo(
497 macaroon_path
=result
.get('macaroon-path'),
500 # note: remove the port number.
501 'https://' + self
.endpoint
+ '/',
503 raise errors
.JujuAuthError('failed to authenticate '
504 'after several attempts')
509 async def _connect_with_redirect(self
, endpoints
):
511 login_result
= await self
._connect
_with
_login
(endpoints
)
512 except errors
.JujuRedirectException
as e
:
513 login_result
= await self
._connect
_with
_login
(e
.endpoints
)
514 self
._build
_facades
(login_result
.get('facades', {}))
515 self
._pinger
_task
.start()
517 def _build_facades(self
, facades
):
519 for facade
in facades
:
520 self
.facades
[facade
['name']] = facade
['versions'][-1]
522 async def login(self
):
524 params
['auth-tag'] = self
.usertag
526 params
['credentials'] = self
.password
528 macaroons
= _macaroons_for_domain(self
.bakery_client
.cookies
,
530 params
['macaroons'] = [[bakery
.macaroon_to_dict(m
) for m
in ms
]
534 return await self
.rpc({
540 except errors
.JujuAPIError
as e
:
541 if e
.error_code
!= 'redirection required':
543 log
.info('Controller requested redirect')
544 # Fetch additional redirection information now so that
545 # we can safely close the connection after login
547 redirect_info
= (await self
.rpc({
549 "request": "RedirectInfo",
552 raise errors
.JujuRedirectException(redirect_info
) from e
556 def __init__(self
, task
, loop
):
557 self
.stopped
= asyncio
.Event(loop
=loop
)
565 return await(self
.task())
569 self
.loop
.create_task(run())
def _macaroons_for_domain(cookies, domain):
    '''Return any macaroons from the given cookie jar that
    apply to the given domain name.

    A request for the root of ``https://<domain>/`` is built, the jar is
    asked to add its cookie header to it, and the request is then handed
    to ``httpbakery.extract_macaroons``.

    :param cookies: Cookie jar exposing ``add_cookie_header``.
    :param str domain: Host part used to build the request URL.
    :return: Whatever ``httpbakery.extract_macaroons`` returns for the
        request.
    '''
    root_url = ''.join(('https://', domain, '/'))
    carrier = urllib.request.Request(root_url)
    # The jar mutates the request in place, adding its Cookie header.
    cookies.add_cookie_header(carrier)
    return httpbakery.extract_macaroons(carrier)