Update changelog for 0.4.0
[osm/N2VC.git] / juju / utils.py
index 78322e7..f4db66e 100644 (file)
@@ -1,10 +1,11 @@
 import asyncio
-import concurrent.futures
 import os
+from collections import defaultdict
+from functools import partial
 from pathlib import Path
 
 
-async def execute_process(*cmd, log=None):
+async def execute_process(*cmd, log=None, loop=None):
     '''
     Wrapper around asyncio.create_subprocess_exec.
 
@@ -14,7 +15,7 @@ async def execute_process(*cmd, log=None):
             stdin=asyncio.subprocess.PIPE,
             stdout=asyncio.subprocess.PIPE,
             stderr=asyncio.subprocess.PIPE,
-            )
+            loop=loop)
     stdout, stderr = await p.communicate()
     if log:
         log.debug("Exec %s -> %d", cmd, p.returncode)
@@ -45,8 +46,26 @@ async def read_ssh_key(loop):
     can be passed on to a model.
 
     '''
-    ssh_key = await loop.run_in_executor(
-        concurrent.futures.ThreadPoolExecutor(),
-        _read_ssh_key
-    )
-    return ssh_key
+    return await loop.run_in_executor(None, _read_ssh_key)
+
+
+class IdQueue:
+    """
+    Wrapper around asyncio.Queue that maintains a separate queue for each ID.
+    """
+    def __init__(self, maxsize=0, *, loop=None):
+        self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
+
+    async def get(self, id):
+        value = await self._queues[id].get()
+        del self._queues[id]
+        if isinstance(value, Exception):
+            raise value
+        return value
+
+    async def put(self, id, value):
+        await self._queues[id].put(value)
+
+    async def put_all(self, value):
+        for queue in self._queues.values():
+            await queue.put(value)