import websockets
from http.client import HTTPSConnection
+import asyncio
import yaml
from juju import tag
-from juju.errors import JujuAPIError, JujuConnectionError
+from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
log = logging.getLogger("websocket")
+class Monitor:
+ """
+ Monitor helper class for our Connection class.
+
+ Contains a reference to an instantiated Connection, along with a
+ reference to the Connection.receiver Future. Upon inspecttion of
+ these objects, this class determines whether the connection is in
+ an 'error', 'connected' or 'disconnected' state.
+
+ Use this class to stay up to date on the health of a connection,
+ and take appropriate action if the connection errors out due to
+ network issues or other unexpected circumstances.
+
+ """
+ ERROR = 'error'
+ CONNECTED = 'connected'
+ DISCONNECTED = 'disconnected'
+ UNKNOWN = 'unknown'
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.receiver = None
+
+ @property
+ def status(self):
+ """
+ Determine the status of the connection and receiver, and return
+ ERROR, CONNECTED, or DISCONNECTED as appropriate.
+
+ For simplicity, we only consider ourselves to be connected
+ after the Connection class has setup a receiver task. This
+ only happens after the websocket is open, and the connection
+ isn't usable until that receiver has been started.
+
+ """
+
+ # DISCONNECTED: connection not yet open
+ if not self.connection.ws:
+ return self.DISCONNECTED
+ if not self.receiver:
+ return self.DISCONNECTED
+
+ # ERROR: Connection closed (or errored), but we didn't call
+ # connection.close
+ if not self.connection.close_called and self.receiver_exceptions():
+ return self.ERROR
+ if not self.connection.close_called and not self.connection.ws.open:
+ # The check for self.receiver existing above guards against the
+ # case where we're not open because we simply haven't
+ # setup the connection yet.
+ return self.ERROR
+
+ # DISCONNECTED: cleanly disconnected.
+ if self.connection.close_called and not self.connection.ws.open:
+ return self.DISCONNECTED
+
+ # CONNECTED: everything is fine!
+ if self.connection.ws.open:
+ return self.CONNECTED
+
+ # UNKNOWN: We should never hit this state -- if we do,
+ # something went wrong with the logic above, and we do not
+ # know what state the connection is in.
+ return self.UNKNOWN
+
+ def receiver_exceptions(self):
+ """
+ Return exceptions in the receiver, if any.
+
+ """
+ if not self.receiver:
+ return None
+ if not self.receiver.done():
+ return None
+ return self.receiver.exception()
+
+
class Connection:
"""
Usage::
# Connect to the currently active model
client = await Connection.connect_current()
+ Note: Any connection method or constructor can accept an optional `loop`
+ argument to override the default event loop from `asyncio.get_event_loop`.
"""
def __init__(
self, endpoint, uuid, username, password, cacert=None,
- macaroons=None):
+ macaroons=None, loop=None):
self.endpoint = endpoint
self.uuid = uuid
self.username = username
self.password = password
self.macaroons = macaroons
self.cacert = cacert
+ self.loop = loop or asyncio.get_event_loop()
self.__request_id__ = 0
self.addr = None
self.ws = None
self.facades = {}
+ self.messages = IdQueue(loop=self.loop)
+ self.close_called = False
+ self.monitor = Monitor(connection=self)
@property
def is_open(self):
kw = dict()
kw['ssl'] = self._get_ssl(self.cacert)
+ kw['loop'] = self.loop
self.addr = url
self.ws = await websockets.connect(url, **kw)
+ self.monitor.receiver = self.loop.create_task(self.receiver())
log.info("Driver connected to juju %s", url)
return self
async def close(self):
+ self.close_called = True
await self.ws.close()
- async def recv(self):
- result = await self.ws.recv()
- if result is not None:
- result = json.loads(result)
- return result
+ async def recv(self, request_id):
+ if not self.is_open:
+ raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
+ return await self.messages.get(request_id)
+
+ async def receiver(self):
+ while self.is_open:
+ try:
+ result = await self.ws.recv()
+ if result is not None:
+ result = json.loads(result)
+ await self.messages.put(result['request-id'], result)
+ except Exception as e:
+ await self.messages.put_all(e)
+ if isinstance(e, websockets.ConnectionClosed):
+ # ConnectionClosed is not really exceptional for us,
+ # but it may be for any pending message listeners
+ return
+ raise
async def rpc(self, msg, encoder=None):
self.__request_id__ += 1
msg['version'] = self.facades[msg['type']]
outgoing = json.dumps(msg, indent=2, cls=encoder)
await self.ws.send(outgoing)
- result = await self.recv()
- if result and 'error' in result:
+ result = await self.recv(msg['request-id'])
+
+ if not result:
+ return result
+
+ if 'error' in result:
+ # API Error Response
raise JujuAPIError(result)
+
+ if 'response' not in result:
+ # This may never happen
+ return result
+
+ if 'results' in result['response']:
+ # Check for errors in a result list.
+ errors = []
+ for res in result['response']['results']:
+ if res.get('error', {}).get('message'):
+ errors.append(res['error']['message'])
+ if errors:
+ raise JujuError(errors)
+
+ elif result['response'].get('error', {}).get('message'):
+ raise JujuError(result['response']['error']['message'])
+
return result
def http_headers(self):
self.password,
self.cacert,
self.macaroons,
+ self.loop,
)
async def controller(self):
self.password,
self.cacert,
self.macaroons,
+ self.loop,
)
@classmethod
async def connect(
cls, endpoint, uuid, username, password, cacert=None,
- macaroons=None):
+ macaroons=None, loop=None):
"""Connect to the websocket.
If uuid is None, the connection will be to the controller. Otherwise it
will be to the model.
"""
- client = cls(endpoint, uuid, username, password, cacert, macaroons)
+ client = cls(endpoint, uuid, username, password, cacert, macaroons,
+ loop)
await client.open()
redirect_info = await client.redirect_info()
"Couldn't authenticate to %s", endpoint)
@classmethod
- async def connect_current(cls):
+ async def connect_current(cls, loop=None):
"""Connect to the currently active model.
"""
jujudata = JujuData()
controller_name = jujudata.current_controller()
- models = jujudata.models()[controller_name]
- model_name = models['current-model']
+ model_name = jujudata.current_model()
return await cls.connect_model(
- '{}:{}'.format(controller_name, model_name))
+ '{}:{}'.format(controller_name, model_name), loop)
@classmethod
- async def connect_current_controller(cls):
+ async def connect_current_controller(cls, loop=None):
"""Connect to the currently active controller.
"""
if not controller_name:
raise JujuConnectionError('No current controller')
- return await cls.connect_controller(controller_name)
+ return await cls.connect_controller(controller_name, loop)
@classmethod
- async def connect_controller(cls, controller_name):
+ async def connect_controller(cls, controller_name, loop=None):
"""Connect to a controller by name.
"""
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, None, username, password, cacert, macaroons)
+ endpoint, None, username, password, cacert, macaroons, loop)
@classmethod
- async def connect_model(cls, model):
+ async def connect_model(cls, model, loop=None):
"""Connect to a model by name.
- :param str model: <controller>:<model>
+ :param str model: [<controller>:]<model>
"""
- controller_name, model_name = model.split(':')
-
jujudata = JujuData()
+
+ if ':' in model:
+ # explicit controller given
+ controller_name, model_name = model.split(':')
+ else:
+ # use the current controller if one isn't explicitly given
+ controller_name = jujudata.current_controller()
+ model_name = model
+
+ accounts = jujudata.accounts()[controller_name]
+ username = accounts['user']
+ # model name must include a user prefix, so add it if it doesn't
+ if '/' not in model_name:
+ model_name = '{}/{}'.format(username, model_name)
+
controller = jujudata.controllers()[controller_name]
endpoint = controller['api-endpoints'][0]
cacert = controller.get('ca-cert')
- accounts = jujudata.accounts()[controller_name]
- username = accounts['user']
password = accounts.get('password')
models = jujudata.models()[controller_name]
model_uuid = models['models'][model_name]['uuid']
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, model_uuid, username, password, cacert, macaroons)
+ endpoint, model_uuid, username, password, cacert, macaroons, loop)
def build_facades(self, info):
self.facades.clear()
output = yaml.safe_load(output)
return output.get('current-controller', '')
+ def current_model(self, controller_name=None):
+ if not controller_name:
+ controller_name = self.current_controller()
+ models = self.models()[controller_name]
+ if 'current-model' not in models:
+ raise JujuError('No current model')
+ return models['current-model']
+
def controllers(self):
return self._load_yaml('controllers.yaml', 'controllers')
cookie_file = os.path.expanduser('~/.go-cookies')
with open(cookie_file, 'r') as f:
cookies = json.load(f)
- except (OSError, ValueError) as e:
+ except (OSError, ValueError):
log.warn("Couldn't load macaroons from %s", cookie_file)
return []