Improve error handling
[osm/N2VC.git] / juju / client / connection.py
index 7119292..b9eb3bc 100644 (file)
@@ -16,6 +16,7 @@ import yaml
 
 from juju import tag
 from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
 
 log = logging.getLogger("websocket")
 
@@ -52,6 +53,7 @@ class Connection:
         self.addr = None
         self.ws = None
         self.facades = {}
+        self.messages = IdQueue(loop=self.loop)
 
     @property
     def is_open(self):
@@ -74,17 +76,30 @@ class Connection:
         kw['loop'] = self.loop
         self.addr = url
         self.ws = await websockets.connect(url, **kw)
+        self.loop.create_task(self.receiver())
         log.info("Driver connected to juju %s", url)
         return self
 
     async def close(self):
         await self.ws.close()
 
-    async def recv(self):
-        result = await self.ws.recv()
-        if result is not None:
-            result = json.loads(result)
-        return result
+    async def recv(self, request_id):
+        if not self.is_open:
+            raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
+        return await self.messages.get(request_id)
+
+    async def receiver(self):
+        while self.is_open:
+            try:
+                result = await self.ws.recv()
+                if result is not None:
+                    result = json.loads(result)
+                    await self.messages.put(result['request-id'], result)
+            except Exception as e:
+                await self.messages.put_all(e)
+                raise
+        await self.messages.put_all(websockets.exceptions.ConnectionClosed(
+            0, 'websocket closed'))
 
     async def rpc(self, msg, encoder=None):
         self.__request_id__ += 1
@@ -95,9 +110,31 @@ class Connection:
             msg['version'] = self.facades[msg['type']]
         outgoing = json.dumps(msg, indent=2, cls=encoder)
         await self.ws.send(outgoing)
-        result = await self.recv()
-        if result and 'error' in result:
+        result = await self.recv(msg['request-id'])
+
+        if not result:
+            return result
+
+        if 'error' in result:
+            # API Error Response
             raise JujuAPIError(result)
+
+        if 'response' not in result:
+            # This may never happen
+            return result
+
+        if 'results' in result['response']:
+            # Check for errors in a result list.
+            errors = []
+            for res in result['response']['results']:
+                if res.get('error', {}).get('message'):
+                    errors.append(res['error']['message'])
+            if errors:
+                raise JujuError(errors)
+
+        elif result['response'].get('error', {}).get('message'):
+            raise JujuError(result['response']['error']['message'])
+
         return result
 
     def http_headers(self):