import yaml
from juju import tag
+from juju.client import client
+from juju.client.version_map import VERSION_MAP
from juju.errors import JujuError, JujuAPIError, JujuConnectionError
from juju.utils import IdQueue
log = logging.getLogger("websocket")
+class Monitor:
+ """
+ Monitor helper class for our Connection class.
+
+ Contains a reference to an instantiated Connection, along with a
+ reference to the Connection.receiver Future. Upon inspecttion of
+ these objects, this class determines whether the connection is in
+ an 'error', 'connected' or 'disconnected' state.
+
+ Use this class to stay up to date on the health of a connection,
+ and take appropriate action if the connection errors out due to
+ network issues or other unexpected circumstances.
+
+ """
+ ERROR = 'error'
+ CONNECTED = 'connected'
+ DISCONNECTED = 'disconnected'
+ UNKNOWN = 'unknown'
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.receiver = None
+
+ @property
+ def status(self):
+ """
+ Determine the status of the connection and receiver, and return
+ ERROR, CONNECTED, or DISCONNECTED as appropriate.
+
+ For simplicity, we only consider ourselves to be connected
+ after the Connection class has setup a receiver task. This
+ only happens after the websocket is open, and the connection
+ isn't usable until that receiver has been started.
+
+ """
+
+ # DISCONNECTED: connection not yet open
+ if not self.connection.ws:
+ return self.DISCONNECTED
+ if not self.receiver:
+ return self.DISCONNECTED
+
+ # ERROR: Connection closed (or errored), but we didn't call
+ # connection.close
+ if not self.connection.close_called and self.receiver_exceptions():
+ return self.ERROR
+ if not self.connection.close_called and not self.connection.ws.open:
+ # The check for self.receiver existing above guards against the
+ # case where we're not open because we simply haven't
+ # setup the connection yet.
+ return self.ERROR
+
+ # DISCONNECTED: cleanly disconnected.
+ if self.connection.close_called and not self.connection.ws.open:
+ return self.DISCONNECTED
+
+ # CONNECTED: everything is fine!
+ if self.connection.ws.open:
+ return self.CONNECTED
+
+ # UNKNOWN: We should never hit this state -- if we do,
+ # something went wrong with the logic above, and we do not
+ # know what state the connection is in.
+ return self.UNKNOWN
+
+ def receiver_exceptions(self):
+ """
+ Return exceptions in the receiver, if any.
+
+ """
+ if not self.receiver:
+ return None
+ if not self.receiver.done():
+ return None
+ return self.receiver.exception()
+
+
class Connection:
"""
Usage::
self.ws = None
self.facades = {}
self.messages = IdQueue(loop=self.loop)
+ self.close_called = False
+ self.monitor = Monitor(connection=self)
@property
def is_open(self):
kw['loop'] = self.loop
self.addr = url
self.ws = await websockets.connect(url, **kw)
- self.loop.create_task(self.receiver())
+ self.monitor.receiver = self.loop.create_task(self.receiver())
log.info("Driver connected to juju %s", url)
return self
async def close(self):
+ self.close_called = True
await self.ws.close()
async def recv(self, request_id):
await self.messages.put(result['request-id'], result)
except Exception as e:
await self.messages.put_all(e)
+ if isinstance(e, websockets.ConnectionClosed):
+ # ConnectionClosed is not really exceptional for us,
+ # but it may be for any pending message listeners
+ return
raise
- await self.messages.put_all(websockets.exceptions.ConnectionClosed(
- 0, 'websocket closed'))
+
+ async def pinger(self):
+ '''
+ A Controller can time us out if we are silent for too long. This
+ is especially true in JaaS, which has a fairly strict timeout.
+
+ To prevent timing out, we send a ping every ten seconds.
+
+ '''
+ pinger_facade = client.PingerFacade.from_connection(self)
+ while self.is_open:
+ await pinger_facade.Ping()
+ await asyncio.sleep(10)
async def rpc(self, msg, encoder=None):
self.__request_id__ += 1
return await cls.connect(
endpoint, model_uuid, username, password, cacert, macaroons, loop)
- def build_facades(self, info):
+ def build_facades(self, facades):
self.facades.clear()
- for facade in info:
- self.facades[facade['name']] = facade['versions'][-1]
+ # In order to work around an issue where the juju api is not
+ # returning a complete list of facades, we simply look up the
+ # juju version in a pregenerated map, and use that info to
+ # populate our list of facades.
+
+ # TODO: if a future version of juju fixes this bug, restore
+ # the following code for that version and higher:
+ # for facade in facades:
+ # self.facades[facade['name']] = facade['versions'][-1]
+ try:
+ self.facades = VERSION_MAP[self.info['server-version']]
+ except KeyError:
+ log.warning("Could not find a set of facades for {}. Using "
+ "the latest facade set instead".format(
+ self.info['server-version']))
+ self.facades = VERSION_MAP['latest']
async def login(self, username, password, macaroons=None):
if macaroons:
"macaroons": macaroons or []
}})
response = result['response']
- self.build_facades(response.get('facades', {}))
self.info = response.copy()
+ self.build_facades(response.get('facades', {}))
+ # Create a pinger to keep the connection alive (needed for
+ # JaaS; harmless elsewhere).
+ self.loop.create_task(self.pinger())
return response
async def redirect_info(self):