4a9766d62da5fa44dfdffef899c7608ec95adca2
12 from http
.client
import HTTPSConnection
18 from juju
.client
import client
19 from juju
.client
.version_map
import VERSION_MAP
20 from juju
.errors
import JujuError
, JujuAPIError
, JujuConnectionError
21 from juju
.utils
import IdQueue
# Module-level logger for this file, registered under the name "websocket".
log = logging.getLogger("websocket")
28 Monitor helper class for our Connection class.
30 Contains a reference to an instantiated Connection, along with a
31 reference to the Connection.receiver Future. Upon inspection of
32 these objects, this class determines whether the connection is in
33 an 'error', 'connected' or 'disconnected' state.
35 Use this class to stay up to date on the health of a connection,
36 and take appropriate action if the connection errors out due to
37 network issues or other unexpected circumstances.
41 CONNECTED
= 'connected'
42 DISCONNECTED
= 'disconnected'
45 def __init__(self
, connection
):
46 self
.connection
= connection
52 Determine the status of the connection and receiver, and return
53 ERROR, CONNECTED, or DISCONNECTED as appropriate.
55 For simplicity, we only consider ourselves to be connected
56 after the Connection class has set up a receiver task. This
57 only happens after the websocket is open, and the connection
58 isn't usable until that receiver has been started.
62 # DISCONNECTED: connection not yet open
63 if not self
.connection
.ws
:
64 return self
.DISCONNECTED
66 return self
.DISCONNECTED
68 # ERROR: Connection closed (or errored), but we didn't call
70 if not self
.connection
.close_called
and self
.receiver_exceptions():
72 if not self
.connection
.close_called
and not self
.connection
.ws
.open:
73 # The check for self.receiver existing above guards against the
74 # case where we're not open because we simply haven't
75 # setup the connection yet.
78 # DISCONNECTED: cleanly disconnected.
79 if self
.connection
.close_called
and not self
.connection
.ws
.open:
80 return self
.DISCONNECTED
82 # CONNECTED: everything is fine!
83 if self
.connection
.ws
.open:
86 # UNKNOWN: We should never hit this state -- if we do,
87 # something went wrong with the logic above, and we do not
88 # know what state the connection is in.
91 def receiver_exceptions(self
):
93 Return exceptions in the receiver, if any.
98 if not self
.receiver
.done():
100 return self
.receiver
.exception()
107 # Connect to an arbitrary api server
108 client = await Connection.connect(
109 api_endpoint, model_uuid, username, password, cacert)
111 # Connect using a controller/model name
112 client = await Connection.connect_model('local.local:default')
114 # Connect to the currently active model
115 client = await Connection.connect_current()
117 Note: Any connection method or constructor can accept an optional `loop`
118 argument to override the default event loop from `asyncio.get_event_loop`.
121 self
, endpoint
, uuid
, username
, password
, cacert
=None,
122 macaroons
=None, loop
=None):
123 self
.endpoint
= endpoint
125 self
.username
= username
126 self
.password
= password
127 self
.macaroons
= macaroons
129 self
.loop
= loop
or asyncio
.get_event_loop()
131 self
.__request
_id
__ = 0
135 self
.messages
= IdQueue(loop
=self
.loop
)
136 self
.close_called
= False
137 self
.monitor
= Monitor(connection
=self
)
145 def _get_ssl(self
, cert
=None):
146 return ssl
.create_default_context(
147 purpose
=ssl
.Purpose
.CLIENT_AUTH
, cadata
=cert
)
149 async def open(self
):
151 url
= "wss://{}/model/{}/api".format(self
.endpoint
, self
.uuid
)
153 url
= "wss://{}/api".format(self
.endpoint
)
156 kw
['ssl'] = self
._get
_ssl
(self
.cacert
)
157 kw
['loop'] = self
.loop
159 self
.ws
= await websockets
.connect(url
, **kw
)
160 self
.monitor
.receiver
= self
.loop
.create_task(self
.receiver())
161 log
.info("Driver connected to juju %s", url
)
async def close(self):
    """Mark the connection as deliberately closed, then close the websocket.

    ``close_called`` is set before the socket is touched: the connection
    monitor reads that flag to tell a clean disconnect apart from an
    unexpected one.

    Calling this on a connection whose websocket was never opened
    (``self.ws`` falsy) only records the flag instead of failing with
    an ``AttributeError``.
    """
    self.close_called = True
    if not self.ws:
        # Nothing was ever opened, so there is no socket to close.
        return
    await self.ws.close()
168 async def recv(self
, request_id
):
170 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
171 return await self
.messages
.get(request_id
)
173 async def receiver(self
):
176 result
= await self
.ws
.recv()
177 if result
is not None:
178 result
= json
.loads(result
)
179 await self
.messages
.put(result
['request-id'], result
)
180 except Exception as e
:
181 await self
.messages
.put_all(e
)
182 if isinstance(e
, websockets
.ConnectionClosed
):
183 # ConnectionClosed is not really exceptional for us,
184 # but it may be for any pending message listeners
188 async def pinger(self
):
190 A Controller can time us out if we are silent for too long. This
191 is especially true in JaaS, which has a fairly strict timeout.
193 To prevent timing out, we send a ping every ten seconds.
196 pinger_facade
= client
.PingerFacade
.from_connection(self
)
198 await pinger_facade
.Ping()
199 await asyncio
.sleep(10)
201 async def rpc(self
, msg
, encoder
=None):
202 self
.__request
_id
__ += 1
203 msg
['request-id'] = self
.__request
_id
__
204 if'params' not in msg
:
206 if "version" not in msg
:
207 msg
['version'] = self
.facades
[msg
['type']]
208 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
209 await self
.ws
.send(outgoing
)
210 result
= await self
.recv(msg
['request-id'])
215 if 'error' in result
:
217 raise JujuAPIError(result
)
219 if 'response' not in result
:
220 # This may never happen
223 if 'results' in result
['response']:
224 # Check for errors in a result list.
226 for res
in result
['response']['results']:
227 if res
.get('error', {}).get('message'):
228 errors
.append(res
['error']['message'])
230 raise JujuError(errors
)
232 elif result
['response'].get('error', {}).get('message'):
233 raise JujuError(result
['response']['error']['message'])
237 def http_headers(self
):
238 """Return dictionary of http headers necessary for making an http
239 connection to the endpoint of this Connection.
241 :return: Dictionary of headers
244 if not self
.username
:
247 creds
= u
'{}:{}'.format(
248 tag
.user(self
.username
),
251 token
= base64
.b64encode(creds
.encode())
253 'Authorization': 'Basic {}'.format(token
.decode())
256 def https_connection(self
):
257 """Return an https connection to this Connection's endpoint.
259 Returns a 3-tuple containing::
261 1. The :class:`HTTPSConnection` instance
262 2. Dictionary of auth headers to be used with the connection
263 3. The root url path (str) to be used for requests.
266 endpoint
= self
.endpoint
267 host
, remainder
= endpoint
.split(':', 1)
270 port
, _
= remainder
.split('/', 1)
272 conn
= HTTPSConnection(
274 context
=self
._get
_ssl
(self
.cacert
),
278 "/model/{}".format(self
.uuid
)
281 return conn
, self
.http_headers(), path
283 async def clone(self
):
284 """Return a new Connection, connected to the same websocket endpoint
288 return await Connection
.connect(
298 async def controller(self
):
299 """Return a Connection to the controller at self.endpoint
302 return await Connection
.connect(
314 cls
, endpoint
, uuid
, username
, password
, cacert
=None,
315 macaroons
=None, loop
=None):
316 """Connect to the websocket.
318 If uuid is None, the connection will be to the controller. Otherwise it
319 will be to the model.
322 client
= cls(endpoint
, uuid
, username
, password
, cacert
, macaroons
,
326 redirect_info
= await client
.redirect_info()
327 if not redirect_info
:
328 await client
.login(username
, password
, macaroons
)
333 s
for servers
in redirect_info
['servers']
334 for s
in servers
if s
["scope"] == 'public'
336 for server
in servers
:
338 "{value}:{port}".format(**server
), uuid
, username
,
339 password
, redirect_info
['ca-cert'], macaroons
)
342 result
= await client
.login(username
, password
, macaroons
)
343 if 'discharge-required-error' in result
:
346 except Exception as e
:
351 "Couldn't authenticate to %s", endpoint
)
async def connect_current(cls, loop=None):
    """Connect to the currently active model.

    :param loop: optional event loop, forwarded to ``connect_model``.
    :return: the result of ``connect_model`` for
        ``<current controller>:<current model>``.
    :raises JujuConnectionError: if there is no current controller
        (same behaviour as ``connect_current_controller``).
    """
    jujudata = JujuData()
    controller_name = jujudata.current_controller()
    if not controller_name:
        # Without this check the lookup below would fail with a bare
        # KeyError on the empty controller name.
        raise JujuConnectionError('No current controller')
    # Pass the name along so current_model() does not have to run the
    # juju CLI a second time to rediscover the controller.
    model_name = jujudata.current_model(controller_name)

    return await cls.connect_model(
        '{}:{}'.format(controller_name, model_name), loop)
async def connect_current_controller(cls, loop=None):
    """Open a connection to whichever controller is currently active.

    :param loop: optional event loop, forwarded to ``connect_controller``.
    :return: the result of ``connect_controller`` for the current
        controller's name.
    :raises JujuConnectionError: if no controller is currently active.
    """
    active = JujuData().current_controller()
    if active:
        return await cls.connect_controller(active, loop)
    raise JujuConnectionError('No current controller')
async def connect_controller(cls, controller_name, loop=None):
    """Open a controller-level connection to the named controller.

    Endpoint, CA certificate and credentials are looked up in the local
    Juju data under ``controller_name``. The first listed API endpoint is
    used, and macaroons are loaded only when no password is stored.

    :param controller_name: key of the controller in the local Juju data.
    :param loop: optional event loop, forwarded to ``connect``.
    :return: the result of ``connect`` called with a uuid of ``None``.
    """
    data = JujuData()

    entry = data.controllers()[controller_name]
    first_endpoint = entry['api-endpoints'][0]
    ca = entry.get('ca-cert')

    account = data.accounts()[controller_name]
    user = account['user']
    secret = account.get('password')

    if secret:
        cookies = None
    else:
        # No stored password: fall back to macaroon authentication.
        cookies = get_macaroons()

    return await cls.connect(
        first_endpoint, None, user, secret, ca, cookies, loop)
395 async def connect_model(cls
, model
, loop
=None):
396 """Connect to a model by name.
398 :param str model: [<controller>:]<model>
401 jujudata
= JujuData()
404 # explicit controller given
405 controller_name
, model_name
= model
.split(':')
407 # use the current controller if one isn't explicitly given
408 controller_name
= jujudata
.current_controller()
411 accounts
= jujudata
.accounts()[controller_name
]
412 username
= accounts
['user']
413 # model name must include a user prefix, so add it if it doesn't
414 if '/' not in model_name
:
415 model_name
= '{}/{}'.format(username
, model_name
)
417 controller
= jujudata
.controllers()[controller_name
]
418 endpoint
= controller
['api-endpoints'][0]
419 cacert
= controller
.get('ca-cert')
420 password
= accounts
.get('password')
421 models
= jujudata
.models()[controller_name
]
422 model_uuid
= models
['models'][model_name
]['uuid']
423 macaroons
= get_macaroons() if not password
else None
425 return await cls
.connect(
426 endpoint
, model_uuid
, username
, password
, cacert
, macaroons
, loop
)
428 def build_facades(self
, facades
):
430 # In order to work around an issue where the juju api is not
431 # returning a complete list of facades, we simply look up the
432 # juju version in a pregenerated map, and use that info to
433 # populate our list of facades.
435 # TODO: if a future version of juju fixes this bug, restore
436 # the following code for that version and higher:
437 # for facade in facades:
438 # self.facades[facade['name']] = facade['versions'][-1]
440 self
.facades
= VERSION_MAP
[self
.info
['server-version']]
442 log
.warning("Could not find a set of facades for {}. Using "
443 "the latest facade set instead".format(
444 self
.info
['server-version']))
445 self
.facades
= VERSION_MAP
['latest']
447 async def login(self
, username
, password
, macaroons
=None):
452 if username
and not username
.startswith('user-'):
453 username
= 'user-{}'.format(username
)
455 result
= await self
.rpc({
460 "auth-tag": username
,
461 "credentials": password
,
462 "nonce": "".join(random
.sample(string
.printable
, 12)),
463 "macaroons": macaroons
or []
465 response
= result
['response']
466 self
.info
= response
.copy()
467 self
.build_facades(response
.get('facades', {}))
468 # Create a pinger to keep the connection alive (needed for
469 # JaaS; harmless elsewhere).
470 self
.loop
.create_task(self
.pinger())
473 async def redirect_info(self
):
475 result
= await self
.rpc({
477 "request": "RedirectInfo",
480 except JujuAPIError
as e
:
481 if e
.message
== 'not redirected':
484 return result
['response']
489 self
.path
= os
.environ
.get('JUJU_DATA') or '~/.local/share/juju'
490 self
.path
= os
.path
.abspath(os
.path
.expanduser(self
.path
))
def current_controller(self):
    """Ask the ``juju`` CLI which controller is currently active.

    Runs ``juju list-controllers --format yaml`` and parses its output.

    :return: the value of the ``current-controller`` key, or ``''``
        when the key is absent.
    """
    argv = shlex.split('juju list-controllers --format yaml')
    listing = yaml.safe_load(subprocess.check_output(argv))
    return listing.get('current-controller', '')
def current_model(self, controller_name=None):
    """Return the name of the current model on a controller.

    :param controller_name: controller to inspect; when falsy, the
        current controller is looked up via ``current_controller()``.
    :return: the ``current-model`` entry for that controller.
    :raises JujuError: if the controller has no ``current-model`` entry.
    """
    name = controller_name or self.current_controller()
    entry = self.models()[name]
    if 'current-model' in entry:
        return entry['current-model']
    raise JujuError('No current model')
def controllers(self):
    """Return the ``controllers`` section of ``controllers.yaml``."""
    source, section = 'controllers.yaml', 'controllers'
    return self._load_yaml(source, section)
510 return self
._load
_yaml
('models.yaml', 'controllers')
513 return self
._load
_yaml
('accounts.yaml', 'controllers')
def _load_yaml(self, filename, key):
    """Load one top-level section from a YAML file under ``self.path``.

    :param filename: file name, joined onto ``self.path``.
    :param key: top-level key whose value is returned.
    :return: the value stored under ``key`` in the parsed document.
    """
    with io.open(os.path.join(self.path, filename), 'rt') as stream:
        document = yaml.safe_load(stream)
    return document[key]
522 """Decode and return macaroons from default ~/.go-cookies
526 cookie_file
= os
.path
.expanduser('~/.go-cookies')
527 with
open(cookie_file
, 'r') as f
:
528 cookies
= json
.load(f
)
529 except (OSError, ValueError):
530 log
.warn("Couldn't load macaroons from %s", cookie_file
)
534 c
['Value'] for c
in cookies
535 if c
['Name'].startswith('macaroon-') and c
['Value']
539 json
.loads(base64
.b64decode(value
).decode('utf-8'))
540 for value
in base64_macaroons