6c31ab67130648a6bc52579e02fe87837f61fb7b
12 from http
.client
import HTTPSConnection
18 from juju
.errors
import JujuError
, JujuAPIError
, JujuConnectionError
19 from juju
.utils
import IdQueue
21 log
= logging
.getLogger("websocket")
26 Monitor helper class for our Connection class.
28 Contains a reference to an instantiated Connection, along with a
29 reference to the Connection.receiver Future. Upon inspection of
30 these objects, this class determines whether the connection is in
31 an 'error', 'connected' or 'disconnected' state.
33 Use this class to stay up to date on the health of a connection,
34 and take appropriate action if the connection errors out due to
35 network issues or other unexpected circumstances.
39 CONNECTED
= 'connected'
40 DISCONNECTED
= 'disconnected'
43 def __init__(self
, connection
):
44 self
.connection
= connection
50 Determine the status of the connection and receiver, and return
51 ERROR, CONNECTED, or DISCONNECTED as appropriate.
53 For simplicity, we only consider ourselves to be connected
54 after the Connection class has set up a receiver task. This
55 only happens after the websocket is open, and the connection
56 isn't usable until that receiver has been started.
60 # DISCONNECTED: connection not yet open
61 if not self
.connection
.ws
:
62 return self
.DISCONNECTED
64 return self
.DISCONNECTED
66 # ERROR: Connection closed (or errored), but we didn't call
68 if not self
.connection
.close_called
and self
.receiver_exceptions():
70 if not self
.connection
.close_called
and not self
.connection
.ws
.open:
71 # The check for self.receiver existing above guards against the
72 # case where we're not open because we simply haven't
73 # setup the connection yet.
76 # DISCONNECTED: cleanly disconnected.
77 if self
.connection
.close_called
and not self
.connection
.ws
.open:
78 return self
.DISCONNECTED
80 # CONNECTED: everything is fine!
81 if self
.connection
.ws
.open:
84 # UNKNOWN: We should never hit this state -- if we do,
85 # something went wrong with the logic above, and we do not
86 # know what state the connection is in.
89 def receiver_exceptions(self
):
91 Return exceptions in the receiver, if any.
96 if not self
.receiver
.done():
98 return self
.receiver
.exception()
105 # Connect to an arbitrary api server
106 client = await Connection.connect(
107 api_endpoint, model_uuid, username, password, cacert)
109 # Connect using a controller/model name
110 client = await Connection.connect_model('local.local:default')
112 # Connect to the currently active model
113 client = await Connection.connect_current()
115 Note: Any connection method or constructor can accept an optional `loop`
116 argument to override the default event loop from `asyncio.get_event_loop`.
119 self
, endpoint
, uuid
, username
, password
, cacert
=None,
120 macaroons
=None, loop
=None):
121 self
.endpoint
= endpoint
123 self
.username
= username
124 self
.password
= password
125 self
.macaroons
= macaroons
127 self
.loop
= loop
or asyncio
.get_event_loop()
129 self
.__request
_id
__ = 0
133 self
.messages
= IdQueue(loop
=self
.loop
)
134 self
.close_called
= False
135 self
.monitor
= Monitor(connection
=self
)
def _get_ssl(self, cert=None):
    """Build the TLS context used when connecting to the API endpoint.

    The context is created for ``ssl.Purpose.SERVER_AUTH``, which is the
    purpose intended for client-side sockets that authenticate a server.
    (``CLIENT_AUTH`` is for server-side sockets and leaves certificate
    verification switched off.)

    :param str cert: Optional CA certificate data, handed to
        ``ssl.create_default_context`` as ``cadata``.
    :return: The configured :class:`ssl.SSLContext`.
    """
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH, cadata=cert)
    if cert:
        # The certificate chain is still verified against the supplied CA.
        # NOTE(review): hostname matching is skipped when the caller pins
        # its own CA, on the assumption that the server certificate is not
        # issued for the address used in self.endpoint -- confirm.
        context.check_hostname = False
    return context
147 async def open(self
):
149 url
= "wss://{}/model/{}/api".format(self
.endpoint
, self
.uuid
)
151 url
= "wss://{}/api".format(self
.endpoint
)
154 kw
['ssl'] = self
._get
_ssl
(self
.cacert
)
155 kw
['loop'] = self
.loop
157 self
.ws
= await websockets
.connect(url
, **kw
)
158 self
.monitor
.receiver
= self
.loop
.create_task(self
.receiver())
159 log
.info("Driver connected to juju %s", url
)
async def close(self):
    """Mark the connection as deliberately closed and close the websocket.

    ``close_called`` is set first so that the connection monitor can tell
    a requested shutdown apart from an unexpected drop.  Calling this on a
    connection whose websocket was never opened is a no-op apart from
    setting that flag.
    """
    self.close_called = True
    # The websocket only exists once the connection has been opened.
    ws = getattr(self, 'ws', None)
    if ws is None:
        return
    await ws.close()
166 async def recv(self
, request_id
):
168 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
169 return await self
.messages
.get(request_id
)
171 async def receiver(self
):
174 result
= await self
.ws
.recv()
175 if result
is not None:
176 result
= json
.loads(result
)
177 await self
.messages
.put(result
['request-id'], result
)
178 except Exception as e
:
179 await self
.messages
.put_all(e
)
180 if isinstance(e
, websockets
.ConnectionClosed
):
181 # ConnectionClosed is not really exceptional for us,
182 # but it may be for any pending message listeners
186 async def rpc(self
, msg
, encoder
=None):
187 self
.__request
_id
__ += 1
188 msg
['request-id'] = self
.__request
_id
__
189 if'params' not in msg
:
191 if "version" not in msg
:
192 msg
['version'] = self
.facades
[msg
['type']]
193 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
194 await self
.ws
.send(outgoing
)
195 result
= await self
.recv(msg
['request-id'])
200 if 'error' in result
:
202 raise JujuAPIError(result
)
204 if 'response' not in result
:
205 # This may never happen
208 if 'results' in result
['response']:
209 # Check for errors in a result list.
211 for res
in result
['response']['results']:
212 if res
.get('error', {}).get('message'):
213 errors
.append(res
['error']['message'])
215 raise JujuError(errors
)
217 elif result
['response'].get('error', {}).get('message'):
218 raise JujuError(result
['response']['error']['message'])
222 def http_headers(self
):
223 """Return dictionary of http headers necessary for making an http
224 connection to the endpoint of this Connection.
226 :return: Dictionary of headers
229 if not self
.username
:
232 creds
= u
'{}:{}'.format(
233 tag
.user(self
.username
),
236 token
= base64
.b64encode(creds
.encode())
238 'Authorization': 'Basic {}'.format(token
.decode())
241 def https_connection(self
):
242 """Return an https connection to this Connection's endpoint.
244 Returns a 3-tuple containing::
246 1. The :class:`HTTPSConnection` instance
247 2. Dictionary of auth headers to be used with the connection
248 3. The root url path (str) to be used for requests.
251 endpoint
= self
.endpoint
252 host
, remainder
= endpoint
.split(':', 1)
255 port
, _
= remainder
.split('/', 1)
257 conn
= HTTPSConnection(
259 context
=self
._get
_ssl
(self
.cacert
),
263 "/model/{}".format(self
.uuid
)
266 return conn
, self
.http_headers(), path
268 async def clone(self
):
269 """Return a new Connection, connected to the same websocket endpoint
273 return await Connection
.connect(
283 async def controller(self
):
284 """Return a Connection to the controller at self.endpoint
287 return await Connection
.connect(
299 cls
, endpoint
, uuid
, username
, password
, cacert
=None,
300 macaroons
=None, loop
=None):
301 """Connect to the websocket.
303 If uuid is None, the connection will be to the controller. Otherwise it
304 will be to the model.
307 client
= cls(endpoint
, uuid
, username
, password
, cacert
, macaroons
,
311 redirect_info
= await client
.redirect_info()
312 if not redirect_info
:
313 await client
.login(username
, password
, macaroons
)
318 s
for servers
in redirect_info
['servers']
319 for s
in servers
if s
["scope"] == 'public'
321 for server
in servers
:
323 "{value}:{port}".format(**server
), uuid
, username
,
324 password
, redirect_info
['ca-cert'], macaroons
)
327 result
= await client
.login(username
, password
, macaroons
)
328 if 'discharge-required-error' in result
:
331 except Exception as e
:
336 "Couldn't authenticate to %s", endpoint
)
async def connect_current(cls, loop=None):
    """Connect to the currently active model.

    :param loop: Optional event loop, passed through to ``connect_model``.
    :raises JujuConnectionError: If there is no current controller.
    """
    jujudata = JujuData()
    controller_name = jujudata.current_controller()
    if not controller_name:
        # Same failure mode as connect_current_controller.
        raise JujuConnectionError('No current controller')
    # Hand the controller name over so current_model() does not have to
    # look it up a second time.
    model_name = jujudata.current_model(controller_name)

    return await cls.connect_model(
        '{}:{}'.format(controller_name, model_name), loop)
async def connect_current_controller(cls, loop=None):
    """Connect to the currently active controller.

    :param loop: Optional event loop, passed through to
        ``connect_controller``.
    :raises JujuConnectionError: If no controller is currently active.
    """
    active = JujuData().current_controller()
    if active:
        return await cls.connect_controller(active, loop)
    raise JujuConnectionError('No current controller')
async def connect_controller(cls, controller_name, loop=None):
    """Connect to a controller by name.

    The endpoint and CA certificate come from the controller's entry in
    the local controller data, and the credentials from its account entry.
    Macaroons are loaded only when the account has no password.

    :param str controller_name: Name of the controller to connect to.
    :param loop: Optional event loop, passed through to ``connect``.
    """
    data = JujuData()

    info = data.controllers()[controller_name]
    endpoint = info['api-endpoints'][0]
    cacert = info.get('ca-cert')

    account = data.accounts()[controller_name]
    username = account['user']
    password = account.get('password')

    if password:
        macaroons = None
    else:
        macaroons = get_macaroons()

    # A uuid of None selects the controller rather than a model.
    return await cls.connect(
        endpoint, None, username, password, cacert, macaroons, loop)
380 async def connect_model(cls
, model
, loop
=None):
381 """Connect to a model by name.
383 :param str model: [<controller>:]<model>
386 jujudata
= JujuData()
389 # explicit controller given
390 controller_name
, model_name
= model
.split(':')
392 # use the current controller if one isn't explicitly given
393 controller_name
= jujudata
.current_controller()
396 accounts
= jujudata
.accounts()[controller_name
]
397 username
= accounts
['user']
398 # model name must include a user prefix, so add it if it doesn't
399 if '/' not in model_name
:
400 model_name
= '{}/{}'.format(username
, model_name
)
402 controller
= jujudata
.controllers()[controller_name
]
403 endpoint
= controller
['api-endpoints'][0]
404 cacert
= controller
.get('ca-cert')
405 password
= accounts
.get('password')
406 models
= jujudata
.models()[controller_name
]
407 model_uuid
= models
['models'][model_name
]['uuid']
408 macaroons
= get_macaroons() if not password
else None
410 return await cls
.connect(
411 endpoint
, model_uuid
, username
, password
, cacert
, macaroons
, loop
)
413 def build_facades(self
, info
):
416 self
.facades
[facade
['name']] = facade
['versions'][-1]
418 async def login(self
, username
, password
, macaroons
=None):
423 if username
and not username
.startswith('user-'):
424 username
= 'user-{}'.format(username
)
426 result
= await self
.rpc({
431 "auth-tag": username
,
432 "credentials": password
,
433 "nonce": "".join(random
.sample(string
.printable
, 12)),
434 "macaroons": macaroons
or []
436 response
= result
['response']
437 self
.build_facades(response
.get('facades', {}))
438 self
.info
= response
.copy()
441 async def redirect_info(self
):
443 result
= await self
.rpc({
445 "request": "RedirectInfo",
448 except JujuAPIError
as e
:
449 if e
.message
== 'not redirected':
452 return result
['response']
457 self
.path
= os
.environ
.get('JUJU_DATA') or '~/.local/share/juju'
458 self
.path
= os
.path
.abspath(os
.path
.expanduser(self
.path
))
def current_controller(self):
    """Return the name of the current controller.

    Runs ``juju list-controllers --format yaml`` and reads the
    ``current-controller`` key from the parsed output.  An empty string is
    returned when that key is absent.
    """
    argv = shlex.split('juju list-controllers --format yaml')
    listing = yaml.safe_load(subprocess.check_output(argv))
    return listing.get('current-controller', '')
def current_model(self, controller_name=None):
    """Return the current model recorded for a controller.

    :param str controller_name: Controller to look at; when falsy, the
        current controller is used.
    :raises JujuError: If the controller's entry has no ``current-model``.
    """
    name = controller_name or self.current_controller()
    controller_models = self.models()[name]
    if 'current-model' in controller_models:
        return controller_models['current-model']
    raise JujuError('No current model')
def controllers(self):
    """Return the ``controllers`` section of ``controllers.yaml``."""
    filename, section = 'controllers.yaml', 'controllers'
    return self._load_yaml(filename, section)
478 return self
._load
_yaml
('models.yaml', 'controllers')
481 return self
._load
_yaml
('accounts.yaml', 'controllers')
def _load_yaml(self, filename, key):
    """Load a YAML file from the data directory and return one section.

    :param str filename: File name, resolved relative to ``self.path``.
    :param str key: Top-level key whose value is returned.
    :return: The value stored under ``key`` in the parsed document.
    :raises KeyError: If ``key`` is not present in the document.
    """
    filepath = os.path.join(self.path, filename)
    # Decode explicitly as UTF-8 rather than with the locale's default
    # encoding, so non-ASCII names load the same way on every platform.
    with io.open(filepath, 'rt', encoding='utf-8') as f:
        return yaml.safe_load(f)[key]
490 """Decode and return macaroons from default ~/.go-cookies
494 cookie_file
= os
.path
.expanduser('~/.go-cookies')
495 with
open(cookie_file
, 'r') as f
:
496 cookies
= json
.load(f
)
497 except (OSError, ValueError):
498 log
.warn("Couldn't load macaroons from %s", cookie_file
)
502 c
['Value'] for c
in cookies
503 if c
['Name'].startswith('macaroon-') and c
['Value']
507 json
.loads(base64
.b64decode(value
).decode('utf-8'))
508 for value
in base64_macaroons