Refactor to use IdQueue pattern
[osm/N2VC.git] / juju / client / connection.py
index 5ed073f..625c609 100644 (file)
@@ -9,10 +9,14 @@ import ssl
 import string
 import subprocess
 import websockets
+from http.client import HTTPSConnection
 
+import asyncio
 import yaml
 
-from juju.errors import JujuAPIError, JujuConnectionError
+from juju import tag
+from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
 
 log = logging.getLogger("websocket")
 
@@ -31,21 +35,25 @@ class Connection:
         # Connect to the currently active model
         client = await Connection.connect_current()
 
+    Note: Any connection method or constructor can accept an optional `loop`
+    argument to override the default event loop from `asyncio.get_event_loop`.
     """
     def __init__(
             self, endpoint, uuid, username, password, cacert=None,
-            macaroons=None):
+            macaroons=None, loop=None):
         self.endpoint = endpoint
         self.uuid = uuid
         self.username = username
         self.password = password
         self.macaroons = macaroons
         self.cacert = cacert
+        self.loop = loop or asyncio.get_event_loop()
 
         self.__request_id__ = 0
         self.addr = None
         self.ws = None
         self.facades = {}
+        self.messages = IdQueue(loop=self.loop)
 
     @property
     def is_open(self):
@@ -65,6 +73,7 @@ class Connection:
 
         kw = dict()
         kw['ssl'] = self._get_ssl(self.cacert)
+        kw['loop'] = self.loop
         self.addr = url
         self.ws = await websockets.connect(url, **kw)
         log.info("Driver connected to juju %s", url)
@@ -73,11 +82,15 @@ class Connection:
     async def close(self):
         await self.ws.close()
 
-    async def recv(self):
-        result = await self.ws.recv()
-        if result is not None:
-            result = json.loads(result)
-        return result
+    async def recv(self, request_id):
+        return await self.messages.get(request_id)
+
+    async def receiver(self):
+        while self.is_open:
+            result = await self.ws.recv()
+            if result is not None:
+                result = json.loads(result)
+                await self.messages.put(result['request-id'], result)
 
     async def rpc(self, msg, encoder=None):
         self.__request_id__ += 1
@@ -88,11 +101,79 @@ class Connection:
             msg['version'] = self.facades[msg['type']]
         outgoing = json.dumps(msg, indent=2, cls=encoder)
         await self.ws.send(outgoing)
-        result = await self.recv()
-        if result and 'error' in result:
+        result = await self.recv(msg['request-id'])
+
+        if not result:
+            return result
+
+        if 'error' in result:
+            # API Error Response
             raise JujuAPIError(result)
+
+        if not 'response' in result:
+            # This may never happen
+            return result
+
+        if 'results' in result['response']:
+            # Check for errors in a result list.
+            errors = []
+            for res in result['response']['results']:
+                if res.get('error', {}).get('message'):
+                    errors.append(res['error']['message'])
+            if errors:
+                raise JujuError(errors)
+
+        elif result['response'].get('error', {}).get('message'):
+            raise JujuError(result['response']['error']['message'])
+
         return result
 
+    def http_headers(self):
+        """Return dictionary of http headers necessary for making an http
+        connection to the endpoint of this Connection.
+
+        :return: Dictionary of headers
+
+        """
+        if not self.username:
+            return {}
+
+        creds = u'{}:{}'.format(
+            tag.user(self.username),
+            self.password or ''
+        )
+        token = base64.b64encode(creds.encode())
+        return {
+            'Authorization': 'Basic {}'.format(token.decode())
+        }
+
+    def https_connection(self):
+        """Return an https connection to this Connection's endpoint.
+
+        Returns a 3-tuple containing::
+
+            1. The :class:`HTTPSConnection` instance
+            2. Dictionary of auth headers to be used with the connection
+            3. The root url path (str) to be used for requests.
+
+        """
+        endpoint = self.endpoint
+        host, remainder = endpoint.split(':', 1)
+        port = remainder
+        if '/' in remainder:
+            port, _ = remainder.split('/', 1)
+
+        conn = HTTPSConnection(
+            host, int(port),
+            context=self._get_ssl(self.cacert),
+        )
+
+        path = (
+            "/model/{}".format(self.uuid)
+            if self.uuid else ""
+        )
+        return conn, self.http_headers(), path
+
     async def clone(self):
         """Return a new Connection, connected to the same websocket endpoint
         as this one.
@@ -105,6 +186,7 @@ class Connection:
             self.password,
             self.cacert,
             self.macaroons,
+            self.loop,
         )
 
     async def controller(self):
@@ -118,20 +200,23 @@ class Connection:
             self.password,
             self.cacert,
             self.macaroons,
+            self.loop,
         )
 
     @classmethod
     async def connect(
             cls, endpoint, uuid, username, password, cacert=None,
-            macaroons=None):
+            macaroons=None, loop=None):
         """Connect to the websocket.
 
         If uuid is None, the connection will be to the controller. Otherwise it
         will be to the model.
 
         """
-        client = cls(endpoint, uuid, username, password, cacert, macaroons)
+        client = cls(endpoint, uuid, username, password, cacert, macaroons,
+                     loop)
         await client.open()
+        client.loop.create_task(client.receiver)
 
         redirect_info = await client.redirect_info()
         if not redirect_info:
@@ -161,20 +246,19 @@ class Connection:
             "Couldn't authenticate to %s", endpoint)
 
     @classmethod
-    async def connect_current(cls):
+    async def connect_current(cls, loop=None):
         """Connect to the currently active model.
 
         """
         jujudata = JujuData()
         controller_name = jujudata.current_controller()
-        models = jujudata.models()[controller_name]
-        model_name = models['current-model']
+        model_name = jujudata.current_model()
 
         return await cls.connect_model(
-            '{}:{}'.format(controller_name, model_name))
+            '{}:{}'.format(controller_name, model_name), loop)
 
     @classmethod
-    async def connect_current_controller(cls):
+    async def connect_current_controller(cls, loop=None):
         """Connect to the currently active controller.
 
         """
@@ -183,10 +267,10 @@ class Connection:
         if not controller_name:
             raise JujuConnectionError('No current controller')
 
-        return await cls.connect_controller(controller_name)
+        return await cls.connect_controller(controller_name, loop)
 
     @classmethod
-    async def connect_controller(cls, controller_name):
+    async def connect_controller(cls, controller_name, loop=None):
         """Connect to a controller by name.
 
         """
@@ -200,30 +284,41 @@ class Connection:
         macaroons = get_macaroons() if not password else None
 
         return await cls.connect(
-            endpoint, None, username, password, cacert, macaroons)
+            endpoint, None, username, password, cacert, macaroons, loop)
 
     @classmethod
-    async def connect_model(cls, model):
+    async def connect_model(cls, model, loop=None):
         """Connect to a model by name.
 
-        :param str model: <controller>:<model>
+        :param str model: [<controller>:]<model>
 
         """
-        controller_name, model_name = model.split(':')
-
         jujudata = JujuData()
+
+        if ':' in model:
+            # explicit controller given
+            controller_name, model_name = model.split(':')
+        else:
+            # use the current controller if one isn't explicitly given
+            controller_name = jujudata.current_controller()
+            model_name = model
+
+        accounts = jujudata.accounts()[controller_name]
+        username = accounts['user']
+        # model name must include a user prefix, so add it if it doesn't
+        if '/' not in model_name:
+            model_name = '{}/{}'.format(username, model_name)
+
         controller = jujudata.controllers()[controller_name]
         endpoint = controller['api-endpoints'][0]
         cacert = controller.get('ca-cert')
-        accounts = jujudata.accounts()[controller_name]
-        username = accounts['user']
         password = accounts.get('password')
         models = jujudata.models()[controller_name]
         model_uuid = models['models'][model_name]['uuid']
         macaroons = get_macaroons() if not password else None
 
         return await cls.connect(
-            endpoint, model_uuid, username, password, cacert, macaroons)
+            endpoint, model_uuid, username, password, cacert, macaroons, loop)
 
     def build_facades(self, info):
         self.facades.clear()
@@ -278,6 +373,14 @@ class JujuData:
         output = yaml.safe_load(output)
         return output.get('current-controller', '')
 
+    def current_model(self, controller_name=None):
+        if not controller_name:
+            controller_name = self.current_controller()
+        models = self.models()[controller_name]
+        if 'current-model' not in models:
+            raise JujuError('No current model')
+        return models['current-model']
+
     def controllers(self):
         return self._load_yaml('controllers.yaml', 'controllers')
 
@@ -301,7 +404,7 @@ def get_macaroons():
         cookie_file = os.path.expanduser('~/.go-cookies')
         with open(cookie_file, 'r') as f:
             cookies = json.load(f)
-    except (OSError, ValueError) as e:
+    except (OSError, ValueError):
         log.warn("Couldn't load macaroons from %s", cookie_file)
         return []