+ async def recv(self, request_id):
+ if not self.is_open:
+ raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
+ return await self.messages.get(request_id)
+
+ async def receiver(self):
+ try:
+ while self.is_open:
+ result = await utils.run_with_interrupt(
+ self.ws.recv(),
+ self.monitor.close_called,
+ loop=self.loop)
+ if self.monitor.close_called.is_set():
+ break
+ if result is not None:
+ result = json.loads(result)
+ await self.messages.put(result['request-id'], result)
+ except CancelledError:
+ pass
+ except websockets.ConnectionClosed as e:
+ log.warning('Receiver: Connection closed, reconnecting')
+ await self.messages.put_all(e)
+ # the reconnect has to be done as a task because the receiver will
+ # be cancelled by the reconnect and we don't want the reconnect
+ # to be aborted half-way through
+ self.loop.create_task(self.reconnect())
+ return
+ except Exception as e:
+ log.exception("Error in receiver")
+ # make pending listeners aware of the error
+ await self.messages.put_all(e)
+ raise
+ finally:
+ self.monitor.receiver_stopped.set()
+
+ async def pinger(self):
+ '''
+ A Controller can time us out if we are silent for too long. This
+ is especially true in JaaS, which has a fairly strict timeout.
+
+ To prevent timing out, we send a ping every ten seconds.
+
+ '''
+ async def _do_ping():
+ try:
+ await pinger_facade.Ping()
+ await asyncio.sleep(10, loop=self.loop)
+ except CancelledError:
+ pass
+
+ pinger_facade = client.PingerFacade.from_connection(self)
+ try:
+ while True:
+ await utils.run_with_interrupt(
+ _do_ping(),
+ self.monitor.close_called,
+ loop=self.loop)
+ if self.monitor.close_called.is_set():
+ break
+ finally:
+ self.monitor.pinger_stopped.set()
+ return
+
+ async def rpc(self, msg, encoder=None):
+ self.__request_id__ += 1
+ msg['request-id'] = self.__request_id__
+ if'params' not in msg:
+ msg['params'] = {}
+ if "version" not in msg:
+ msg['version'] = self.facades[msg['type']]
+ outgoing = json.dumps(msg, indent=2, cls=encoder)
+ for attempt in range(3):
+ try:
+ await self.ws.send(outgoing)
+ break
+ except websockets.ConnectionClosed:
+ if attempt == 2:
+ raise
+ log.warning('RPC: Connection closed, reconnecting')
+ # the reconnect has to be done in a separate task because,
+ # if it is triggered by the pinger, then this RPC call will
+ # be cancelled when the pinger is cancelled by the reconnect,
+ # and we don't want the reconnect to be aborted halfway through
+ await asyncio.wait([self.reconnect()], loop=self.loop)
+ result = await self.recv(msg['request-id'])
+
+ if not result:
+ return result
+
+ if 'error' in result:
+ # API Error Response
+ raise JujuAPIError(result)
+
+ if 'response' not in result:
+ # This may never happen
+ return result
+
+ if 'results' in result['response']:
+ # Check for errors in a result list.
+ errors = []
+ for res in result['response']['results']:
+ if res.get('error', {}).get('message'):
+ errors.append(res['error']['message'])
+ if errors:
+ raise JujuError(errors)
+
+ elif result['response'].get('error', {}).get('message'):
+ raise JujuError(result['response']['error']['message'])