Feature/api version support (#109)
[osm/N2VC.git] / juju / client / connection.py
1 import base64
2 import io
3 import json
4 import logging
5 import os
6 import random
7 import shlex
8 import ssl
9 import string
10 import subprocess
11 import websockets
12 from http.client import HTTPSConnection
13
14 import asyncio
15 import yaml
16
17 from juju import tag
18 from juju.client import client
19 from juju.client.version_map import VERSION_MAP
20 from juju.errors import JujuError, JujuAPIError, JujuConnectionError
21 from juju.utils import IdQueue
22
23 log = logging.getLogger("websocket")
24
25
26 class Monitor:
27 """
28 Monitor helper class for our Connection class.
29
30 Contains a reference to an instantiated Connection, along with a
31 reference to the Connection.receiver Future. Upon inspecttion of
32 these objects, this class determines whether the connection is in
33 an 'error', 'connected' or 'disconnected' state.
34
35 Use this class to stay up to date on the health of a connection,
36 and take appropriate action if the connection errors out due to
37 network issues or other unexpected circumstances.
38
39 """
40 ERROR = 'error'
41 CONNECTED = 'connected'
42 DISCONNECTED = 'disconnected'
43 UNKNOWN = 'unknown'
44
45 def __init__(self, connection):
46 self.connection = connection
47 self.receiver = None
48
49 @property
50 def status(self):
51 """
52 Determine the status of the connection and receiver, and return
53 ERROR, CONNECTED, or DISCONNECTED as appropriate.
54
55 For simplicity, we only consider ourselves to be connected
56 after the Connection class has setup a receiver task. This
57 only happens after the websocket is open, and the connection
58 isn't usable until that receiver has been started.
59
60 """
61
62 # DISCONNECTED: connection not yet open
63 if not self.connection.ws:
64 return self.DISCONNECTED
65 if not self.receiver:
66 return self.DISCONNECTED
67
68 # ERROR: Connection closed (or errored), but we didn't call
69 # connection.close
70 if not self.connection.close_called and self.receiver_exceptions():
71 return self.ERROR
72 if not self.connection.close_called and not self.connection.ws.open:
73 # The check for self.receiver existing above guards against the
74 # case where we're not open because we simply haven't
75 # setup the connection yet.
76 return self.ERROR
77
78 # DISCONNECTED: cleanly disconnected.
79 if self.connection.close_called and not self.connection.ws.open:
80 return self.DISCONNECTED
81
82 # CONNECTED: everything is fine!
83 if self.connection.ws.open:
84 return self.CONNECTED
85
86 # UNKNOWN: We should never hit this state -- if we do,
87 # something went wrong with the logic above, and we do not
88 # know what state the connection is in.
89 return self.UNKNOWN
90
91 def receiver_exceptions(self):
92 """
93 Return exceptions in the receiver, if any.
94
95 """
96 if not self.receiver:
97 return None
98 if not self.receiver.done():
99 return None
100 return self.receiver.exception()
101
102
103 class Connection:
104 """
105 Usage::
106
107 # Connect to an arbitrary api server
108 client = await Connection.connect(
109 api_endpoint, model_uuid, username, password, cacert)
110
111 # Connect using a controller/model name
112 client = await Connection.connect_model('local.local:default')
113
114 # Connect to the currently active model
115 client = await Connection.connect_current()
116
117 Note: Any connection method or constructor can accept an optional `loop`
118 argument to override the default event loop from `asyncio.get_event_loop`.
119 """
120 def __init__(
121 self, endpoint, uuid, username, password, cacert=None,
122 macaroons=None, loop=None):
123 self.endpoint = endpoint
124 self.uuid = uuid
125 self.username = username
126 self.password = password
127 self.macaroons = macaroons
128 self.cacert = cacert
129 self.loop = loop or asyncio.get_event_loop()
130
131 self.__request_id__ = 0
132 self.addr = None
133 self.ws = None
134 self.facades = {}
135 self.messages = IdQueue(loop=self.loop)
136 self.close_called = False
137 self.monitor = Monitor(connection=self)
138
139 @property
140 def is_open(self):
141 if self.ws:
142 return self.ws.open
143 return False
144
145 def _get_ssl(self, cert=None):
146 return ssl.create_default_context(
147 purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
148
149 async def open(self):
150 if self.uuid:
151 url = "wss://{}/model/{}/api".format(self.endpoint, self.uuid)
152 else:
153 url = "wss://{}/api".format(self.endpoint)
154
155 kw = dict()
156 kw['ssl'] = self._get_ssl(self.cacert)
157 kw['loop'] = self.loop
158 self.addr = url
159 self.ws = await websockets.connect(url, **kw)
160 self.monitor.receiver = self.loop.create_task(self.receiver())
161 log.info("Driver connected to juju %s", url)
162 return self
163
164 async def close(self):
165 self.close_called = True
166 await self.ws.close()
167
168 async def recv(self, request_id):
169 if not self.is_open:
170 raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
171 return await self.messages.get(request_id)
172
173 async def receiver(self):
174 while self.is_open:
175 try:
176 result = await self.ws.recv()
177 if result is not None:
178 result = json.loads(result)
179 await self.messages.put(result['request-id'], result)
180 except Exception as e:
181 await self.messages.put_all(e)
182 if isinstance(e, websockets.ConnectionClosed):
183 # ConnectionClosed is not really exceptional for us,
184 # but it may be for any pending message listeners
185 return
186 raise
187
188 async def pinger(self):
189 '''
190 A Controller can time us out if we are silent for too long. This
191 is especially true in JaaS, which has a fairly strict timeout.
192
193 To prevent timing out, we send a ping every ten seconds.
194
195 '''
196 pinger_facade = client.PingerFacade.from_connection(self)
197 while self.is_open:
198 await pinger_facade.Ping()
199 await asyncio.sleep(10)
200
201 async def rpc(self, msg, encoder=None):
202 self.__request_id__ += 1
203 msg['request-id'] = self.__request_id__
204 if'params' not in msg:
205 msg['params'] = {}
206 if "version" not in msg:
207 msg['version'] = self.facades[msg['type']]
208 outgoing = json.dumps(msg, indent=2, cls=encoder)
209 await self.ws.send(outgoing)
210 result = await self.recv(msg['request-id'])
211
212 if not result:
213 return result
214
215 if 'error' in result:
216 # API Error Response
217 raise JujuAPIError(result)
218
219 if 'response' not in result:
220 # This may never happen
221 return result
222
223 if 'results' in result['response']:
224 # Check for errors in a result list.
225 errors = []
226 for res in result['response']['results']:
227 if res.get('error', {}).get('message'):
228 errors.append(res['error']['message'])
229 if errors:
230 raise JujuError(errors)
231
232 elif result['response'].get('error', {}).get('message'):
233 raise JujuError(result['response']['error']['message'])
234
235 return result
236
237 def http_headers(self):
238 """Return dictionary of http headers necessary for making an http
239 connection to the endpoint of this Connection.
240
241 :return: Dictionary of headers
242
243 """
244 if not self.username:
245 return {}
246
247 creds = u'{}:{}'.format(
248 tag.user(self.username),
249 self.password or ''
250 )
251 token = base64.b64encode(creds.encode())
252 return {
253 'Authorization': 'Basic {}'.format(token.decode())
254 }
255
256 def https_connection(self):
257 """Return an https connection to this Connection's endpoint.
258
259 Returns a 3-tuple containing::
260
261 1. The :class:`HTTPSConnection` instance
262 2. Dictionary of auth headers to be used with the connection
263 3. The root url path (str) to be used for requests.
264
265 """
266 endpoint = self.endpoint
267 host, remainder = endpoint.split(':', 1)
268 port = remainder
269 if '/' in remainder:
270 port, _ = remainder.split('/', 1)
271
272 conn = HTTPSConnection(
273 host, int(port),
274 context=self._get_ssl(self.cacert),
275 )
276
277 path = (
278 "/model/{}".format(self.uuid)
279 if self.uuid else ""
280 )
281 return conn, self.http_headers(), path
282
283 async def clone(self):
284 """Return a new Connection, connected to the same websocket endpoint
285 as this one.
286
287 """
288 return await Connection.connect(
289 self.endpoint,
290 self.uuid,
291 self.username,
292 self.password,
293 self.cacert,
294 self.macaroons,
295 self.loop,
296 )
297
298 async def controller(self):
299 """Return a Connection to the controller at self.endpoint
300
301 """
302 return await Connection.connect(
303 self.endpoint,
304 None,
305 self.username,
306 self.password,
307 self.cacert,
308 self.macaroons,
309 self.loop,
310 )
311
312 @classmethod
313 async def connect(
314 cls, endpoint, uuid, username, password, cacert=None,
315 macaroons=None, loop=None):
316 """Connect to the websocket.
317
318 If uuid is None, the connection will be to the controller. Otherwise it
319 will be to the model.
320
321 """
322 client = cls(endpoint, uuid, username, password, cacert, macaroons,
323 loop)
324 await client.open()
325
326 redirect_info = await client.redirect_info()
327 if not redirect_info:
328 await client.login(username, password, macaroons)
329 return client
330
331 await client.close()
332 servers = [
333 s for servers in redirect_info['servers']
334 for s in servers if s["scope"] == 'public'
335 ]
336 for server in servers:
337 client = cls(
338 "{value}:{port}".format(**server), uuid, username,
339 password, redirect_info['ca-cert'], macaroons)
340 await client.open()
341 try:
342 result = await client.login(username, password, macaroons)
343 if 'discharge-required-error' in result:
344 continue
345 return client
346 except Exception as e:
347 await client.close()
348 log.exception(e)
349
350 raise Exception(
351 "Couldn't authenticate to %s", endpoint)
352
353 @classmethod
354 async def connect_current(cls, loop=None):
355 """Connect to the currently active model.
356
357 """
358 jujudata = JujuData()
359 controller_name = jujudata.current_controller()
360 model_name = jujudata.current_model()
361
362 return await cls.connect_model(
363 '{}:{}'.format(controller_name, model_name), loop)
364
365 @classmethod
366 async def connect_current_controller(cls, loop=None):
367 """Connect to the currently active controller.
368
369 """
370 jujudata = JujuData()
371 controller_name = jujudata.current_controller()
372 if not controller_name:
373 raise JujuConnectionError('No current controller')
374
375 return await cls.connect_controller(controller_name, loop)
376
377 @classmethod
378 async def connect_controller(cls, controller_name, loop=None):
379 """Connect to a controller by name.
380
381 """
382 jujudata = JujuData()
383 controller = jujudata.controllers()[controller_name]
384 endpoint = controller['api-endpoints'][0]
385 cacert = controller.get('ca-cert')
386 accounts = jujudata.accounts()[controller_name]
387 username = accounts['user']
388 password = accounts.get('password')
389 macaroons = get_macaroons() if not password else None
390
391 return await cls.connect(
392 endpoint, None, username, password, cacert, macaroons, loop)
393
394 @classmethod
395 async def connect_model(cls, model, loop=None):
396 """Connect to a model by name.
397
398 :param str model: [<controller>:]<model>
399
400 """
401 jujudata = JujuData()
402
403 if ':' in model:
404 # explicit controller given
405 controller_name, model_name = model.split(':')
406 else:
407 # use the current controller if one isn't explicitly given
408 controller_name = jujudata.current_controller()
409 model_name = model
410
411 accounts = jujudata.accounts()[controller_name]
412 username = accounts['user']
413 # model name must include a user prefix, so add it if it doesn't
414 if '/' not in model_name:
415 model_name = '{}/{}'.format(username, model_name)
416
417 controller = jujudata.controllers()[controller_name]
418 endpoint = controller['api-endpoints'][0]
419 cacert = controller.get('ca-cert')
420 password = accounts.get('password')
421 models = jujudata.models()[controller_name]
422 model_uuid = models['models'][model_name]['uuid']
423 macaroons = get_macaroons() if not password else None
424
425 return await cls.connect(
426 endpoint, model_uuid, username, password, cacert, macaroons, loop)
427
428 def build_facades(self, facades):
429 self.facades.clear()
430 # In order to work around an issue where the juju api is not
431 # returning a complete list of facades, we simply look up the
432 # juju version in a pregenerated map, and use that info to
433 # populate our list of facades.
434
435 # TODO: if a future version of juju fixes this bug, restore
436 # the following code for that version and higher:
437 # for facade in facades:
438 # self.facades[facade['name']] = facade['versions'][-1]
439 try:
440 self.facades = VERSION_MAP[self.info['server-version']]
441 except KeyError:
442 log.warning("Could not find a set of facades for {}. Using "
443 "the latest facade set instead".format(
444 self.info['server-version']))
445 self.facades = VERSION_MAP['latest']
446
447 async def login(self, username, password, macaroons=None):
448 if macaroons:
449 username = ''
450 password = ''
451
452 if username and not username.startswith('user-'):
453 username = 'user-{}'.format(username)
454
455 result = await self.rpc({
456 "type": "Admin",
457 "request": "Login",
458 "version": 3,
459 "params": {
460 "auth-tag": username,
461 "credentials": password,
462 "nonce": "".join(random.sample(string.printable, 12)),
463 "macaroons": macaroons or []
464 }})
465 response = result['response']
466 self.info = response.copy()
467 self.build_facades(response.get('facades', {}))
468 # Create a pinger to keep the connection alive (needed for
469 # JaaS; harmless elsewhere).
470 self.loop.create_task(self.pinger())
471 return response
472
473 async def redirect_info(self):
474 try:
475 result = await self.rpc({
476 "type": "Admin",
477 "request": "RedirectInfo",
478 "version": 3,
479 })
480 except JujuAPIError as e:
481 if e.message == 'not redirected':
482 return None
483 raise
484 return result['response']
485
486
487 class JujuData:
488 def __init__(self):
489 self.path = os.environ.get('JUJU_DATA') or '~/.local/share/juju'
490 self.path = os.path.abspath(os.path.expanduser(self.path))
491
492 def current_controller(self):
493 cmd = shlex.split('juju list-controllers --format yaml')
494 output = subprocess.check_output(cmd)
495 output = yaml.safe_load(output)
496 return output.get('current-controller', '')
497
498 def current_model(self, controller_name=None):
499 if not controller_name:
500 controller_name = self.current_controller()
501 models = self.models()[controller_name]
502 if 'current-model' not in models:
503 raise JujuError('No current model')
504 return models['current-model']
505
506 def controllers(self):
507 return self._load_yaml('controllers.yaml', 'controllers')
508
509 def models(self):
510 return self._load_yaml('models.yaml', 'controllers')
511
512 def accounts(self):
513 return self._load_yaml('accounts.yaml', 'controllers')
514
515 def _load_yaml(self, filename, key):
516 filepath = os.path.join(self.path, filename)
517 with io.open(filepath, 'rt') as f:
518 return yaml.safe_load(f)[key]
519
520
521 def get_macaroons():
522 """Decode and return macaroons from default ~/.go-cookies
523
524 """
525 try:
526 cookie_file = os.path.expanduser('~/.go-cookies')
527 with open(cookie_file, 'r') as f:
528 cookies = json.load(f)
529 except (OSError, ValueError):
530 log.warn("Couldn't load macaroons from %s", cookie_file)
531 return []
532
533 base64_macaroons = [
534 c['Value'] for c in cookies
535 if c['Name'].startswith('macaroon-') and c['Value']
536 ]
537
538 return [
539 json.loads(base64.b64decode(value).decode('utf-8'))
540 for value in base64_macaroons
541 ]