Added Monitor class to Connection. (#105)
[osm/N2VC.git] / juju / client / connection.py
1 import base64
2 import io
3 import json
4 import logging
5 import os
6 import random
7 import shlex
8 import ssl
9 import string
10 import subprocess
11 import websockets
12 from http.client import HTTPSConnection
13
14 import asyncio
15 import yaml
16
17 from juju import tag
18 from juju.errors import JujuError, JujuAPIError, JujuConnectionError
19 from juju.utils import IdQueue
20
21 log = logging.getLogger("websocket")
22
23
class Monitor:
    """
    Monitor helper class for our Connection class.

    Contains a reference to an instantiated Connection, along with a
    reference to the Connection.receiver Future. Upon inspection of
    these objects, this class determines whether the connection is in
    an 'error', 'connected' or 'disconnected' state.

    Use this class to stay up to date on the health of a connection,
    and take appropriate action if the connection errors out due to
    network issues or other unexpected circumstances.

    """
    ERROR = 'error'
    CONNECTED = 'connected'
    DISCONNECTED = 'disconnected'
    UNKNOWN = 'unknown'

    def __init__(self, connection):
        """
        :param connection: the Connection instance to watch. Its ``ws``
            and ``close_called`` attributes are read by :attr:`status`.

        """
        self.connection = connection
        # Assigned by Connection.open() once the receiver task exists.
        self.receiver = None

    @property
    def status(self):
        """
        Determine the status of the connection and receiver, and return
        ERROR, CONNECTED, or DISCONNECTED as appropriate (UNKNOWN is a
        defensive fallback that the checks below should never reach).

        For simplicity, we only consider ourselves to be connected
        after the Connection class has setup a receiver task. This
        only happens after the websocket is open, and the connection
        isn't usable until that receiver has been started.

        """

        # DISCONNECTED: connection not yet open
        if not self.connection.ws:
            return self.DISCONNECTED
        if not self.receiver:
            return self.DISCONNECTED

        # ERROR: Connection closed (or errored), but we didn't call
        # connection.close
        if not self.connection.close_called and self.receiver_exceptions():
            return self.ERROR
        if not self.connection.close_called and not self.connection.ws.open:
            # The check for self.receiver existing above guards against the
            # case where we're not open because we simply haven't
            # setup the connection yet.
            return self.ERROR

        # DISCONNECTED: cleanly disconnected.
        if self.connection.close_called and not self.connection.ws.open:
            return self.DISCONNECTED

        # CONNECTED: everything is fine!
        if self.connection.ws.open:
            return self.CONNECTED

        # UNKNOWN: We should never hit this state -- if we do,
        # something went wrong with the logic above, and we do not
        # know what state the connection is in.
        return self.UNKNOWN

    def receiver_exceptions(self):
        """
        Return exceptions in the receiver, if any.

        Returns None when there is no receiver, when it is still
        running, or when it finished without raising. A cancelled
        receiver is reported by returning the CancelledError itself.

        """
        if not self.receiver:
            return None
        if not self.receiver.done():
            return None
        try:
            return self.receiver.exception()
        except asyncio.CancelledError as cancelled:
            # Future.exception() raises (rather than returns) when the
            # future was cancelled; hand the error back so that `status`
            # can report a state instead of blowing up.
            return cancelled
99
100
class Connection:
    """
    Websocket connection to a Juju API server.

    Usage::

        # Connect to an arbitrary api server
        client = await Connection.connect(
            api_endpoint, model_uuid, username, password, cacert)

        # Connect using a controller/model name
        client = await Connection.connect_model('local.local:default')

        # Connect to the currently active model
        client = await Connection.connect_current()

    Note: Any connection method or constructor can accept an optional `loop`
    argument to override the default event loop from `asyncio.get_event_loop`.
    """
    def __init__(
            self, endpoint, uuid, username, password, cacert=None,
            macaroons=None, loop=None):
        """Store the connection parameters; no network I/O happens here.

        :param str endpoint: ``host:port`` of the API server
        :param uuid: model uuid, or None to target the controller
        :param username: user name used to log in
        :param password: password used to log in
        :param cacert: CA certificate data passed to the SSL context
        :param macaroons: macaroons used to log in instead of a password
        :param loop: event loop to use; defaults to
            ``asyncio.get_event_loop()``

        """
        self.endpoint = endpoint
        self.uuid = uuid
        self.username = username
        self.password = password
        self.macaroons = macaroons
        self.cacert = cacert
        self.loop = loop or asyncio.get_event_loop()

        self.__request_id__ = 0
        self.addr = None
        self.ws = None
        self.facades = {}
        self.messages = IdQueue(loop=self.loop)
        self.close_called = False
        self.monitor = Monitor(connection=self)

    @property
    def is_open(self):
        """True when a websocket exists and reports itself as open."""
        if self.ws:
            return self.ws.open
        return False

    def _get_ssl(self, cert=None):
        """Return the SSL context used for websocket and https connections.

        :param cert: CA certificate data handed to the context as ``cadata``

        """
        # NOTE(review): the ssl docs describe Purpose.CLIENT_AUTH as the
        # purpose for server-side sockets; SERVER_AUTH is the usual choice
        # for a client. Left unchanged because switching would alter
        # certificate verification behaviour -- confirm this is intended.
        return ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)

    async def open(self):
        """Open the websocket and start the background receiver task.

        :return: this Connection

        """
        if self.uuid:
            url = "wss://{}/model/{}/api".format(self.endpoint, self.uuid)
        else:
            url = "wss://{}/api".format(self.endpoint)

        kw = dict()
        kw['ssl'] = self._get_ssl(self.cacert)
        kw['loop'] = self.loop
        self.addr = url
        self.ws = await websockets.connect(url, **kw)
        # The monitor only reports CONNECTED once this task exists.
        self.monitor.receiver = self.loop.create_task(self.receiver())
        log.info("Driver connected to juju %s", url)
        return self

    async def close(self):
        """Mark the connection as deliberately closed and close the websocket.

        Safe to call before :meth:`open`; in that case there is no
        websocket to close and only ``close_called`` is set.

        """
        self.close_called = True
        if self.ws is None:
            return
        await self.ws.close()

    async def recv(self, request_id):
        """Wait for the reply queued under ``request_id``.

        :raises websockets.exceptions.ConnectionClosed: if the websocket
            is not open

        """
        if not self.is_open:
            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
        return await self.messages.get(request_id)

    async def receiver(self):
        """Read messages off the websocket and queue them by request id.

        Runs while the connection is open. Any exception is forwarded to
        all pending listeners; ConnectionClosed then ends the loop
        quietly, anything else is re-raised.

        """
        while self.is_open:
            try:
                result = await self.ws.recv()
                if result is not None:
                    result = json.loads(result)
                    await self.messages.put(result['request-id'], result)
            except Exception as e:
                await self.messages.put_all(e)
                if isinstance(e, websockets.ConnectionClosed):
                    # ConnectionClosed is not really exceptional for us,
                    # but it may be for any pending message listeners
                    return
                raise

    async def rpc(self, msg, encoder=None):
        """Send an API request and return the decoded reply.

        ``msg`` is modified in place: a fresh ``request-id`` is assigned,
        ``params`` defaults to ``{}`` and ``version`` defaults to the
        facade version recorded for ``msg['type']``.

        :param dict msg: the request to send
        :param encoder: optional ``json.JSONEncoder`` subclass
        :raises JujuAPIError: if the reply has a top-level ``error``
        :raises JujuError: if the response, or any entry of its
            ``results`` list, carries an error message

        """
        self.__request_id__ += 1
        msg['request-id'] = self.__request_id__
        if 'params' not in msg:
            msg['params'] = {}
        if "version" not in msg:
            msg['version'] = self.facades[msg['type']]
        outgoing = json.dumps(msg, indent=2, cls=encoder)
        await self.ws.send(outgoing)
        result = await self.recv(msg['request-id'])

        if not result:
            return result

        if 'error' in result:
            # API Error Response
            raise JujuAPIError(result)

        if 'response' not in result:
            # This may never happen
            return result

        if 'results' in result['response']:
            # Check for errors in a result list.
            errors = []
            for res in result['response']['results']:
                if res.get('error', {}).get('message'):
                    errors.append(res['error']['message'])
            if errors:
                raise JujuError(errors)

        elif result['response'].get('error', {}).get('message'):
            raise JujuError(result['response']['error']['message'])

        return result

    def http_headers(self):
        """Return dictionary of http headers necessary for making an http
        connection to the endpoint of this Connection.

        :return: Dictionary of headers

        """
        if not self.username:
            return {}

        creds = u'{}:{}'.format(
            tag.user(self.username),
            self.password or ''
        )
        token = base64.b64encode(creds.encode())
        return {
            'Authorization': 'Basic {}'.format(token.decode())
        }

    def https_connection(self):
        """Return an https connection to this Connection's endpoint.

        Returns a 3-tuple containing::

            1. The :class:`HTTPSConnection` instance
            2. Dictionary of auth headers to be used with the connection
            3. The root url path (str) to be used for requests.

        """
        endpoint = self.endpoint
        host, remainder = endpoint.split(':', 1)
        port = remainder
        if '/' in remainder:
            # Drop any path that follows the port number.
            port, _ = remainder.split('/', 1)

        conn = HTTPSConnection(
            host, int(port),
            context=self._get_ssl(self.cacert),
        )

        path = (
            "/model/{}".format(self.uuid)
            if self.uuid else ""
        )
        return conn, self.http_headers(), path

    async def clone(self):
        """Return a new Connection, connected to the same websocket endpoint
        as this one.

        """
        return await Connection.connect(
            self.endpoint,
            self.uuid,
            self.username,
            self.password,
            self.cacert,
            self.macaroons,
            self.loop,
        )

    async def controller(self):
        """Return a Connection to the controller at self.endpoint

        """
        return await Connection.connect(
            self.endpoint,
            None,
            self.username,
            self.password,
            self.cacert,
            self.macaroons,
            self.loop,
        )

    @classmethod
    async def connect(
            cls, endpoint, uuid, username, password, cacert=None,
            macaroons=None, loop=None):
        """Connect to the websocket.

        If uuid is None, the connection will be to the controller. Otherwise it
        will be to the model.

        If the server answers with redirect information, each public
        server listed there is tried in turn until a login succeeds.

        :return: an open, logged-in Connection
        :raises Exception: if redirected and no listed server accepted
            the login

        """
        client = cls(endpoint, uuid, username, password, cacert, macaroons,
                     loop)
        await client.open()

        redirect_info = await client.redirect_info()
        if not redirect_info:
            await client.login(username, password, macaroons)
            return client

        await client.close()
        public_servers = [
            server
            for group in redirect_info['servers']
            for server in group
            if server["scope"] == 'public'
        ]
        for server in public_servers:
            # Pass the caller's loop along so that redirected connections
            # run on the same event loop as the initial one.
            client = cls(
                "{value}:{port}".format(**server), uuid, username,
                password, redirect_info['ca-cert'], macaroons, loop)
            await client.open()
            try:
                result = await client.login(username, password, macaroons)
            except Exception as e:
                await client.close()
                log.exception(e)
                continue
            if 'discharge-required-error' in result:
                # This server cannot be used; release the websocket before
                # moving on to the next candidate.
                await client.close()
                continue
            return client

        raise Exception(
            "Couldn't authenticate to %s" % endpoint)

    @classmethod
    async def connect_current(cls, loop=None):
        """Connect to the currently active model.

        """
        jujudata = JujuData()
        controller_name = jujudata.current_controller()
        model_name = jujudata.current_model()

        return await cls.connect_model(
            '{}:{}'.format(controller_name, model_name), loop)

    @classmethod
    async def connect_current_controller(cls, loop=None):
        """Connect to the currently active controller.

        :raises JujuConnectionError: if there is no current controller

        """
        jujudata = JujuData()
        controller_name = jujudata.current_controller()
        if not controller_name:
            raise JujuConnectionError('No current controller')

        return await cls.connect_controller(controller_name, loop)

    @classmethod
    async def connect_controller(cls, controller_name, loop=None):
        """Connect to a controller by name.

        Endpoint, CA cert and credentials are looked up in the local
        Juju data; macaroons are loaded when no password is stored.

        """
        jujudata = JujuData()
        controller = jujudata.controllers()[controller_name]
        endpoint = controller['api-endpoints'][0]
        cacert = controller.get('ca-cert')
        accounts = jujudata.accounts()[controller_name]
        username = accounts['user']
        password = accounts.get('password')
        macaroons = get_macaroons() if not password else None

        return await cls.connect(
            endpoint, None, username, password, cacert, macaroons, loop)

    @classmethod
    async def connect_model(cls, model, loop=None):
        """Connect to a model by name.

        :param str model: [<controller>:]<model>

        """
        jujudata = JujuData()

        if ':' in model:
            # explicit controller given; split only on the first colon so
            # that unpacking cannot fail on additional colons
            controller_name, model_name = model.split(':', 1)
        else:
            # use the current controller if one isn't explicitly given
            controller_name = jujudata.current_controller()
            model_name = model

        accounts = jujudata.accounts()[controller_name]
        username = accounts['user']
        # model name must include a user prefix, so add it if it doesn't
        if '/' not in model_name:
            model_name = '{}/{}'.format(username, model_name)

        controller = jujudata.controllers()[controller_name]
        endpoint = controller['api-endpoints'][0]
        cacert = controller.get('ca-cert')
        password = accounts.get('password')
        models = jujudata.models()[controller_name]
        model_uuid = models['models'][model_name]['uuid']
        macaroons = get_macaroons() if not password else None

        return await cls.connect(
            endpoint, model_uuid, username, password, cacert, macaroons, loop)

    def build_facades(self, info):
        """Rebuild ``self.facades`` from a list of facade descriptions.

        Each entry maps its ``name`` to the last item of its ``versions``
        list.

        """
        self.facades.clear()
        for facade in info:
            self.facades[facade['name']] = facade['versions'][-1]

    async def login(self, username, password, macaroons=None):
        """Log in to the API server and record the facades it offers.

        When macaroons are given, the username and password are sent
        empty. The response is also stored (as a copy) on ``self.info``.

        :return: the ``response`` part of the login reply

        """
        if macaroons:
            username = ''
            password = ''

        if username and not username.startswith('user-'):
            username = 'user-{}'.format(username)

        result = await self.rpc({
            "type": "Admin",
            "request": "Login",
            "version": 3,
            "params": {
                "auth-tag": username,
                "credentials": password,
                "nonce": "".join(random.sample(string.printable, 12)),
                "macaroons": macaroons or []
            }})
        response = result['response']
        self.build_facades(response.get('facades', {}))
        self.info = response.copy()
        return response

    async def redirect_info(self):
        """Ask the server for redirect information.

        :return: the ``response`` part of the reply, or None when the
            server answers with a 'not redirected' API error

        """
        try:
            result = await self.rpc({
                "type": "Admin",
                "request": "RedirectInfo",
                "version": 3,
            })
        except JujuAPIError as e:
            if e.message == 'not redirected':
                return None
            raise
        return result['response']
453
454
class JujuData:
    """Accessor for the local Juju client data.

    The data directory is taken from the ``JUJU_DATA`` environment
    variable, falling back to ``~/.local/share/juju``.
    """

    def __init__(self):
        configured = os.environ.get('JUJU_DATA')
        base_dir = configured if configured else '~/.local/share/juju'
        self.path = os.path.abspath(os.path.expanduser(base_dir))

    def current_controller(self):
        """Return the current controller name as reported by the juju CLI.

        Returns '' when the CLI output has no 'current-controller' key.
        """
        argv = shlex.split('juju list-controllers --format yaml')
        listing = yaml.safe_load(subprocess.check_output(argv))
        return listing.get('current-controller', '')

    def current_model(self, controller_name=None):
        """Return the current model recorded for a controller.

        :param controller_name: controller to inspect; the current
            controller is used when this is not given
        :raises JujuError: if no current model is recorded
        """
        name = controller_name or self.current_controller()
        entry = self.models()[name]
        try:
            return entry['current-model']
        except KeyError:
            raise JujuError('No current model') from None

    def controllers(self):
        """Return the 'controllers' section of controllers.yaml."""
        return self._load_yaml('controllers.yaml', 'controllers')

    def models(self):
        """Return the 'controllers' section of models.yaml."""
        return self._load_yaml('models.yaml', 'controllers')

    def accounts(self):
        """Return the 'controllers' section of accounts.yaml."""
        return self._load_yaml('accounts.yaml', 'controllers')

    def _load_yaml(self, filename, key):
        """Load ``filename`` from the data directory and return ``key``."""
        with io.open(os.path.join(self.path, filename), 'rt') as stream:
            document = yaml.safe_load(stream)
        return document[key]
487
488
def get_macaroons():
    """Decode and return macaroons from default ~/.go-cookies

    The cookie file is read as a JSON list of cookies. Every cookie whose
    ``Name`` starts with ``macaroon-`` and whose ``Value`` is non-empty is
    base64-decoded and parsed as JSON.

    :return: list of decoded macaroons; an empty list (after logging a
        warning) when the cookie file cannot be read or is not valid JSON

    """
    # Resolved outside the try block so the name is always bound when the
    # warning below is logged.
    cookie_file = os.path.expanduser('~/.go-cookies')
    try:
        with open(cookie_file, 'r') as f:
            cookies = json.load(f)
    except (OSError, ValueError):
        # Logger.warn is a deprecated alias that newer Python versions
        # no longer provide; warning() is the supported spelling.
        log.warning("Couldn't load macaroons from %s", cookie_file)
        return []

    base64_macaroons = [
        c['Value'] for c in cookies
        if c['Name'].startswith('macaroon-') and c['Value']
    ]

    return [
        json.loads(base64.b64decode(value).decode('utf-8'))
        for value in base64_macaroons
    ]