from juju import tag
from juju.errors import JujuError, JujuAPIError, JujuConnectionError
+from juju.utils import IdQueue
log = logging.getLogger("websocket")
self.addr = None
self.ws = None
self.facades = {}
+ self.messages = IdQueue(loop=self.loop)
@property
def is_open(self):
kw['loop'] = self.loop
self.addr = url
self.ws = await websockets.connect(url, **kw)
+ self.loop.create_task(self.receiver())
log.info("Driver connected to juju %s", url)
return self
async def close(self):
await self.ws.close()
- async def recv(self):
- result = await self.ws.recv()
- if result is not None:
- result = json.loads(result)
- return result
+ async def recv(self, request_id):
+ return await self.messages.get(request_id)
+
+ async def receiver(self):
+ while self.is_open:
+ result = await self.ws.recv()
+ if result is not None:
+ result = json.loads(result)
+ await self.messages.put(result['request-id'], result)
async def rpc(self, msg, encoder=None):
self.__request_id__ += 1
msg['version'] = self.facades[msg['type']]
outgoing = json.dumps(msg, indent=2, cls=encoder)
await self.ws.send(outgoing)
- result = await self.recv()
+ result = await self.recv(msg['request-id'])
if not result:
return result