Refactored login code to better handle redirects (#116)
[osm/N2VC.git] / juju / client / connection.py
index 9e8cb8f..2be360f 100644 (file)
@@ -11,14 +11,96 @@ import subprocess
 import websockets
 from http.client import HTTPSConnection
 
+import asyncio
 import yaml
 
 from juju import tag
-from juju.errors import JujuAPIError, JujuConnectionError
+from juju.client import client
+from juju.client.version_map import VERSION_MAP
+from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
 
 log = logging.getLogger("websocket")
 
 
+class Monitor:
+    """
+    Monitor helper class for our Connection class.
+
+    Contains a reference to an instantiated Connection, along with a
+    reference to the Connection.receiver Future. Upon inspecttion of
+    these objects, this class determines whether the connection is in
+    an 'error', 'connected' or 'disconnected' state.
+
+    Use this class to stay up to date on the health of a connection,
+    and take appropriate action if the connection errors out due to
+    network issues or other unexpected circumstances.
+
+    """
+    ERROR = 'error'
+    CONNECTED = 'connected'
+    DISCONNECTED = 'disconnected'
+    UNKNOWN = 'unknown'
+
+    def __init__(self, connection):
+        self.connection = connection
+        self.receiver = None
+        self.pinger = None
+
+    @property
+    def status(self):
+        """
+        Determine the status of the connection and receiver, and return
+        ERROR, CONNECTED, or DISCONNECTED as appropriate.
+
+        For simplicity, we only consider ourselves to be connected
+        after the Connection class has setup a receiver task. This
+        only happens after the websocket is open, and the connection
+        isn't usable until that receiver has been started.
+
+        """
+
+        # DISCONNECTED: connection not yet open
+        if not self.connection.ws:
+            return self.DISCONNECTED
+        if not self.receiver:
+            return self.DISCONNECTED
+
+        # ERROR: Connection closed (or errored), but we didn't call
+        # connection.close
+        if not self.connection.close_called and self.receiver_exceptions():
+            return self.ERROR
+        if not self.connection.close_called and not self.connection.ws.open:
+            # The check for self.receiver existing above guards against the
+            # case where we're not open because we simply haven't
+            # setup the connection yet.
+            return self.ERROR
+
+        # DISCONNECTED: cleanly disconnected.
+        if self.connection.close_called and not self.connection.ws.open:
+            return self.DISCONNECTED
+
+        # CONNECTED: everything is fine!
+        if self.connection.ws.open:
+            return self.CONNECTED
+
+        # UNKNOWN: We should never hit this state -- if we do,
+        # something went wrong with the logic above, and we do not
+        # know what state the connection is in.
+        return self.UNKNOWN
+
+    def receiver_exceptions(self):
+        """
+        Return exceptions in the receiver, if any.
+
+        """
+        if not self.receiver:
+            return None
+        if not self.receiver.done():
+            return None
+        return self.receiver.exception()
+
+
 class Connection:
     """
     Usage::
@@ -33,21 +115,32 @@ class Connection:
         # Connect to the currently active model
         client = await Connection.connect_current()
 
+    Note: Any connection method or constructor can accept an optional `loop`
+    argument to override the default event loop from `asyncio.get_event_loop`.
     """
     def __init__(
             self, endpoint, uuid, username, password, cacert=None,
-            macaroons=None):
+            macaroons=None, loop=None):
         self.endpoint = endpoint
         self.uuid = uuid
-        self.username = username
-        self.password = password
-        self.macaroons = macaroons
+        if macaroons:
+            self.macaroons = macaroons
+            self.username = ''
+            self.password = ''
+        else:
+            self.macaroons = []
+            self.username = username
+            self.password = password
         self.cacert = cacert
+        self.loop = loop or asyncio.get_event_loop()
 
         self.__request_id__ = 0
         self.addr = None
         self.ws = None
         self.facades = {}
+        self.messages = IdQueue(loop=self.loop)
+        self.close_called = False
+        self.monitor = Monitor(connection=self)
 
     @property
     def is_open(self):
@@ -67,19 +160,56 @@ class Connection:
 
         kw = dict()
         kw['ssl'] = self._get_ssl(self.cacert)
+        kw['loop'] = self.loop
         self.addr = url
         self.ws = await websockets.connect(url, **kw)
+        self.monitor.receiver = self.loop.create_task(self.receiver())
         log.info("Driver connected to juju %s", url)
         return self
 
     async def close(self):
+        if not self.is_open:
+            return
+        self.close_called = True
+        if self.monitor.pinger:
+            # might be closing due to login failure,
+            # in which case we won't have a pinger yet
+            self.monitor.pinger.cancel()
+        self.monitor.receiver.cancel()
         await self.ws.close()
 
-    async def recv(self):
-        result = await self.ws.recv()
-        if result is not None:
-            result = json.loads(result)
-        return result
+    async def recv(self, request_id):
+        if not self.is_open:
+            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
+        return await self.messages.get(request_id)
+
+    async def receiver(self):
+        while self.is_open:
+            try:
+                result = await self.ws.recv()
+                if result is not None:
+                    result = json.loads(result)
+                    await self.messages.put(result['request-id'], result)
+            except Exception as e:
+                await self.messages.put_all(e)
+                if isinstance(e, websockets.ConnectionClosed):
+                    # ConnectionClosed is not really exceptional for us,
+                    # but it may be for any pending message listeners
+                    return
+                raise
+
+    async def pinger(self):
+        '''
+        A Controller can time us out if we are silent for too long. This
+        is especially true in JaaS, which has a fairly strict timeout.
+
+        To prevent timing out, we send a ping every ten seconds.
+
+        '''
+        pinger_facade = client.PingerFacade.from_connection(self)
+        while self.is_open:
+            await pinger_facade.Ping()
+            await asyncio.sleep(10, loop=self.loop)
 
     async def rpc(self, msg, encoder=None):
         self.__request_id__ += 1
@@ -90,9 +220,31 @@ class Connection:
             msg['version'] = self.facades[msg['type']]
         outgoing = json.dumps(msg, indent=2, cls=encoder)
         await self.ws.send(outgoing)
-        result = await self.recv()
-        if result and 'error' in result:
+        result = await self.recv(msg['request-id'])
+
+        if not result:
+            return result
+
+        if 'error' in result:
+            # API Error Response
             raise JujuAPIError(result)
+
+        if 'response' not in result:
+            # This may never happen
+            return result
+
+        if 'results' in result['response']:
+            # Check for errors in a result list.
+            errors = []
+            for res in result['response']['results']:
+                if res.get('error', {}).get('message'):
+                    errors.append(res['error']['message'])
+            if errors:
+                raise JujuError(errors)
+
+        elif result['response'].get('error', {}).get('message'):
+            raise JujuError(result['response']['error']['message'])
+
         return result
 
     def http_headers(self):
@@ -153,6 +305,7 @@ class Connection:
             self.password,
             self.cacert,
             self.macaroons,
+            self.loop,
         )
 
     async def controller(self):
@@ -166,63 +319,86 @@ class Connection:
             self.password,
             self.cacert,
             self.macaroons,
+            self.loop,
         )
 
+    async def _try_endpoint(self, endpoint, cacert):
+        success = False
+        result = None
+        new_endpoints = []
+
+        self.endpoint = endpoint
+        self.cacert = cacert
+        await self.open()
+        try:
+            result = await self.login()
+            if 'discharge-required-error' in result['response']:
+                log.info('Macaroon discharge required, disconnecting')
+            else:
+                # successful login!
+                log.info('Authenticated')
+                success = True
+        except JujuAPIError as e:
+            if e.error_code != 'redirection required':
+                raise
+            log.info('Controller requested redirect')
+            redirect_info = await self.redirect_info()
+            redir_cacert = redirect_info['ca-cert']
+            new_endpoints = [
+                ("{value}:{port}".format(**s), redir_cacert)
+                for servers in redirect_info['servers']
+                for s in servers if s["scope"] == 'public'
+            ]
+        finally:
+            if not success:
+                await self.close()
+        return success, result, new_endpoints
+
     @classmethod
     async def connect(
             cls, endpoint, uuid, username, password, cacert=None,
-            macaroons=None):
+            macaroons=None, loop=None):
         """Connect to the websocket.
 
         If uuid is None, the connection will be to the controller. Otherwise it
         will be to the model.
 
         """
-        client = cls(endpoint, uuid, username, password, cacert, macaroons)
-        await client.open()
-
-        redirect_info = await client.redirect_info()
-        if not redirect_info:
-            await client.login(username, password, macaroons)
-            return client
-
-        await client.close()
-        servers = [
-            s for servers in redirect_info['servers']
-            for s in servers if s["scope"] == 'public'
-        ]
-        for server in servers:
-            client = cls(
-                "{value}:{port}".format(**server), uuid, username,
-                password, redirect_info['ca-cert'], macaroons)
-            await client.open()
-            try:
-                result = await client.login(username, password, macaroons)
-                if 'discharge-required-error' in result:
-                    continue
-                return client
-            except Exception as e:
-                await client.close()
-                log.exception(e)
+        client = cls(endpoint, uuid, username, password, cacert, macaroons,
+                     loop)
+        endpoints = [(endpoint, cacert)]
+        while endpoints:
+            _endpoint, _cacert = endpoints.pop(0)
+            success, result, new_endpoints = await client._try_endpoint(
+                _endpoint, _cacert)
+            if success:
+                break
+            endpoints.extend(new_endpoints)
+        else:
+            # ran out of endpoints without a successful login
+            raise Exception("Couldn't authenticate to {}".format(endpoint))
+
+        response = result['response']
+        client.info = response.copy()
+        client.build_facades(response.get('facades', {}))
+        client.monitor.pinger = client.loop.create_task(client.pinger())
 
-        raise Exception(
-            "Couldn't authenticate to %s", endpoint)
+        return client
 
     @classmethod
-    async def connect_current(cls):
+    async def connect_current(cls, loop=None):
         """Connect to the currently active model.
 
         """
         jujudata = JujuData()
         controller_name = jujudata.current_controller()
-        models = jujudata.models()[controller_name]
-        model_name = models['current-model']
+        model_name = jujudata.current_model()
 
         return await cls.connect_model(
-            '{}:{}'.format(controller_name, model_name))
+            '{}:{}'.format(controller_name, model_name), loop)
 
     @classmethod
-    async def connect_current_controller(cls):
+    async def connect_current_controller(cls, loop=None):
         """Connect to the currently active controller.
 
         """
@@ -231,10 +407,10 @@ class Connection:
         if not controller_name:
             raise JujuConnectionError('No current controller')
 
-        return await cls.connect_controller(controller_name)
+        return await cls.connect_controller(controller_name, loop)
 
     @classmethod
-    async def connect_controller(cls, controller_name):
+    async def connect_controller(cls, controller_name, loop=None):
         """Connect to a controller by name.
 
         """
@@ -248,41 +424,63 @@ class Connection:
         macaroons = get_macaroons() if not password else None
 
         return await cls.connect(
-            endpoint, None, username, password, cacert, macaroons)
+            endpoint, None, username, password, cacert, macaroons, loop)
 
     @classmethod
-    async def connect_model(cls, model):
+    async def connect_model(cls, model, loop=None):
         """Connect to a model by name.
 
-        :param str model: <controller>:<model>
+        :param str model: [<controller>:]<model>
 
         """
-        controller_name, model_name = model.split(':')
-
         jujudata = JujuData()
+
+        if ':' in model:
+            # explicit controller given
+            controller_name, model_name = model.split(':')
+        else:
+            # use the current controller if one isn't explicitly given
+            controller_name = jujudata.current_controller()
+            model_name = model
+
+        accounts = jujudata.accounts()[controller_name]
+        username = accounts['user']
+        # model name must include a user prefix, so add it if it doesn't
+        if '/' not in model_name:
+            model_name = '{}/{}'.format(username, model_name)
+
         controller = jujudata.controllers()[controller_name]
         endpoint = controller['api-endpoints'][0]
         cacert = controller.get('ca-cert')
-        accounts = jujudata.accounts()[controller_name]
-        username = accounts['user']
         password = accounts.get('password')
         models = jujudata.models()[controller_name]
         model_uuid = models['models'][model_name]['uuid']
         macaroons = get_macaroons() if not password else None
 
         return await cls.connect(
-            endpoint, model_uuid, username, password, cacert, macaroons)
+            endpoint, model_uuid, username, password, cacert, macaroons, loop)
 
-    def build_facades(self, info):
+    def build_facades(self, facades):
         self.facades.clear()
-        for facade in info:
-            self.facades[facade['name']] = facade['versions'][-1]
-
-    async def login(self, username, password, macaroons=None):
-        if macaroons:
-            username = ''
-            password = ''
-
+        # In order to work around an issue where the juju api is not
+        # returning a complete list of facades, we simply look up the
+        # juju version in a pregenerated map, and use that info to
+        # populate our list of facades.
+
+        # TODO: if a future version of juju fixes this bug, restore
+        # the following code for that version and higher:
+        # for facade in facades:
+        #     self.facades[facade['name']] = facade['versions'][-1]
+        try:
+            self.facades = VERSION_MAP[self.info['server-version']]
+        except KeyError:
+            log.warning("Could not find a set of facades for {}. Using "
+                        "the latest facade set instead".format(
+                            self.info['server-version']))
+            self.facades = VERSION_MAP['latest']
+
+    async def login(self):
+        username = self.username
         if username and not username.startswith('user-'):
             username = 'user-{}'.format(username)
 
@@ -292,14 +490,11 @@ class Connection:
             "version": 3,
             "params": {
                 "auth-tag": username,
-                "credentials": password,
+                "credentials": self.password,
                 "nonce": "".join(random.sample(string.printable, 12)),
-                "macaroons": macaroons or []
+                "macaroons": self.macaroons
             }})
-        response = result['response']
-        self.build_facades(response.get('facades', {}))
-        self.info = response.copy()
-        return response
+        return result
 
     async def redirect_info(self):
         try:
@@ -326,6 +521,14 @@ class JujuData:
         output = yaml.safe_load(output)
         return output.get('current-controller', '')
 
+    def current_model(self, controller_name=None):
+        if not controller_name:
+            controller_name = self.current_controller()
+        models = self.models()[controller_name]
+        if 'current-model' not in models:
+            raise JujuError('No current model')
+        return models['current-model']
+
     def controllers(self):
         return self._load_yaml('controllers.yaml', 'controllers')
 
@@ -349,7 +552,7 @@ def get_macaroons():
         cookie_file = os.path.expanduser('~/.go-cookies')
         with open(cookie_file, 'r') as f:
             cookies = json.load(f)
-    except (OSError, ValueError) as e:
+    except (OSError, ValueError):
         log.warn("Couldn't load macaroons from %s", cookie_file)
         return []