Merge commit '19031b24b523c872c1ac367821dc60c950a09755' as 'modules/libjuju'
[osm/N2VC.git] / modules / libjuju / juju / utils.py
diff --git a/modules/libjuju/juju/utils.py b/modules/libjuju/juju/utils.py
new file mode 100644 (file)
index 0000000..1d1b24e
--- /dev/null
@@ -0,0 +1,98 @@
+import asyncio
+import os
+from collections import defaultdict
+from functools import partial
+from pathlib import Path
+
+
+async def execute_process(*cmd, log=None, loop=None):
+    '''
+    Wrapper around asyncio.create_subprocess_exec.
+
+    '''
+    p = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            loop=loop)
+    stdout, stderr = await p.communicate()
+    if log:
+        log.debug("Exec %s -> %d", cmd, p.returncode)
+        if stdout:
+            log.debug(stdout.decode('utf-8'))
+        if stderr:
+            log.debug(stderr.decode('utf-8'))
+    return p.returncode == 0
+
+
+def _read_ssh_key():
+    '''
+    Inner function for read_ssh_key, suitable for passing to our
+    Executor.
+
+    '''
+    default_data_dir = Path(Path.home(), ".local", "share", "juju")
+    juju_data = os.environ.get("JUJU_DATA", default_data_dir)
+    ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
+    with ssh_key_path.open('r') as ssh_key_file:
+        ssh_key = ssh_key_file.readlines()[0].strip()
+    return ssh_key
+
+
+async def read_ssh_key(loop):
+    '''
+    Attempt to read the local juju admin's public ssh key, so that it
+    can be passed on to a model.
+
+    '''
+    return await loop.run_in_executor(None, _read_ssh_key)
+
+
+class IdQueue:
+    """
+    Wrapper around asyncio.Queue that maintains a separate queue for each ID.
+    """
+    def __init__(self, maxsize=0, *, loop=None):
+        self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
+
+    async def get(self, id):
+        value = await self._queues[id].get()
+        del self._queues[id]
+        if isinstance(value, Exception):
+            raise value
+        return value
+
+    async def put(self, id, value):
+        await self._queues[id].put(value)
+
+    async def put_all(self, value):
+        for queue in self._queues.values():
+            await queue.put(value)
+
+
+async def run_with_interrupt(task, event, loop=None):
+    """
+    Awaits a task while allowing it to be interrupted by an `asyncio.Event`.
+
+    If the task finishes without the event becoming set, the results of the
+    task will be returned.  If the event becomes set, the task will be
+    cancelled ``None`` will be returned.
+
+    :param task: Task to run
+    :param event: An `asyncio.Event` which, if set, will interrupt `task`
+        and cause it to be cancelled.
+    :param loop: Optional event loop to use other than the default.
+    """
+    loop = loop or asyncio.get_event_loop()
+    event_task = loop.create_task(event.wait())
+    done, pending = await asyncio.wait([task, event_task],
+                                       loop=loop,
+                                       return_when=asyncio.FIRST_COMPLETED)
+    for f in pending:
+        f.cancel()
+    result = [f.result() for f in done if f is not event_task]
+    if result:
+        return result[0]
+    else:
+        return None