12 from concurrent
.futures
import CancelledError
13 from http
.client
import HTTPSConnection
14 from pathlib
import Path
19 from juju
import tag
, utils
20 from juju
.client
import client
21 from juju
.errors
import JujuError
, JujuAPIError
, JujuConnectionError
22 from juju
.utils
import IdQueue
24 log
= logging
.getLogger("websocket")
29 Monitor helper class for our Connection class.
31 Contains a reference to an instantiated Connection, along with a
32 reference to the Connection.receiver Future. Upon inspecttion of
33 these objects, this class determines whether the connection is in
34 an 'error', 'connected' or 'disconnected' state.
36 Use this class to stay up to date on the health of a connection,
37 and take appropriate action if the connection errors out due to
38 network issues or other unexpected circumstances.
42 CONNECTED
= 'connected'
43 DISCONNECTED
= 'disconnected'
46 def __init__(self
, connection
):
47 self
.connection
= connection
48 self
.close_called
= asyncio
.Event(loop
=self
.connection
.loop
)
49 self
.receiver_stopped
= asyncio
.Event(loop
=self
.connection
.loop
)
50 self
.pinger_stopped
= asyncio
.Event(loop
=self
.connection
.loop
)
51 self
.receiver_stopped
.set()
52 self
.pinger_stopped
.set()
57 Determine the status of the connection and receiver, and return
58 ERROR, CONNECTED, or DISCONNECTED as appropriate.
60 For simplicity, we only consider ourselves to be connected
61 after the Connection class has setup a receiver task. This
62 only happens after the websocket is open, and the connection
63 isn't usable until that receiver has been started.
67 # DISCONNECTED: connection not yet open
68 if not self
.connection
.ws
:
69 return self
.DISCONNECTED
70 if self
.receiver_stopped
.is_set():
71 return self
.DISCONNECTED
73 # ERROR: Connection closed (or errored), but we didn't call
75 if not self
.close_called
.is_set() and self
.receiver_stopped
.is_set():
77 if not self
.close_called
.is_set() and not self
.connection
.ws
.open:
78 # The check for self.receiver_stopped existing above guards
79 # against the case where we're not open because we simply
80 # haven't setup the connection yet.
83 # DISCONNECTED: cleanly disconnected.
84 if self
.close_called
.is_set() and not self
.connection
.ws
.open:
85 return self
.DISCONNECTED
87 # CONNECTED: everything is fine!
88 if self
.connection
.ws
.open:
91 # UNKNOWN: We should never hit this state -- if we do,
92 # something went wrong with the logic above, and we do not
93 # know what state the connection is in.
101 # Connect to an arbitrary api server
102 client = await Connection.connect(
103 api_endpoint, model_uuid, username, password, cacert)
105 # Connect using a controller/model name
106 client = await Connection.connect_model('local.local:default')
108 # Connect to the currently active model
109 client = await Connection.connect_current()
111 Note: Any connection method or constructor can accept an optional `loop`
112 argument to override the default event loop from `asyncio.get_event_loop`.
115 DEFAULT_FRAME_SIZE
= 'default_frame_size'
116 MAX_FRAME_SIZE
= 2**22
117 "Maximum size for a single frame. Defaults to 4MB."
120 self
, endpoint
, uuid
, username
, password
, cacert
=None,
121 macaroons
=None, loop
=None, max_frame_size
=DEFAULT_FRAME_SIZE
):
122 self
.endpoint
= endpoint
125 self
.macaroons
= macaroons
130 self
.username
= username
131 self
.password
= password
133 self
.loop
= loop
or asyncio
.get_event_loop()
135 self
.__request
_id
__ = 0
139 self
.messages
= IdQueue(loop
=self
.loop
)
140 self
.monitor
= Monitor(connection
=self
)
141 if max_frame_size
is self
.DEFAULT_FRAME_SIZE
:
142 max_frame_size
= self
.MAX_FRAME_SIZE
143 self
.max_frame_size
= max_frame_size
151 def _get_ssl(self
, cert
=None):
152 return ssl
.create_default_context(
153 purpose
=ssl
.Purpose
.CLIENT_AUTH
, cadata
=cert
)
155 async def open(self
):
157 url
= "wss://{}/model/{}/api".format(self
.endpoint
, self
.uuid
)
159 url
= "wss://{}/api".format(self
.endpoint
)
162 kw
['ssl'] = self
._get
_ssl
(self
.cacert
)
163 kw
['loop'] = self
.loop
164 kw
['max_size'] = self
.max_frame_size
166 self
.ws
= await websockets
.connect(url
, **kw
)
167 self
.loop
.create_task(self
.receiver())
168 self
.monitor
.receiver_stopped
.clear()
169 log
.info("Driver connected to juju %s", url
)
170 self
.monitor
.close_called
.clear()
173 async def close(self
):
176 self
.monitor
.close_called
.set()
177 await self
.monitor
.pinger_stopped
.wait()
178 await self
.monitor
.receiver_stopped
.wait()
179 await self
.ws
.close()
181 async def recv(self
, request_id
):
183 raise websockets
.exceptions
.ConnectionClosed(0, 'websocket closed')
184 return await self
.messages
.get(request_id
)
186 async def receiver(self
):
189 result
= await utils
.run_with_interrupt(
191 self
.monitor
.close_called
,
193 if self
.monitor
.close_called
.is_set():
195 if result
is not None:
196 result
= json
.loads(result
)
197 await self
.messages
.put(result
['request-id'], result
)
198 except CancelledError
:
200 except Exception as e
:
201 await self
.messages
.put_all(e
)
202 if isinstance(e
, websockets
.ConnectionClosed
):
203 # ConnectionClosed is not really exceptional for us,
204 # but it may be for any pending message listeners
206 log
.exception("Error in receiver")
209 self
.monitor
.receiver_stopped
.set()
211 async def pinger(self
):
213 A Controller can time us out if we are silent for too long. This
214 is especially true in JaaS, which has a fairly strict timeout.
216 To prevent timing out, we send a ping every ten seconds.
219 async def _do_ping():
221 await pinger_facade
.Ping()
222 await asyncio
.sleep(10, loop
=self
.loop
)
223 except CancelledError
:
226 pinger_facade
= client
.PingerFacade
.from_connection(self
)
229 await utils
.run_with_interrupt(
231 self
.monitor
.close_called
,
233 if self
.monitor
.close_called
.is_set():
236 self
.monitor
.pinger_stopped
.set()
238 async def rpc(self
, msg
, encoder
=None):
239 self
.__request
_id
__ += 1
240 msg
['request-id'] = self
.__request
_id
__
241 if'params' not in msg
:
243 if "version" not in msg
:
244 msg
['version'] = self
.facades
[msg
['type']]
245 outgoing
= json
.dumps(msg
, indent
=2, cls
=encoder
)
246 await self
.ws
.send(outgoing
)
247 result
= await self
.recv(msg
['request-id'])
252 if 'error' in result
:
254 raise JujuAPIError(result
)
256 if 'response' not in result
:
257 # This may never happen
260 if 'results' in result
['response']:
261 # Check for errors in a result list.
263 for res
in result
['response']['results']:
264 if res
.get('error', {}).get('message'):
265 errors
.append(res
['error']['message'])
267 raise JujuError(errors
)
269 elif result
['response'].get('error', {}).get('message'):
270 raise JujuError(result
['response']['error']['message'])
274 def http_headers(self
):
275 """Return dictionary of http headers necessary for making an http
276 connection to the endpoint of this Connection.
278 :return: Dictionary of headers
281 if not self
.username
:
284 creds
= u
'{}:{}'.format(
285 tag
.user(self
.username
),
288 token
= base64
.b64encode(creds
.encode())
290 'Authorization': 'Basic {}'.format(token
.decode())
293 def https_connection(self
):
294 """Return an https connection to this Connection's endpoint.
296 Returns a 3-tuple containing::
298 1. The :class:`HTTPSConnection` instance
299 2. Dictionary of auth headers to be used with the connection
300 3. The root url path (str) to be used for requests.
303 endpoint
= self
.endpoint
304 host
, remainder
= endpoint
.split(':', 1)
307 port
, _
= remainder
.split('/', 1)
309 conn
= HTTPSConnection(
311 context
=self
._get
_ssl
(self
.cacert
),
315 "/model/{}".format(self
.uuid
)
318 return conn
, self
.http_headers(), path
320 async def clone(self
):
321 """Return a new Connection, connected to the same websocket endpoint
325 return await Connection
.connect(
336 async def controller(self
):
337 """Return a Connection to the controller at self.endpoint
340 return await Connection
.connect(
350 async def _try_endpoint(self
, endpoint
, cacert
):
355 self
.endpoint
= endpoint
359 result
= await self
.login()
360 if 'discharge-required-error' in result
['response']:
361 log
.info('Macaroon discharge required, disconnecting')
364 log
.info('Authenticated')
366 except JujuAPIError
as e
:
367 if e
.error_code
!= 'redirection required':
369 log
.info('Controller requested redirect')
370 redirect_info
= await self
.redirect_info()
371 redir_cacert
= redirect_info
['ca-cert']
373 ("{value}:{port}".format(**s
), redir_cacert
)
374 for servers
in redirect_info
['servers']
375 for s
in servers
if s
["scope"] == 'public'
380 return success
, result
, new_endpoints
384 cls
, endpoint
, uuid
, username
, password
, cacert
=None,
385 macaroons
=None, loop
=None, max_frame_size
=None):
386 """Connect to the websocket.
388 If uuid is None, the connection will be to the controller. Otherwise it
389 will be to the model.
392 client
= cls(endpoint
, uuid
, username
, password
, cacert
, macaroons
,
393 loop
, max_frame_size
)
394 endpoints
= [(endpoint
, cacert
)]
396 _endpoint
, _cacert
= endpoints
.pop(0)
397 success
, result
, new_endpoints
= await client
._try
_endpoint
(
401 endpoints
.extend(new_endpoints
)
403 # ran out of endpoints without a successful login
404 raise Exception("Couldn't authenticate to {}".format(endpoint
))
406 response
= result
['response']
407 client
.info
= response
.copy()
408 client
.build_facades(response
.get('facades', {}))
409 client
.loop
.create_task(client
.pinger())
410 client
.monitor
.pinger_stopped
.clear()
415 async def connect_current(cls
, loop
=None, max_frame_size
=None):
416 """Connect to the currently active model.
419 jujudata
= JujuData()
421 controller_name
= jujudata
.current_controller()
422 if not controller_name
:
423 raise JujuConnectionError('No current controller')
425 model_name
= jujudata
.current_model()
427 return await cls
.connect_model(
428 '{}:{}'.format(controller_name
, model_name
), loop
, max_frame_size
)
431 async def connect_current_controller(cls
, loop
=None, max_frame_size
=None):
432 """Connect to the currently active controller.
435 jujudata
= JujuData()
436 controller_name
= jujudata
.current_controller()
437 if not controller_name
:
438 raise JujuConnectionError('No current controller')
440 return await cls
.connect_controller(controller_name
, loop
,
444 async def connect_controller(cls
, controller_name
, loop
=None,
445 max_frame_size
=None):
446 """Connect to a controller by name.
449 jujudata
= JujuData()
450 controller
= jujudata
.controllers()[controller_name
]
451 endpoint
= controller
['api-endpoints'][0]
452 cacert
= controller
.get('ca-cert')
453 accounts
= jujudata
.accounts()[controller_name
]
454 username
= accounts
['user']
455 password
= accounts
.get('password')
456 macaroons
= get_macaroons(controller_name
) if not password
else None
458 return await cls
.connect(
459 endpoint
, None, username
, password
, cacert
, macaroons
, loop
,
463 async def connect_model(cls
, model
, loop
=None, max_frame_size
=None):
464 """Connect to a model by name.
466 :param str model: [<controller>:]<model>
469 jujudata
= JujuData()
472 # explicit controller given
473 controller_name
, model_name
= model
.split(':')
475 # use the current controller if one isn't explicitly given
476 controller_name
= jujudata
.current_controller()
479 accounts
= jujudata
.accounts()[controller_name
]
480 username
= accounts
['user']
481 # model name must include a user prefix, so add it if it doesn't
482 if '/' not in model_name
:
483 model_name
= '{}/{}'.format(username
, model_name
)
485 controller
= jujudata
.controllers()[controller_name
]
486 endpoint
= controller
['api-endpoints'][0]
487 cacert
= controller
.get('ca-cert')
488 password
= accounts
.get('password')
489 models
= jujudata
.models()[controller_name
]
490 model_uuid
= models
['models'][model_name
]['uuid']
491 macaroons
= get_macaroons(controller_name
) if not password
else None
493 return await cls
.connect(
494 endpoint
, model_uuid
, username
, password
, cacert
, macaroons
, loop
,
497 def build_facades(self
, facades
):
499 for facade
in facades
:
500 self
.facades
[facade
['name']] = facade
['versions'][-1]
502 async def login(self
):
503 username
= self
.username
504 if username
and not username
.startswith('user-'):
505 username
= 'user-{}'.format(username
)
507 result
= await self
.rpc({
512 "auth-tag": username
,
513 "credentials": self
.password
,
514 "nonce": "".join(random
.sample(string
.printable
, 12)),
515 "macaroons": self
.macaroons
519 async def redirect_info(self
):
521 result
= await self
.rpc({
523 "request": "RedirectInfo",
526 except JujuAPIError
as e
:
527 if e
.message
== 'not redirected':
530 return result
['response']
535 self
.path
= os
.environ
.get('JUJU_DATA') or '~/.local/share/juju'
536 self
.path
= os
.path
.abspath(os
.path
.expanduser(self
.path
))
538 def current_controller(self
):
539 cmd
= shlex
.split('juju list-controllers --format yaml')
540 output
= subprocess
.check_output(cmd
)
541 output
= yaml
.safe_load(output
)
542 return output
.get('current-controller', '')
544 def current_model(self
, controller_name
=None):
545 if not controller_name
:
546 controller_name
= self
.current_controller()
547 models
= self
.models()[controller_name
]
548 if 'current-model' not in models
:
549 raise JujuError('No current model')
550 return models
['current-model']
552 def controllers(self
):
553 return self
._load
_yaml
('controllers.yaml', 'controllers')
556 return self
._load
_yaml
('models.yaml', 'controllers')
559 return self
._load
_yaml
('accounts.yaml', 'controllers')
561 def _load_yaml(self
, filename
, key
):
562 filepath
= os
.path
.join(self
.path
, filename
)
563 with io
.open(filepath
, 'rt') as f
:
564 return yaml
.safe_load(f
)[key
]
567 def get_macaroons(controller_name
=None):
568 """Decode and return macaroons from default ~/.go-cookies
573 cookie_files
.append('~/.local/share/juju/cookies/{}.json'.format(
575 cookie_files
.append('~/.go-cookies')
576 for cookie_file
in cookie_files
:
577 cookie_file
= Path(cookie_file
).expanduser()
578 if cookie_file
.exists():
580 cookies
= json
.loads(cookie_file
.read_text())
582 except (OSError, ValueError):
583 log
.warn("Couldn't load macaroons from %s", cookie_file
)
586 log
.warn("Couldn't load macaroons from %s", ' or '.join(cookie_files
))
590 c
['Value'] for c
in cookies
591 if c
['Name'].startswith('macaroon-') and c
['Value']
595 json
.loads(base64
.b64decode(value
).decode('utf-8'))
596 for value
in base64_macaroons