Update changelog for 0.4.0
[osm/N2VC.git] / juju / utils.py
index c0a500c..f4db66e 100644 (file)
@@ -1,5 +1,7 @@
 import asyncio
 import os
+from collections import defaultdict
+from functools import partial
 from pathlib import Path
 
 
@@ -45,3 +47,25 @@ async def read_ssh_key(loop):
 
     '''
     return await loop.run_in_executor(None, _read_ssh_key)
+
+
+class IdQueue:
+    """
+    Wrapper around asyncio.Queue that maintains a separate queue for each ID.
+    """
+    def __init__(self, maxsize=0, *, loop=None):
+        self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
+
+    async def get(self, id):
+        value = await self._queues[id].get()
+        del self._queues[id]
+        if isinstance(value, Exception):
+            raise value
+        return value
+
+    async def put(self, id, value):
+        await self._queues[id].put(value)
+
+    async def put_all(self, value):
+        for queue in self._queues.values():
+            await queue.put(value)