Update changelog for 0.4.0
[osm/N2VC.git] / juju / utils.py
index b9a9b67..f4db66e 100644 (file)
@@ -1,9 +1,11 @@
 import asyncio
 import os
+from collections import defaultdict
+from functools import partial
 from pathlib import Path
 
 
-async def execute_process(*cmd, log=None):
+async def execute_process(*cmd, log=None, loop=None):
     '''
     Wrapper around asyncio.create_subprocess_exec.
 
@@ -13,7 +15,7 @@ async def execute_process(*cmd, log=None):
             stdin=asyncio.subprocess.PIPE,
             stdout=asyncio.subprocess.PIPE,
             stderr=asyncio.subprocess.PIPE,
-            )
+            loop=loop)
     stdout, stderr = await p.communicate()
     if log:
         log.debug("Exec %s -> %d", cmd, p.returncode)
@@ -24,10 +26,10 @@ async def execute_process(*cmd, log=None):
     return p.returncode == 0
 
 
-def read_ssh_key():
+def _read_ssh_key():
     '''
-    Attempt to read the local juju admin's public ssh key, so that it
-    can be passed on to a model.
+    Inner function for read_ssh_key, suitable for passing to our
+    Executor.
 
     '''
     default_data_dir = Path(Path.home(), ".local", "share", "juju")
@@ -37,3 +39,33 @@ def read_ssh_key():
         ssh_key = ssh_key_file.readlines()[0].strip()
     return ssh_key
 
+
+async def read_ssh_key(loop):
+    '''
+    Attempt to read the local juju admin's public ssh key, so that it
+    can be passed on to a model.
+
+    '''
+    return await loop.run_in_executor(None, _read_ssh_key)
+
+
+class IdQueue:
+    """
+    Wrapper around asyncio.Queue that maintains a separate queue for each ID.
+    """
+    def __init__(self, maxsize=0, *, loop=None):
+        self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
+
+    async def get(self, id):
+        value = await self._queues[id].get()
+        del self._queues[id]
+        if isinstance(value, Exception):
+            raise value
+        return value
+
+    async def put(self, id, value):
+        await self._queues[id].put(value)
+
+    async def put_all(self, value):
+        for queue in self._queues.values():
+            await queue.put(value)