X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Futils.py;fp=modules%2Flibjuju%2Fjuju%2Futils.py;h=1d1b24ecd647089ebb335a706350fa61dc29a52a;hp=0000000000000000000000000000000000000000;hb=68858c1915122c2dbc8999a5cd3229694abf5f3a;hpb=032a71b2a6692b8b4e30f629a1f906d246f06736 diff --git a/modules/libjuju/juju/utils.py b/modules/libjuju/juju/utils.py new file mode 100644 index 0000000..1d1b24e --- /dev/null +++ b/modules/libjuju/juju/utils.py @@ -0,0 +1,98 @@ +import asyncio +import os +from collections import defaultdict +from functools import partial +from pathlib import Path + + +async def execute_process(*cmd, log=None, loop=None): + ''' + Wrapper around asyncio.create_subprocess_exec. + + ''' + p = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + loop=loop) + stdout, stderr = await p.communicate() + if log: + log.debug("Exec %s -> %d", cmd, p.returncode) + if stdout: + log.debug(stdout.decode('utf-8')) + if stderr: + log.debug(stderr.decode('utf-8')) + return p.returncode == 0 + + +def _read_ssh_key(): + ''' + Inner function for read_ssh_key, suitable for passing to our + Executor. + + ''' + default_data_dir = Path(Path.home(), ".local", "share", "juju") + juju_data = os.environ.get("JUJU_DATA", default_data_dir) + ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub') + with ssh_key_path.open('r') as ssh_key_file: + ssh_key = ssh_key_file.readlines()[0].strip() + return ssh_key + + +async def read_ssh_key(loop): + ''' + Attempt to read the local juju admin's public ssh key, so that it + can be passed on to a model. + + ''' + return await loop.run_in_executor(None, _read_ssh_key) + + +class IdQueue: + """ + Wrapper around asyncio.Queue that maintains a separate queue for each ID. + """ + def __init__(self, maxsize=0, *, loop=None): + self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop)) + + async def get(self, id): + value = await self._queues[id].get() + del self._queues[id] + if isinstance(value, Exception): + raise value + return value + + async def put(self, id, value): + await self._queues[id].put(value) + + async def put_all(self, value): + for queue in self._queues.values(): + await queue.put(value) + + +async def run_with_interrupt(task, event, loop=None): + """ + Awaits a task while allowing it to be interrupted by an `asyncio.Event`. + + If the task finishes without the event becoming set, the results of the + task will be returned. If the event becomes set, the task will be + cancelled ``None`` will be returned. + + :param task: Task to run + :param event: An `asyncio.Event` which, if set, will interrupt `task` + and cause it to be cancelled. + :param loop: Optional event loop to use other than the default. + """ + loop = loop or asyncio.get_event_loop() + event_task = loop.create_task(event.wait()) + done, pending = await asyncio.wait([task, event_task], + loop=loop, + return_when=asyncio.FIRST_COMPLETED) + for f in pending: + f.cancel() + result = [f.result() for f in done if f is not event_task] + if result: + return result[0] + else: + return None