import asyncio
-import concurrent.futures
import os
+from collections import defaultdict
+from functools import partial
from pathlib import Path
-async def execute_process(*cmd, log=None):
+async def execute_process(*cmd, log=None, loop=None):
'''
Wrapper around asyncio.create_subprocess_exec.
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
- )
+ loop=loop)
stdout, stderr = await p.communicate()
if log:
log.debug("Exec %s -> %d", cmd, p.returncode)
can be passed on to a model.
'''
- ssh_key = await loop.run_in_executor(
- concurrent.futures.ThreadPoolExecutor(),
- _read_ssh_key
- )
- return ssh_key
+ return await loop.run_in_executor(None, _read_ssh_key)
+
+
+class IdQueue:
+ """
+ Wrapper around asyncio.Queue that maintains a separate queue for each ID.
+ """
+ def __init__(self, maxsize=0, *, loop=None):
+ self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
+
+ async def get(self, id):
+ value = await self._queues[id].get()
+ del self._queues[id]
+ return value
+
+ async def put(self, id, value):
+ await self._queues[id].put(value)