12 from concurrent
.futures
import CancelledError
13 from http
.client
import HTTPSConnection
18 from juju
import tag
, utils
19 from juju
.client
import client
20 from juju
.errors
import JujuError
, JujuAPIError
, JujuConnectionError
21 from juju
.utils
import IdQueue
23 log
= logging
.getLogger("websocket")
28 Monitor helper class for our Connection class.
30 Contains a reference to an instantiated Connection, along with a
31 reference to the Connection.receiver Future. Upon inspection of
32 these objects, this class determines whether the connection is in
33 an 'error', 'connected' or 'disconnected' state.
35 Use this class to stay up to date on the health of a connection,
36 and take appropriate action if the connection errors out due to
37 network issues or other unexpected circumstances.
41 CONNECTED
= 'connected'
42 DISCONNECTED
= 'disconnected'
45 def __init__(self
, connection
):
46 self
.connection
= connection
47 self
.close_called
= asyncio
.Event(loop
=self
.connection
.loop
)
48 self
.receiver_stopped
= asyncio
.Event(loop
=self
.connection
.loop
)
49 self
.pinger_stopped
= asyncio
.Event(loop
=self
.connection
.loop
)
50 self
.receiver_stopped
.set()
51 self
.pinger_stopped
.set()
56 Determine the status of the connection and receiver, and return
57 ERROR, CONNECTED, or DISCONNECTED as appropriate.
59 For simplicity, we only consider ourselves to be connected
60 after the Connection class has setup a receiver task. This
61 only happens after the websocket is open, and the connection
62 isn't usable until that receiver has been started.
66 # DISCONNECTED: connection not yet open
67 if not self
.connection
.ws
:
68 return self
.DISCONNECTED
69 if self
.receiver_stopped
.is_set():
70 return self
.DISCONNECTED
72 # ERROR: Connection closed (or errored), but we didn't call
74 if not self
.close_called
.is_set() and self
.receiver_stopped
.is_set():
76 if not self
.close_called
.is_set() and not self
.connection
.ws
.open:
77 # The check for self.receiver_stopped existing above guards
78 # against the case where we're not open because we simply
79 # haven't setup the connection yet.
82 # DISCONNECTED: cleanly disconnected.
83 if self
.close_called
.is_set() and not self
.connection
.ws
.open:
84 return self
.DISCONNECTED
86 # CONNECTED: everything is fine!
87 if self
.connection
.ws
.open:
90 # UNKNOWN: We should never hit this state -- if we do,
91 # something went wrong with the logic above, and we do not
92 # know what state the connection is in.
100 # Connect to an arbitrary api server
101 client = await Connection.connect(
102 api_endpoint, model_uuid, username, password, cacert)
104 # Connect using a controller/model name
105 client = await Connection.connect_model('local.local:default')
107 # Connect to the currently active model
108 client = await Connection.connect_current()
110 Note: Any connection method or constructor can accept an optional `loop`
111 argument to override the default event loop from `asyncio.get_event_loop`.
114 self
, endpoint
, uuid
, username
, password
, cacert
=None,
115 macaroons
=None, loop
=None):
116 self
.endpoint
= endpoint
119 self
.macaroons
= macaroons
124 self
.username
= username
125 self
.password
= password
127 self
.loop
= loop
or asyncio
.get_event_loop()
129 self
.__request
_id
__ = 0
133 self
.messages
= IdQueue(loop
=self
.loop
)
134 self
.monitor
= Monitor(connection
=self
)
142 def _get_ssl(self
, cert
=None):
143 return ssl
.create_default_context(
144 purpose
=ssl
.Purpose
.CLIENT_AUTH
, cadata
=cert
)
146 async def open(self
):
148 url
= "wss://{}/model/{}/api".format(self
.endpoint
, self
.uuid
)
150 url
= "wss://{}/api".format(self
.endpoint
)
153 kw
['ssl'] = self
._get
_ssl
(self
.cacert
)
154 kw
['loop'] = self
.loop
156 self
.ws
= await websockets
.connect(url
, **kw
)
157 self
.loop
.create_task(self
.receiver())
158 self
.monitor
.receiver_stopped
.clear()
159 log
.info("Driver connected to juju %s", url
)
160 self
.monitor
.close_called
.clear()
163 async def close(self
):
166 self
.monitor
.close_called
.set()
167 await self
.monitor
.pinger_stopped
.wait()
168 await self
.monitor
.receiver_stopped
.wait()
169 await self
.ws
.close()
171 async def recv(self
, request_id
):
173 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
174 return await self
.messages
.get(request_id
)
176 async def receiver(self
):
179 result
= await utils
.run_with_interrupt(
181 self
.monitor
.close_called
,
183 if self
.monitor
.close_called
.is_set():
185 if result
is not None:
186 result
= json
.loads(result
)
187 await self
.messages
.put(result
['request-id'], result
)
188 except CancelledError
:
190 except Exception as e
:
191 await self
.messages
.put_all(e
)
192 if isinstance(e
, websockets
.ConnectionClosed
):
193 # ConnectionClosed is not really exceptional for us,
194 # but it may be for any pending message listeners
196 log
.exception("Error in receiver")
199 self
.monitor
.receiver_stopped
.set()
201 async def pinger(self
):
203 A Controller can time us out if we are silent for too long. This
204 is especially true in JaaS, which has a fairly strict timeout.
206 To prevent timing out, we send a ping every ten seconds.
209 async def _do_ping():
211 await pinger_facade
.Ping()
212 await asyncio
.sleep(10, loop
=self
.loop
)
213 except CancelledError
:
216 pinger_facade
= client
.PingerFacade
.from_connection(self
)
219 await utils
.run_with_interrupt(
221 self
.monitor
.close_called
,
223 if self
.monitor
.close_called
.is_set():
226 self
.monitor
.pinger_stopped
.set()
228 async def rpc(self
, msg
, encoder
=None):
229 self
.__request
_id
__ += 1
230 msg
['request-id'] = self
.__request
_id
__
231 if'params' not in msg
:
233 if "version" not in msg
:
234 msg
['version'] = self
.facades
[msg
['type']]
235 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
236 await self
.ws
.send(outgoing
)
237 result
= await self
.recv(msg
['request-id'])
242 if 'error' in result
:
244 raise JujuAPIError(result
)
246 if 'response' not in result
:
247 # This may never happen
250 if 'results' in result
['response']:
251 # Check for errors in a result list.
253 for res
in result
['response']['results']:
254 if res
.get('error', {}).get('message'):
255 errors
.append(res
['error']['message'])
257 raise JujuError(errors
)
259 elif result
['response'].get('error', {}).get('message'):
260 raise JujuError(result
['response']['error']['message'])
264 def http_headers(self
):
265 """Return dictionary of http headers necessary for making an http
266 connection to the endpoint of this Connection.
268 :return: Dictionary of headers
271 if not self
.username
:
274 creds
= u
'{}:{}'.format(
275 tag
.user(self
.username
),
278 token
= base64
.b64encode(creds
.encode())
280 'Authorization': 'Basic {}'.format(token
.decode())
283 def https_connection(self
):
284 """Return an https connection to this Connection's endpoint.
286 Returns a 3-tuple containing::
288 1. The :class:`HTTPSConnection` instance
289 2. Dictionary of auth headers to be used with the connection
290 3. The root url path (str) to be used for requests.
293 endpoint
= self
.endpoint
294 host
, remainder
= endpoint
.split(':', 1)
297 port
, _
= remainder
.split('/', 1)
299 conn
= HTTPSConnection(
301 context
=self
._get
_ssl
(self
.cacert
),
305 "/model/{}".format(self
.uuid
)
308 return conn
, self
.http_headers(), path
310 async def clone(self
):
311 """Return a new Connection, connected to the same websocket endpoint
315 return await Connection
.connect(
325 async def controller(self
):
326 """Return a Connection to the controller at self.endpoint
329 return await Connection
.connect(
339 async def _try_endpoint(self
, endpoint
, cacert
):
344 self
.endpoint
= endpoint
348 result
= await self
.login()
349 if 'discharge-required-error' in result
['response']:
350 log
.info('Macaroon discharge required, disconnecting')
353 log
.info('Authenticated')
355 except JujuAPIError
as e
:
356 if e
.error_code
!= 'redirection required':
358 log
.info('Controller requested redirect')
359 redirect_info
= await self
.redirect_info()
360 redir_cacert
= redirect_info
['ca-cert']
362 ("{value}:{port}".format(**s
), redir_cacert
)
363 for servers
in redirect_info
['servers']
364 for s
in servers
if s
["scope"] == 'public'
369 return success
, result
, new_endpoints
373 cls
, endpoint
, uuid
, username
, password
, cacert
=None,
374 macaroons
=None, loop
=None):
375 """Connect to the websocket.
377 If uuid is None, the connection will be to the controller. Otherwise it
378 will be to the model.
381 client
= cls(endpoint
, uuid
, username
, password
, cacert
, macaroons
,
383 endpoints
= [(endpoint
, cacert
)]
385 _endpoint
, _cacert
= endpoints
.pop(0)
386 success
, result
, new_endpoints
= await client
._try
_endpoint
(
390 endpoints
.extend(new_endpoints
)
392 # ran out of endpoints without a successful login
393 raise Exception("Couldn't authenticate to {}".format(endpoint
))
395 response
= result
['response']
396 client
.info
= response
.copy()
397 client
.build_facades(response
.get('facades', {}))
398 client
.loop
.create_task(client
.pinger())
399 client
.monitor
.pinger_stopped
.clear()
async def connect_current(cls, loop=None):
    """Connect to the currently active model.

    :param loop: Optional event loop, passed through to ``connect_model``.
    :raises JujuConnectionError: If there is no current controller.
    :raises JujuError: If the current controller has no current model.
    """
    jujudata = JujuData()
    controller_name = jujudata.current_controller()
    if not controller_name:
        # Same failure mode as connect_current_controller, instead of a
        # KeyError on the empty controller name further down.
        raise JujuConnectionError('No current controller')
    # Hand the controller name over so current_model() does not have to
    # look it up a second time.
    model_name = jujudata.current_model(controller_name)

    return await cls.connect_model(
        '{}:{}'.format(controller_name, model_name), loop)
async def connect_current_controller(cls, loop=None):
    """Connect to the currently active controller.

    :param loop: Optional event loop, passed through to
        ``connect_controller``.
    :raises JujuConnectionError: If there is no current controller.
    """
    active = JujuData().current_controller()
    if active:
        return await cls.connect_controller(active, loop)
    raise JujuConnectionError('No current controller')
async def connect_controller(cls, controller_name, loop=None):
    """Connect to a controller by name.

    :param str controller_name: Key into the locally stored controller and
        account data.
    :param loop: Optional event loop, passed through to ``connect``.
    """
    data = JujuData()

    controller_info = data.controllers()[controller_name]
    endpoint = controller_info['api-endpoints'][0]
    cacert = controller_info.get('ca-cert')

    account = data.accounts()[controller_name]
    username = account['user']
    password = account.get('password')

    # Macaroons are only looked up when no password is stored.
    if password:
        macaroons = None
    else:
        macaroons = get_macaroons()

    # uuid is None: the connection goes to the controller, not a model.
    return await cls.connect(
        endpoint, None, username, password, cacert, macaroons, loop)
445 async def connect_model(cls
, model
, loop
=None):
446 """Connect to a model by name.
448 :param str model: [<controller>:]<model>
451 jujudata
= JujuData()
454 # explicit controller given
455 controller_name
, model_name
= model
.split(':')
457 # use the current controller if one isn't explicitly given
458 controller_name
= jujudata
.current_controller()
461 accounts
= jujudata
.accounts()[controller_name
]
462 username
= accounts
['user']
463 # model name must include a user prefix, so add it if it doesn't
464 if '/' not in model_name
:
465 model_name
= '{}/{}'.format(username
, model_name
)
467 controller
= jujudata
.controllers()[controller_name
]
468 endpoint
= controller
['api-endpoints'][0]
469 cacert
= controller
.get('ca-cert')
470 password
= accounts
.get('password')
471 models
= jujudata
.models()[controller_name
]
472 model_uuid
= models
['models'][model_name
]['uuid']
473 macaroons
= get_macaroons() if not password
else None
475 return await cls
.connect(
476 endpoint
, model_uuid
, username
, password
, cacert
, macaroons
, loop
)
478 def build_facades(self
, facades
):
480 for facade
in facades
:
481 self
.facades
[facade
['name']] = facade
['versions'][-1]
483 async def login(self
):
484 username
= self
.username
485 if username
and not username
.startswith('user-'):
486 username
= 'user-{}'.format(username
)
488 result
= await self
.rpc({
493 "auth-tag": username
,
494 "credentials": self
.password
,
495 "nonce": "".join(random
.sample(string
.printable
, 12)),
496 "macaroons": self
.macaroons
500 async def redirect_info(self
):
502 result
= await self
.rpc({
504 "request": "RedirectInfo",
507 except JujuAPIError
as e
:
508 if e
.message
== 'not redirected':
511 return result
['response']
516 self
.path
= os
.environ
.get('JUJU_DATA') or '~/.local/share/juju'
517 self
.path
= os
.path
.abspath(os
.path
.expanduser(self
.path
))
def current_controller(self):
    """Return the name of the current controller.

    Runs ``juju list-controllers --format yaml`` and reads the
    ``current-controller`` key from its parsed output.

    :return: The controller name, or ``''`` when the key is absent.
    """
    argv = shlex.split('juju list-controllers --format yaml')
    listing = yaml.safe_load(subprocess.check_output(argv))
    return listing.get('current-controller', '')
def current_model(self, controller_name=None):
    """Return the name of the current model for a controller.

    :param str controller_name: Controller to look up; when empty or
        omitted, the result of ``current_controller()`` is used.
    :return: The value of ``current-model`` for that controller.
    :raises JujuError: If the controller has no ``current-model`` entry.
    """
    name = controller_name or self.current_controller()
    model_info = self.models()[name]
    if 'current-model' in model_info:
        return model_info['current-model']
    raise JujuError('No current model')
def controllers(self):
    """Return the ``controllers`` section of ``controllers.yaml``."""
    filename, key = 'controllers.yaml', 'controllers'
    return self._load_yaml(filename, key)
537 return self
._load
_yaml
('models.yaml', 'controllers')
540 return self
._load
_yaml
('accounts.yaml', 'controllers')
def _load_yaml(self, filename, key):
    """Load a YAML file from the data directory and return one of its keys.

    :param str filename: File name, joined onto ``self.path``.
    :param str key: Top-level key of the parsed document to return.
    :return: The value stored under ``key``.
    """
    filepath = os.path.join(self.path, filename)
    # Decode explicitly as UTF-8 rather than relying on the locale's
    # preferred encoding, which varies between platforms.
    with io.open(filepath, 'rt', encoding='utf-8') as f:
        return yaml.safe_load(f)[key]
549 """Decode and return macaroons from default ~/.go-cookies
553 cookie_file
= os
.path
.expanduser('~/.go-cookies')
554 with
open(cookie_file
, 'r') as f
:
555 cookies
= json
.load(f
)
556 except (OSError, ValueError):
557 log
.warn("Couldn't load macaroons from %s", cookie_file
)
561 c
['Value'] for c
in cookies
562 if c
['Name'].startswith('macaroon-') and c
['Value']
566 json
.loads(base64
.b64decode(value
).decode('utf-8'))
567 for value
in base64_macaroons