Native charm support
[osm/N2VC.git] / modules / libjuju / juju / client / connection.py
1 import asyncio
2 import base64
3 import json
4 import logging
5 import ssl
6 import urllib.request
7 import weakref
8 from concurrent.futures import CancelledError
9 from http.client import HTTPSConnection
10
11 import macaroonbakery.httpbakery as httpbakery
12 import macaroonbakery.bakery as bakery
13 import websockets
14 from juju import errors, tag, utils
15 from juju.client import client
16 from juju.utils import IdQueue
17
18 log = logging.getLogger('juju.client.connection')
19
20
21 class Monitor:
22 """
23 Monitor helper class for our Connection class.
24
25 Contains a reference to an instantiated Connection, along with a
26 reference to the Connection.receiver Future. Upon inspection of
27 these objects, this class determines whether the connection is in
28 an 'error', 'connected' or 'disconnected' state.
29
30 Use this class to stay up to date on the health of a connection,
31 and take appropriate action if the connection errors out due to
32 network issues or other unexpected circumstances.
33
34 """
35 ERROR = 'error'
36 CONNECTED = 'connected'
37 DISCONNECTING = 'disconnecting'
38 DISCONNECTED = 'disconnected'
39
40 def __init__(self, connection):
41 self.connection = weakref.ref(connection)
42 self.reconnecting = asyncio.Lock(loop=connection.loop)
43 self.close_called = asyncio.Event(loop=connection.loop)
44
45 @property
46 def status(self):
47 """
48 Determine the status of the connection and receiver, and return
49 ERROR, CONNECTED, or DISCONNECTED as appropriate.
50
51 For simplicity, we only consider ourselves to be connected
52 after the Connection class has setup a receiver task. This
53 only happens after the websocket is open, and the connection
54 isn't usable until that receiver has been started.
55
56 """
57 connection = self.connection()
58
59 # the connection instance was destroyed but someone kept
60 # a separate reference to the monitor for some reason
61 if not connection:
62 return self.DISCONNECTED
63
64 # connection cleanly disconnected or not yet opened
65 if not connection.ws:
66 return self.DISCONNECTED
67
68 # close called but not yet complete
69 if self.close_called.is_set():
70 return self.DISCONNECTING
71
72 # connection closed uncleanly (we didn't call connection.close)
73 stopped = connection._receiver_task.stopped.is_set()
74 if stopped or not connection.ws.open:
75 return self.ERROR
76
77 # everything is fine!
78 return self.CONNECTED
79
80
81 class Connection:
82 """
83 Usage::
84
85 # Connect to an arbitrary api server
86 client = await Connection.connect(
87 api_endpoint, model_uuid, username, password, cacert)
88
89 Note: Any connection method or constructor can accept an optional `loop`
90 argument to override the default event loop from `asyncio.get_event_loop`.
91 """
92
93 MAX_FRAME_SIZE = 2**22
94 "Maximum size for a single frame. Defaults to 4MB."
95
96 @classmethod
97 async def connect(
98 cls,
99 endpoint=None,
100 uuid=None,
101 username=None,
102 password=None,
103 cacert=None,
104 bakery_client=None,
105 loop=None,
106 max_frame_size=None,
107 retries=3,
108 retry_backoff=10,
109 ):
110 """Connect to the websocket.
111
112 If uuid is None, the connection will be to the controller. Otherwise it
113 will be to the model.
114
115 :param str endpoint: The hostname:port of the controller to connect to.
116 :param str uuid: The model UUID to connect to (None for a
117 controller-only connection).
118 :param str username: The username for controller-local users (or None
119 to use macaroon-based login.)
120 :param str password: The password for controller-local users.
121 :param str cacert: The CA certificate of the controller
122 (PEM formatted).
123 :param httpbakery.Client bakery_client: The macaroon bakery client to
124 to use when performing macaroon-based login. Macaroon tokens
125 acquired when logging will be saved to bakery_client.cookies.
126 If this is None, a default bakery_client will be used.
127 :param asyncio.BaseEventLoop loop: The event loop to use for async
128 operations.
129 :param int max_frame_size: The maximum websocket frame size to allow.
130 :param int retries: When connecting or reconnecting, and all endpoints
131 fail, how many times to retry the connection before giving up.
132 :param int retry_backoff: Number of seconds to increase the wait
133 between connection retry attempts (a backoff of 10 with 3 retries
134 would wait 10s, 20s, and 30s).
135 """
136 self = cls()
137 if endpoint is None:
138 raise ValueError('no endpoint provided')
139 self.uuid = uuid
140 if bakery_client is None:
141 bakery_client = httpbakery.Client()
142 self.bakery_client = bakery_client
143 if username and '@' in username and not username.endswith('@local'):
144 # We're trying to log in as an external user - we need to use
145 # macaroon authentication with no username or password.
146 if password is not None:
147 raise errors.JujuAuthError('cannot log in as external '
148 'user with a password')
149 username = None
150 self.usertag = tag.user(username)
151 self.password = password
152 self.loop = loop or asyncio.get_event_loop()
153
154 self.__request_id__ = 0
155
156 # The following instance variables are initialized by the
157 # _connect_with_redirect method, but create them here
158 # as a reminder that they will exist.
159 self.addr = None
160 self.ws = None
161 self.endpoint = None
162 self.cacert = None
163 self.info = None
164
165 # Create that _Task objects but don't start the tasks yet.
166 self._pinger_task = _Task(self._pinger, self.loop)
167 self._receiver_task = _Task(self._receiver, self.loop)
168
169 self._retries = retries
170 self._retry_backoff = retry_backoff
171
172 self.facades = {}
173 self.messages = IdQueue(loop=self.loop)
174 self.monitor = Monitor(connection=self)
175 if max_frame_size is None:
176 max_frame_size = self.MAX_FRAME_SIZE
177 self.max_frame_size = max_frame_size
178 await self._connect_with_redirect([(endpoint, cacert)])
179 return self
180
181 @property
182 def username(self):
183 if not self.usertag:
184 return None
185 return self.usertag[len('user-'):]
186
187 @property
188 def is_open(self):
189 return self.monitor.status == Monitor.CONNECTED
190
191 def _get_ssl(self, cert=None):
192 return ssl.create_default_context(
193 purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
194
195 async def _open(self, endpoint, cacert):
196 if self.uuid:
197 url = "wss://{}/model/{}/api".format(endpoint, self.uuid)
198 else:
199 url = "wss://{}/api".format(endpoint)
200
201 return (await websockets.connect(
202 url,
203 ssl=self._get_ssl(cacert),
204 loop=self.loop,
205 max_size=self.max_frame_size,
206 ), url, endpoint, cacert)
207
208 async def close(self):
209 if not self.ws:
210 return
211 self.monitor.close_called.set()
212 await self._pinger_task.stopped.wait()
213 await self._receiver_task.stopped.wait()
214 await self.ws.close()
215 self.ws = None
216
217 async def _recv(self, request_id):
218 if not self.is_open:
219 raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
220 return await self.messages.get(request_id)
221
222 async def _receiver(self):
223 try:
224 while self.is_open:
225 result = await utils.run_with_interrupt(
226 self.ws.recv(),
227 self.monitor.close_called,
228 loop=self.loop)
229 if self.monitor.close_called.is_set():
230 break
231 if result is not None:
232 result = json.loads(result)
233 await self.messages.put(result['request-id'], result)
234 except CancelledError:
235 pass
236 except websockets.ConnectionClosed as e:
237 log.warning('Receiver: Connection closed, reconnecting')
238 await self.messages.put_all(e)
239 # the reconnect has to be done as a task because the receiver will
240 # be cancelled by the reconnect and we don't want the reconnect
241 # to be aborted half-way through
242 self.loop.create_task(self.reconnect())
243 return
244 except Exception as e:
245 log.exception("Error in receiver")
246 # make pending listeners aware of the error
247 await self.messages.put_all(e)
248 raise
249
250 async def _pinger(self):
251 '''
252 A Controller can time us out if we are silent for too long. This
253 is especially true in JaaS, which has a fairly strict timeout.
254
255 To prevent timing out, we send a ping every ten seconds.
256
257 '''
258 async def _do_ping():
259 try:
260 await pinger_facade.Ping()
261 await asyncio.sleep(10, loop=self.loop)
262 except CancelledError:
263 pass
264
265 pinger_facade = client.PingerFacade.from_connection(self)
266 try:
267 while True:
268 await utils.run_with_interrupt(
269 _do_ping(),
270 self.monitor.close_called,
271 loop=self.loop)
272 if self.monitor.close_called.is_set():
273 break
274 except websockets.exceptions.ConnectionClosed:
275 # The connection has closed - we can't do anything
276 # more until the connection is restarted.
277 log.debug('ping failed because of closed connection')
278 pass
279
280 async def rpc(self, msg, encoder=None):
281 '''Make an RPC to the API. The message is encoded as JSON
282 using the given encoder if any.
283 :param msg: Parameters for the call (will be encoded as JSON).
284 :param encoder: Encoder to be used when encoding the message.
285 :return: The result of the call.
286 :raises JujuAPIError: When there's an error returned.
287 :raises JujuError:
288 '''
289 self.__request_id__ += 1
290 msg['request-id'] = self.__request_id__
291 if'params' not in msg:
292 msg['params'] = {}
293 if "version" not in msg:
294 msg['version'] = self.facades[msg['type']]
295 outgoing = json.dumps(msg, indent=2, cls=encoder)
296 log.debug('connection {} -> {}'.format(id(self), outgoing))
297 for attempt in range(3):
298 if self.monitor.status == Monitor.DISCONNECTED:
299 # closed cleanly; shouldn't try to reconnect
300 raise websockets.exceptions.ConnectionClosed(
301 0, 'websocket closed')
302 try:
303 await self.ws.send(outgoing)
304 break
305 except websockets.ConnectionClosed:
306 if attempt == 2:
307 raise
308 log.warning('RPC: Connection closed, reconnecting')
309 # the reconnect has to be done in a separate task because,
310 # if it is triggered by the pinger, then this RPC call will
311 # be cancelled when the pinger is cancelled by the reconnect,
312 # and we don't want the reconnect to be aborted halfway through
313 await asyncio.wait([self.reconnect()], loop=self.loop)
314 if self.monitor.status != Monitor.CONNECTED:
315 # reconnect failed; abort and shutdown
316 log.error('RPC: Automatic reconnect failed')
317 raise
318 result = await self._recv(msg['request-id'])
319 log.debug('connection {} <- {}'.format(id(self), result))
320
321 if not result:
322 return result
323
324 if 'error' in result:
325 # API Error Response
326 raise errors.JujuAPIError(result)
327
328 if 'response' not in result:
329 # This may never happen
330 return result
331
332 if 'results' in result['response']:
333 # Check for errors in a result list.
334 # TODO This loses the results that might have succeeded.
335 # Perhaps JujuError should return all the results including
336 # errors, or perhaps a keyword parameter to the rpc method
337 # could be added to trigger this behaviour.
338 err_results = []
339 for res in result['response']['results']:
340 if res.get('error', {}).get('message'):
341 err_results.append(res['error']['message'])
342 if err_results:
343 raise errors.JujuError(err_results)
344
345 elif result['response'].get('error', {}).get('message'):
346 raise errors.JujuError(result['response']['error']['message'])
347
348 return result
349
350 def _http_headers(self):
351 """Return dictionary of http headers necessary for making an http
352 connection to the endpoint of this Connection.
353
354 :return: Dictionary of headers
355
356 """
357 if not self.usertag:
358 return {}
359
360 creds = u'{}:{}'.format(
361 self.usertag,
362 self.password or ''
363 )
364 token = base64.b64encode(creds.encode())
365 return {
366 'Authorization': 'Basic {}'.format(token.decode())
367 }
368
369 def https_connection(self):
370 """Return an https connection to this Connection's endpoint.
371
372 Returns a 3-tuple containing::
373
374 1. The :class:`HTTPSConnection` instance
375 2. Dictionary of auth headers to be used with the connection
376 3. The root url path (str) to be used for requests.
377
378 """
379 endpoint = self.endpoint
380 host, remainder = endpoint.split(':', 1)
381 port = remainder
382 if '/' in remainder:
383 port, _ = remainder.split('/', 1)
384
385 conn = HTTPSConnection(
386 host, int(port),
387 context=self._get_ssl(self.cacert),
388 )
389
390 path = (
391 "/model/{}".format(self.uuid)
392 if self.uuid else ""
393 )
394 return conn, self._http_headers(), path
395
396 async def clone(self):
397 """Return a new Connection, connected to the same websocket endpoint
398 as this one.
399
400 """
401 return await Connection.connect(**self.connect_params())
402
403 def connect_params(self):
404 """Return a tuple of parameters suitable for passing to
405 Connection.connect that can be used to make a new connection
406 to the same controller (and model if specified. The first
407 element in the returned tuple holds the endpoint argument;
408 the other holds a dict of the keyword args.
409 """
410 return {
411 'endpoint': self.endpoint,
412 'uuid': self.uuid,
413 'username': self.username,
414 'password': self.password,
415 'cacert': self.cacert,
416 'bakery_client': self.bakery_client,
417 'loop': self.loop,
418 'max_frame_size': self.max_frame_size,
419 }
420
421 async def controller(self):
422 """Return a Connection to the controller at self.endpoint
423 """
424 return await Connection.connect(
425 self.endpoint,
426 username=self.username,
427 password=self.password,
428 cacert=self.cacert,
429 bakery_client=self.bakery_client,
430 loop=self.loop,
431 max_frame_size=self.max_frame_size,
432 )
433
434 async def reconnect(self):
435 """ Force a reconnection.
436 """
437 monitor = self.monitor
438 if monitor.reconnecting.locked() or monitor.close_called.is_set():
439 return
440 async with monitor.reconnecting:
441 await self.close()
442 await self._connect_with_login([(self.endpoint, self.cacert)])
443
444 async def _connect(self, endpoints):
445 if len(endpoints) == 0:
446 raise errors.JujuConnectionError('no endpoints to connect to')
447
448 async def _try_endpoint(endpoint, cacert, delay):
449 if delay:
450 await asyncio.sleep(delay)
451 return await self._open(endpoint, cacert)
452
453 # Try all endpoints in parallel, with slight increasing delay (+100ms
454 # for each subsequent endpoint); the delay allows us to prefer the
455 # earlier endpoints over the latter. Use first successful connection.
456 tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
457 0.1 * i))
458 for i, (endpoint, cacert) in enumerate(endpoints)]
459 for attempt in range(self._retries + 1):
460 for task in asyncio.as_completed(tasks, loop=self.loop):
461 try:
462 result = await task
463 break
464 except ConnectionError:
465 continue # ignore; try another endpoint
466 else:
467 _endpoints_str = ', '.join([endpoint
468 for endpoint, cacert in endpoints])
469 if attempt < self._retries:
470 log.debug('Retrying connection to endpoints: {}; '
471 'attempt {} of {}'.format(_endpoints_str,
472 attempt + 1,
473 self._retries + 1))
474 await asyncio.sleep((attempt + 1) * self._retry_backoff)
475 continue
476 else:
477 raise errors.JujuConnectionError(
478 'Unable to connect to any endpoint: '
479 '{}'.format(_endpoints_str))
480 # only executed if inner loop's else did not continue
481 # (i.e., inner loop did break due to successful connection)
482 break
483 for task in tasks:
484 task.cancel()
485 self.ws = result[0]
486 self.addr = result[1]
487 self.endpoint = result[2]
488 self.cacert = result[3]
489 self._receiver_task.start()
490 log.debug("Driver connected to juju %s", self.addr)
491 self.monitor.close_called.clear()
492
493 async def _connect_with_login(self, endpoints):
494 """Connect to the websocket.
495
496 If uuid is None, the connection will be to the controller. Otherwise it
497 will be to the model.
498 :return: The response field of login response JSON object.
499 """
500 success = False
501 try:
502 await self._connect(endpoints)
503 # It's possible that we may get several discharge-required errors,
504 # corresponding to different levels of authentication, so retry
505 # a few times.
506 for i in range(0, 2):
507 result = (await self.login())['response']
508 macaroonJSON = result.get('discharge-required')
509 if macaroonJSON is None:
510 self.info = result
511 success = True
512 return result
513 macaroon = bakery.Macaroon.from_dict(macaroonJSON)
514 self.bakery_client.handle_error(
515 httpbakery.Error(
516 code=httpbakery.ERR_DISCHARGE_REQUIRED,
517 message=result.get('discharge-required-error'),
518 version=macaroon.version,
519 info=httpbakery.ErrorInfo(
520 macaroon=macaroon,
521 macaroon_path=result.get('macaroon-path'),
522 ),
523 ),
524 # note: remove the port number.
525 'https://' + self.endpoint + '/',
526 )
527 raise errors.JujuAuthError('failed to authenticate '
528 'after several attempts')
529 finally:
530 if not success:
531 await self.close()
532
533 async def _connect_with_redirect(self, endpoints):
534 try:
535 login_result = await self._connect_with_login(endpoints)
536 except errors.JujuRedirectException as e:
537 login_result = await self._connect_with_login(e.endpoints)
538 self._build_facades(login_result.get('facades', {}))
539 self._pinger_task.start()
540
541 def _build_facades(self, facades):
542 self.facades.clear()
543 for facade in facades:
544 self.facades[facade['name']] = facade['versions'][-1]
545
546 async def login(self):
547 params = {}
548 params['auth-tag'] = self.usertag
549 if self.password:
550 params['credentials'] = self.password
551 else:
552 macaroons = _macaroons_for_domain(self.bakery_client.cookies,
553 self.endpoint)
554 params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms]
555 for ms in macaroons]
556
557 try:
558 return await self.rpc({
559 "type": "Admin",
560 "request": "Login",
561 "version": 3,
562 "params": params,
563 })
564 except errors.JujuAPIError as e:
565 if e.error_code != 'redirection required':
566 raise
567 log.info('Controller requested redirect')
568 # Fetch additional redirection information now so that
569 # we can safely close the connection after login
570 # fails.
571 redirect_info = (await self.rpc({
572 "type": "Admin",
573 "request": "RedirectInfo",
574 "version": 3,
575 }))['response']
576 raise errors.JujuRedirectException(redirect_info) from e
577
578
579 class _Task:
580 def __init__(self, task, loop):
581 self.stopped = asyncio.Event(loop=loop)
582 self.stopped.set()
583 self.task = task
584 self.loop = loop
585
586 def start(self):
587 async def run():
588 try:
589 return await self.task()
590 finally:
591 self.stopped.set()
592 self.stopped.clear()
593 self.loop.create_task(run())
594
595
596 def _macaroons_for_domain(cookies, domain):
597 '''Return any macaroons from the given cookie jar that
598 apply to the given domain name.'''
599 req = urllib.request.Request('https://' + domain + '/')
600 cookies.add_cookie_header(req)
601 return httpbakery.extract_macaroons(req)