[bug 581] Fix parameter checking if no data-type
[osm/N2VC.git] / modules / libjuju / juju / client / connection.py
1 import asyncio
2 import base64
3 import json
4 import logging
5 import ssl
6 import urllib.request
7 import weakref
8 from concurrent.futures import CancelledError
9 from http.client import HTTPSConnection
10
11 import macaroonbakery.httpbakery as httpbakery
12 import macaroonbakery.bakery as bakery
13 import websockets
14 from juju import errors, tag, utils
15 from juju.client import client
16 from juju.utils import IdQueue
17
# Module-level logger used by the connection classes in this file.
log = logging.getLogger('juju.client.connection')
19
20
class Monitor:
    """
    Health tracker for a Connection instance.

    Keeps a weak reference to a Connection and, by looking at that
    connection's websocket and at the ``stopped`` event of its receiver
    task, reports whether the connection is currently 'connected',
    'disconnecting', 'disconnected' or in an 'error' state.

    Poll :attr:`status` to stay up to date on the health of a connection
    and to react when it drops because of network trouble or some other
    unexpected circumstance.

    """
    ERROR = 'error'
    CONNECTED = 'connected'
    DISCONNECTING = 'disconnecting'
    DISCONNECTED = 'disconnected'

    def __init__(self, connection):
        event_loop = connection.loop
        # Only a weak reference is held to the connection being watched.
        self.connection = weakref.ref(connection)
        # Held for the duration of a reconnect (see Connection.reconnect).
        self.reconnecting = asyncio.Lock(loop=event_loop)
        # Set once Connection.close() has been requested.
        self.close_called = asyncio.Event(loop=event_loop)

    @property
    def status(self):
        """
        Work out the current state of the connection and its receiver.

        Returns one of ERROR, CONNECTED, DISCONNECTING or DISCONNECTED.

        For simplicity the connection only counts as connected once the
        Connection has started its receiver task; that happens after the
        websocket is open, and the connection is not usable before then.

        """
        conn = self.connection()

        # Either the Connection was destroyed while somebody kept a
        # separate reference to this monitor, or the websocket was closed
        # cleanly / has not been opened yet.
        if not conn or not conn.ws:
            return self.DISCONNECTED

        # close() has been called but has not finished yet.
        if self.close_called.is_set():
            return self.DISCONNECTING

        # A stopped receiver or a websocket that is no longer open, without
        # close() having been called, means the link dropped uncleanly.
        receiver_running = not conn._receiver_task.stopped.is_set()
        healthy = receiver_running and conn.ws.open
        return self.CONNECTED if healthy else self.ERROR
79
80
class Connection:
    """
    Websocket connection to the API of a Juju controller or model.

    Usage::

        # Connect to an arbitrary api server
        client = await Connection.connect(
            api_endpoint, model_uuid, username, password, cacert)

    Note: Any connection method or constructor can accept an optional `loop`
    argument to override the default event loop from `asyncio.get_event_loop`.
    """

    MAX_FRAME_SIZE = 2**22
    "Maximum size for a single frame.  Defaults to 4MB."

    @classmethod
    async def connect(
            cls,
            endpoint=None,
            uuid=None,
            username=None,
            password=None,
            cacert=None,
            bakery_client=None,
            loop=None,
            max_frame_size=None,
    ):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        :param str endpoint: The hostname:port of the controller to connect to.
        :param str uuid: The model UUID to connect to (None for a
            controller-only connection).
        :param str username: The username for controller-local users (or None
            to use macaroon-based login.)
        :param str password: The password for controller-local users.
        :param str cacert: The CA certificate of the controller
            (PEM formatted).
        :param httpbakery.Client bakery_client: The macaroon bakery client to
            to use when performing macaroon-based login. Macaroon tokens
            acquired when logging will be saved to bakery_client.cookies.
            If this is None, a default bakery_client will be used.
        :param asyncio.BaseEventLoop loop: The event loop to use for async
            operations.
        :param int max_frame_size: The maximum websocket frame size to allow.
        :return: The connected, logged-in Connection.
        :raises ValueError: When no endpoint is given.
        :raises JujuAuthError: When an external user is combined with a
            password.
        """
        self = cls()
        if endpoint is None:
            raise ValueError('no endpoint provided')
        self.uuid = uuid
        if bakery_client is None:
            bakery_client = httpbakery.Client()
        self.bakery_client = bakery_client
        if username and '@' in username and not username.endswith('@local'):
            # We're trying to log in as an external user - we need to use
            # macaroon authentication with no username or password.
            if password is not None:
                raise errors.JujuAuthError('cannot log in as external '
                                           'user with a password')
            username = None
        self.usertag = tag.user(username)
        self.password = password
        self.loop = loop or asyncio.get_event_loop()

        self.__request_id__ = 0

        # The following instance variables are initialized by the
        # _connect_with_redirect method, but create them here
        # as a reminder that they will exist.
        self.addr = None
        self.ws = None
        self.endpoint = None
        self.cacert = None
        self.info = None

        # Create the _Task objects but don't start the tasks yet.
        self._pinger_task = _Task(self._pinger, self.loop)
        self._receiver_task = _Task(self._receiver, self.loop)

        self.facades = {}
        self.messages = IdQueue(loop=self.loop)
        self.monitor = Monitor(connection=self)
        if max_frame_size is None:
            max_frame_size = self.MAX_FRAME_SIZE
        self.max_frame_size = max_frame_size
        await self._connect_with_redirect([(endpoint, cacert)])
        return self

    @property
    def username(self):
        """The user name without its 'user-' tag prefix, or None."""
        if not self.usertag:
            return None
        return self.usertag[len('user-'):]

    @property
    def is_open(self):
        """True while the monitor reports the connection as CONNECTED."""
        return self.monitor.status == Monitor.CONNECTED

    def _get_ssl(self, cert=None):
        """Build the SSL context used to talk to the controller.

        :param str cert: Optional PEM formatted CA certificate to trust.
        :return: An :class:`ssl.SSLContext` for client-side use.
        """
        # We are the TLS *client*, so the context must be created for
        # authenticating a server.  Purpose.CLIENT_AUTH yields a server-side
        # context that does not verify the peer certificate.
        context = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH, cadata=cert)
        if cert:
            # Trust is anchored on the explicitly supplied CA certificate.
            # NOTE(review): hostname checking is relaxed here on the
            # assumption that controller certificates are not issued for
            # the address used to reach the controller -- confirm.
            context.check_hostname = False
        return context

    async def _open(self, endpoint, cacert):
        """Open a websocket to *endpoint*.

        :return: Tuple ``(websocket, url, endpoint, cacert)``.
        """
        if self.uuid:
            url = "wss://{}/model/{}/api".format(endpoint, self.uuid)
        else:
            url = "wss://{}/api".format(endpoint)

        return (await websockets.connect(
            url,
            ssl=self._get_ssl(cacert),
            loop=self.loop,
            max_size=self.max_frame_size,
        ), url, endpoint, cacert)

    async def close(self):
        """Close the websocket, waiting for the helper tasks to stop first.

        Does nothing when there is no websocket.
        """
        if not self.ws:
            return
        # Signals the pinger and receiver loops to exit.
        self.monitor.close_called.set()
        await self._pinger_task.stopped.wait()
        await self._receiver_task.stopped.wait()
        await self.ws.close()
        self.ws = None

    async def _recv(self, request_id):
        """Wait for the message queued under *request_id*.

        :raises websockets.exceptions.ConnectionClosed: When the connection
            is not open.
        """
        if not self.is_open:
            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
        return await self.messages.get(request_id)

    async def _receiver(self):
        """Read messages off the websocket and queue them by request id."""
        try:
            while self.is_open:
                result = await utils.run_with_interrupt(
                    self.ws.recv(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
                if result is not None:
                    result = json.loads(result)
                    await self.messages.put(result['request-id'], result)
        except CancelledError:
            pass
        except websockets.ConnectionClosed as e:
            log.warning('Receiver: Connection closed, reconnecting')
            await self.messages.put_all(e)
            # the reconnect has to be done as a task because the receiver will
            # be cancelled by the reconnect and we don't want the reconnect
            # to be aborted half-way through
            self.loop.create_task(self.reconnect())
            return
        except Exception as e:
            log.exception("Error in receiver")
            # make pending listeners aware of the error
            await self.messages.put_all(e)
            raise

    async def _pinger(self):
        '''
        A Controller can time us out if we are silent for too long. This
        is especially true in JaaS, which has a fairly strict timeout.

        To prevent timing out, we send a ping every ten seconds.

        '''
        async def _do_ping():
            try:
                await pinger_facade.Ping()
                await asyncio.sleep(10, loop=self.loop)
            except CancelledError:
                pass

        pinger_facade = client.PingerFacade.from_connection(self)
        try:
            while True:
                await utils.run_with_interrupt(
                    _do_ping(),
                    self.monitor.close_called,
                    loop=self.loop)
                if self.monitor.close_called.is_set():
                    break
        except websockets.exceptions.ConnectionClosed:
            # The connection has closed - we can't do anything
            # more until the connection is restarted.
            log.debug('ping failed because of closed connection')

    @staticmethod
    def _error_message(item):
        """Return the error message carried by *item*, or None.

        Treats a missing ``error`` key and an explicit ``"error": null``
        the same way, so a null error is not mistaken for a mapping.
        """
        error = item.get('error') or {}
        return error.get('message') or None

    async def rpc(self, msg, encoder=None):
        '''Make an RPC to the API. The message is encoded as JSON
        using the given encoder if any.

        *msg* is updated in place: 'request-id' is always set, and 'params'
        and 'version' are filled in when absent.

        :param msg: Parameters for the call (will be encoded as JSON).
        :param encoder: Encoder to be used when encoding the message.
        :return: The result of the call.
        :raises JujuAPIError: When there's an error returned.
        :raises JujuError: When the response, or any entry of its result
            list, carries an error message.
        :raises websockets.exceptions.ConnectionClosed: When the connection
            is closed and could not be re-established.
        '''
        self.__request_id__ += 1
        msg['request-id'] = self.__request_id__
        if 'params' not in msg:
            msg['params'] = {}
        if "version" not in msg:
            msg['version'] = self.facades[msg['type']]
        outgoing = json.dumps(msg, indent=2, cls=encoder)
        log.debug('connection %s -> %s', id(self), outgoing)
        for attempt in range(3):
            if self.monitor.status == Monitor.DISCONNECTED:
                # closed cleanly; shouldn't try to reconnect
                raise websockets.exceptions.ConnectionClosed(
                    0, 'websocket closed')
            try:
                await self.ws.send(outgoing)
                break
            except websockets.ConnectionClosed:
                if attempt == 2:
                    raise
                log.warning('RPC: Connection closed, reconnecting')
                # the reconnect has to be done in a separate task because,
                # if it is triggered by the pinger, then this RPC call will
                # be cancelled when the pinger is cancelled by the reconnect,
                # and we don't want the reconnect to be aborted halfway through
                await asyncio.wait([self.reconnect()], loop=self.loop)
                if self.monitor.status != Monitor.CONNECTED:
                    # reconnect failed; abort and shutdown
                    log.error('RPC: Automatic reconnect failed')
                    raise
        result = await self._recv(msg['request-id'])
        log.debug('connection %s <- %s', id(self), result)

        if not result:
            return result

        if 'error' in result:
            # API Error Response
            raise errors.JujuAPIError(result)

        if 'response' not in result:
            # This may never happen
            return result

        response = result['response']
        if 'results' in response:
            # Check for errors in a result list.
            # TODO This loses the results that might have succeeded.
            # Perhaps JujuError should return all the results including
            # errors, or perhaps a keyword parameter to the rpc method
            # could be added to trigger this behaviour.
            err_results = [
                message
                for message in map(self._error_message, response['results'])
                if message
            ]
            if err_results:
                raise errors.JujuError(err_results)
        else:
            message = self._error_message(response)
            if message:
                raise errors.JujuError(message)

        return result

    def _http_headers(self):
        """Return dictionary of http headers necessary for making an http
        connection to the endpoint of this Connection.

        :return: Dictionary of headers (empty when there is no user tag).

        """
        if not self.usertag:
            return {}

        creds = u'{}:{}'.format(
            self.usertag,
            self.password or ''
        )
        token = base64.b64encode(creds.encode())
        return {
            'Authorization': 'Basic {}'.format(token.decode())
        }

    def https_connection(self):
        """Return an https connection to this Connection's endpoint.

        Returns a 3-tuple containing::

            1. The :class:`HTTPSConnection` instance
            2. Dictionary of auth headers to be used with the connection
            3. The root url path (str) to be used for requests.

        :raises ValueError: When the endpoint carries no (numeric) port.

        """
        # Drop any path component, then split on the *last* colon so that
        # bracketed IPv6 literals such as "[::1]:17070" are handled.
        hostport = self.endpoint.partition('/')[0]
        host, sep, port = hostport.rpartition(':')
        if not sep:
            raise ValueError(
                'endpoint {!r} has no port'.format(self.endpoint))
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]

        conn = HTTPSConnection(
            host, int(port),
            context=self._get_ssl(self.cacert),
        )

        path = (
            "/model/{}".format(self.uuid)
            if self.uuid else ""
        )
        return conn, self._http_headers(), path

    async def clone(self):
        """Return a new Connection, connected to the same websocket endpoint
        as this one.

        """
        return await Connection.connect(**self.connect_params())

    def connect_params(self):
        """Return a dict of keyword arguments suitable for passing to
        Connection.connect that can be used to make a new connection
        to the same controller (and model if specified).
        """
        return {
            'endpoint': self.endpoint,
            'uuid': self.uuid,
            'username': self.username,
            'password': self.password,
            'cacert': self.cacert,
            'bakery_client': self.bakery_client,
            'loop': self.loop,
            'max_frame_size': self.max_frame_size,
        }

    async def controller(self):
        """Return a Connection to the controller at self.endpoint
        """
        return await Connection.connect(
            self.endpoint,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
            bakery_client=self.bakery_client,
            loop=self.loop,
            max_frame_size=self.max_frame_size,
        )

    async def reconnect(self):
        """ Force a reconnection.

        Does nothing when a reconnect is already in progress or when
        close() has been called.
        """
        monitor = self.monitor
        if monitor.reconnecting.locked() or monitor.close_called.is_set():
            return
        async with monitor.reconnecting:
            await self.close()
            await self._connect_with_login([(self.endpoint, self.cacert)])

    async def _connect(self, endpoints):
        """Open a websocket to the first endpoint that accepts one.

        :param endpoints: Sequence of ``(endpoint, cacert)`` pairs.
        :raises JujuConnectionError: When there are no endpoints, or none
            of them could be connected to.
        """
        if not endpoints:
            raise errors.JujuConnectionError('no endpoints to connect to')

        async def _try_endpoint(endpoint, cacert, delay):
            if delay:
                await asyncio.sleep(delay)
            return await self._open(endpoint, cacert)

        # Try all endpoints in parallel, with slight increasing delay (+100ms
        # for each subsequent endpoint); the delay allows us to prefer the
        # earlier endpoints over the latter. Use first successful connection.
        tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
                                                     0.1 * i))
                 for i, (endpoint, cacert) in enumerate(endpoints)]
        try:
            for task in asyncio.as_completed(tasks, loop=self.loop):
                try:
                    result = await task
                    break
                except ConnectionError:
                    continue  # ignore; try another endpoint
            else:
                raise errors.JujuConnectionError(
                    'Unable to connect to any endpoint: {}'.format(', '.join([
                        endpoint for endpoint, cacert in endpoints])))
        finally:
            # Stop the remaining attempts whether we succeeded or an
            # unexpected error is propagating, so none is left running.
            for task in tasks:
                task.cancel()
        self.ws = result[0]
        self.addr = result[1]
        self.endpoint = result[2]
        self.cacert = result[3]
        self._receiver_task.start()
        log.info("Driver connected to juju %s", self.addr)
        self.monitor.close_called.clear()

    async def _connect_with_login(self, endpoints):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        The connection is closed again if login does not succeed.

        :return: The response field of login response JSON object.
        :raises JujuAuthError: When login still requires a discharge after
            the retries.
        """
        success = False
        try:
            await self._connect(endpoints)
            # It's possible that we may get several discharge-required errors,
            # corresponding to different levels of authentication, so retry
            # a few times.
            for _ in range(2):
                result = (await self.login())['response']
                macaroonJSON = result.get('discharge-required')
                if macaroonJSON is None:
                    self.info = result
                    success = True
                    return result
                macaroon = bakery.Macaroon.from_dict(macaroonJSON)
                self.bakery_client.handle_error(
                    httpbakery.Error(
                        code=httpbakery.ERR_DISCHARGE_REQUIRED,
                        message=result.get('discharge-required-error'),
                        version=macaroon.version,
                        info=httpbakery.ErrorInfo(
                            macaroon=macaroon,
                            macaroon_path=result.get('macaroon-path'),
                        ),
                    ),
                    # The URL is built from the endpoint exactly as stored
                    # (the port, if any, is kept).
                    'https://' + self.endpoint + '/',
                )
            raise errors.JujuAuthError('failed to authenticate '
                                       'after several attempts')
        finally:
            if not success:
                await self.close()

    async def _connect_with_redirect(self, endpoints):
        """Connect and log in, following one controller redirect if asked.

        On success the facade table is rebuilt and the pinger is started.
        """
        try:
            login_result = await self._connect_with_login(endpoints)
        except errors.JujuRedirectException as e:
            login_result = await self._connect_with_login(e.endpoints)
        self._build_facades(login_result.get('facades', {}))
        self._pinger_task.start()

    def _build_facades(self, facades):
        """Reset self.facades to map each facade name to the last entry
        of its 'versions' list.
        """
        self.facades.clear()
        for facade in facades:
            self.facades[facade['name']] = facade['versions'][-1]

    async def login(self):
        """Send the Admin.Login request.

        Uses the password when one is set, otherwise the macaroons stored
        in the bakery client's cookies for this endpoint.

        :return: The full login result as returned by :meth:`rpc`.
        :raises JujuRedirectException: When the controller answers with
            'redirection required'.
        :raises JujuAPIError: For any other API error.
        """
        params = {}
        params['auth-tag'] = self.usertag
        if self.password:
            params['credentials'] = self.password
        else:
            macaroons = _macaroons_for_domain(self.bakery_client.cookies,
                                              self.endpoint)
            params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms]
                                   for ms in macaroons]

        try:
            return await self.rpc({
                "type": "Admin",
                "request": "Login",
                "version": 3,
                "params": params,
            })
        except errors.JujuAPIError as e:
            if e.error_code != 'redirection required':
                raise
            log.info('Controller requested redirect')
            # Fetch additional redirection information now so that
            # we can safely close the connection after login
            # fails.
            redirect_info = (await self.rpc({
                "type": "Admin",
                "request": "RedirectInfo",
                "version": 3,
            }))['response']
            raise errors.JujuRedirectException(redirect_info) from e
553
554
class _Task:
    """Restartable wrapper around a coroutine function.

    ``stopped`` is an asyncio.Event that is set whenever the wrapped
    coroutine is not running: initially, and again once a run started by
    :meth:`start` finishes for any reason.
    """

    def __init__(self, task, loop):
        self.task = task
        self.loop = loop
        # Nothing is running yet, so begin in the "stopped" state.
        stopped = asyncio.Event(loop=loop)
        stopped.set()
        self.stopped = stopped

    def start(self):
        """Schedule the wrapped coroutine function on the loop."""
        async def _run_and_flag():
            try:
                outcome = await self.task()
            finally:
                # Flag completion whether the run returned or raised.
                self.stopped.set()
            return outcome

        self.stopped.clear()
        self.loop.create_task(_run_and_flag())
570
571
def _macaroons_for_domain(cookies, domain):
    '''Collect the macaroons in a cookie jar that apply to a domain.

    A throw-away https request for the domain is built, the jar adds its
    matching Cookie header to it, and the macaroons are then extracted
    from that request.

    :param cookies: Cookie jar providing ``add_cookie_header``.
    :param str domain: Domain name to build the request URL from.
    :return: Whatever ``httpbakery.extract_macaroons`` yields for the
        request.
    '''
    url = 'https://' + domain + '/'
    request = urllib.request.Request(url)
    cookies.add_cookie_header(request)
    return httpbakery.extract_macaroons(request)