X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Futils.py;h=f4db66e723c850770b24ccfb90ce17484f72a517;hb=c8a68279c2043d7b3d3789a881826d52074af04e;hp=b9a9b6707a1f6ea5f45d40b55e0d5015a766e4dd;hpb=e42708b6ce89b35b88ae526065951f994128ceed;p=osm%2FN2VC.git diff --git a/juju/utils.py b/juju/utils.py index b9a9b67..f4db66e 100644 --- a/juju/utils.py +++ b/juju/utils.py @@ -1,9 +1,11 @@ import asyncio import os +from collections import defaultdict +from functools import partial from pathlib import Path -async def execute_process(*cmd, log=None): +async def execute_process(*cmd, log=None, loop=None): ''' Wrapper around asyncio.create_subprocess_exec. @@ -13,7 +15,7 @@ async def execute_process(*cmd, log=None): stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - ) + loop=loop) stdout, stderr = await p.communicate() if log: log.debug("Exec %s -> %d", cmd, p.returncode) @@ -24,10 +26,10 @@ async def execute_process(*cmd, log=None): return p.returncode == 0 -def read_ssh_key(): +def _read_ssh_key(): ''' - Attempt to read the local juju admin's public ssh key, so that it - can be passed on to a model. + Inner function for read_ssh_key, suitable for passing to our + Executor. ''' default_data_dir = Path(Path.home(), ".local", "share", "juju") @@ -37,3 +39,33 @@ def read_ssh_key(): ssh_key = ssh_key_file.readlines()[0].strip() return ssh_key + +async def read_ssh_key(loop): + ''' + Attempt to read the local juju admin's public ssh key, so that it + can be passed on to a model. + + ''' + return await loop.run_in_executor(None, _read_ssh_key) + + +class IdQueue: + """ + Wrapper around asyncio.Queue that maintains a separate queue for each ID. + """ + def __init__(self, maxsize=0, *, loop=None): + self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop)) + + async def get(self, id): + value = await self._queues[id].get() + del self._queues[id] + if isinstance(value, Exception): + raise value + return value + + async def put(self, id, value): + await self._queues[id].put(value) + + async def put_all(self, value): + for queue in self._queues.values(): + await queue.put(value)