Feature/api version support (#109)
[osm/N2VC.git] / juju / client / connection.py
index 486a57f..4a9766d 100644 (file)
@@ -15,11 +15,91 @@ import asyncio
 import yaml
 
 from juju import tag
+from juju.client import client
+from juju.client.version_map import VERSION_MAP
 from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
 
 log = logging.getLogger("websocket")
 
 
+class Monitor:
+    """
+    Monitor helper class for our Connection class.
+
+    Contains a reference to an instantiated Connection, along with a
+    reference to the Connection.receiver Future. Upon inspecttion of
+    these objects, this class determines whether the connection is in
+    an 'error', 'connected' or 'disconnected' state.
+
+    Use this class to stay up to date on the health of a connection,
+    and take appropriate action if the connection errors out due to
+    network issues or other unexpected circumstances.
+
+    """
+    ERROR = 'error'
+    CONNECTED = 'connected'
+    DISCONNECTED = 'disconnected'
+    UNKNOWN = 'unknown'
+
+    def __init__(self, connection):
+        self.connection = connection
+        self.receiver = None
+
+    @property
+    def status(self):
+        """
+        Determine the status of the connection and receiver, and return
+        ERROR, CONNECTED, or DISCONNECTED as appropriate.
+
+        For simplicity, we only consider ourselves to be connected
+        after the Connection class has setup a receiver task. This
+        only happens after the websocket is open, and the connection
+        isn't usable until that receiver has been started.
+
+        """
+
+        # DISCONNECTED: connection not yet open
+        if not self.connection.ws:
+            return self.DISCONNECTED
+        if not self.receiver:
+            return self.DISCONNECTED
+
+        # ERROR: Connection closed (or errored), but we didn't call
+        # connection.close
+        if not self.connection.close_called and self.receiver_exceptions():
+            return self.ERROR
+        if not self.connection.close_called and not self.connection.ws.open:
+            # The check for self.receiver existing above guards against the
+            # case where we're not open because we simply haven't
+            # setup the connection yet.
+            return self.ERROR
+
+        # DISCONNECTED: cleanly disconnected.
+        if self.connection.close_called and not self.connection.ws.open:
+            return self.DISCONNECTED
+
+        # CONNECTED: everything is fine!
+        if self.connection.ws.open:
+            return self.CONNECTED
+
+        # UNKNOWN: We should never hit this state -- if we do,
+        # something went wrong with the logic above, and we do not
+        # know what state the connection is in.
+        return self.UNKNOWN
+
+    def receiver_exceptions(self):
+        """
+        Return exceptions in the receiver, if any.
+
+        """
+        if not self.receiver:
+            return None
+        if not self.receiver.done():
+            return None
+        return self.receiver.exception()
+
+
 class Connection:
     """
     Usage::
@@ -52,7 +132,9 @@ class Connection:
         self.addr = None
         self.ws = None
         self.facades = {}
-        self.messages = {}
+        self.messages = IdQueue(loop=self.loop)
+        self.close_called = False
+        self.monitor = Monitor(connection=self)
 
     @property
     def is_open(self):
@@ -75,27 +157,46 @@ class Connection:
         kw['loop'] = self.loop
         self.addr = url
         self.ws = await websockets.connect(url, **kw)
+        self.monitor.receiver = self.loop.create_task(self.receiver())
         log.info("Driver connected to juju %s", url)
         return self
 
     async def close(self):
+        self.close_called = True
         await self.ws.close()
 
     async def recv(self, request_id):
-        while not self.messages.get(request_id):
-            await asyncio.sleep(0)
-
-        result = self.messages[request_id]
-
-        del self.messages[request_id]
-        return result
+        if not self.is_open:
+            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
+        return await self.messages.get(request_id)
 
     async def receiver(self):
         while self.is_open:
-            result = await self.ws.recv()
-            if result is not None:
-                result = json.loads(result)
-                self.messages[result['request-id']] = result
+            try:
+                result = await self.ws.recv()
+                if result is not None:
+                    result = json.loads(result)
+                    await self.messages.put(result['request-id'], result)
+            except Exception as e:
+                await self.messages.put_all(e)
+                if isinstance(e, websockets.ConnectionClosed):
+                    # ConnectionClosed is not really exceptional for us,
+                    # but it may be for any pending message listeners
+                    return
+                raise
+
+    async def pinger(self):
+        '''
+        A Controller can time us out if we are silent for too long. This
+        is especially true in JaaS, which has a fairly strict timeout.
+
+        To prevent timing out, we send a ping every ten seconds.
+
+        '''
+        pinger_facade = client.PingerFacade.from_connection(self)
+        while self.is_open:
+            await pinger_facade.Ping()
+            await asyncio.sleep(10)
 
     async def rpc(self, msg, encoder=None):
         self.__request_id__ += 1
@@ -115,7 +216,7 @@ class Connection:
             # API Error Response
             raise JujuAPIError(result)
 
-        if not 'response' in result:
+        if 'response' not in result:
             # This may never happen
             return result
 
@@ -221,7 +322,6 @@ class Connection:
         client = cls(endpoint, uuid, username, password, cacert, macaroons,
                      loop)
         await client.open()
-        self.loop.create_task(self.receiver)
 
         redirect_info = await client.redirect_info()
         if not redirect_info:
@@ -325,10 +425,24 @@ class Connection:
         return await cls.connect(
             endpoint, model_uuid, username, password, cacert, macaroons, loop)
 
-    def build_facades(self, info):
+    def build_facades(self, facades):
         self.facades.clear()
-        for facade in info:
-            self.facades[facade['name']] = facade['versions'][-1]
+        # In order to work around an issue where the juju api is not
+        # returning a complete list of facades, we simply look up the
+        # juju version in a pregenerated map, and use that info to
+        # populate our list of facades.
+
+        # TODO: if a future version of juju fixes this bug, restore
+        # the following code for that version and higher:
+        # for facade in facades:
+        #     self.facades[facade['name']] = facade['versions'][-1]
+        try:
+            self.facades = VERSION_MAP[self.info['server-version']]
+        except KeyError:
+            log.warning("Could not find a set of facades for {}. Using "
+                        "the latest facade set instead".format(
+                            self.info['server-version']))
+            self.facades = VERSION_MAP['latest']
 
     async def login(self, username, password, macaroons=None):
         if macaroons:
@@ -349,8 +463,11 @@ class Connection:
                 "macaroons": macaroons or []
             }})
         response = result['response']
-        self.build_facades(response.get('facades', {}))
         self.info = response.copy()
+        self.build_facades(response.get('facades', {}))
+        # Create a pinger to keep the connection alive (needed for
+        # JaaS; harmless elsewhere).
+        self.loop.create_task(self.pinger())
         return response
 
     async def redirect_info(self):