1d1b24ecd647089ebb335a706350fa61dc29a52a
[osm/N2VC.git] / modules / libjuju / juju / utils.py
1 import asyncio
2 import os
3 from collections import defaultdict
4 from functools import partial
5 from pathlib import Path
6
7
async def execute_process(*cmd, log=None, loop=None):
    '''
    Wrapper around asyncio.create_subprocess_exec.

    Runs ``cmd`` with stdin, stdout and stderr attached to pipes, waits
    for the process to finish and reports whether it succeeded.

    :param cmd: Program and arguments, passed through unchanged to
        ``asyncio.create_subprocess_exec``.
    :param log: Optional logger. When given, the command, its return
        code and any captured stdout/stderr are logged at debug level.
    :param loop: Accepted for backward compatibility only. It is not
        forwarded, because ``asyncio.create_subprocess_exec`` lost its
        ``loop`` argument in Python 3.10; the subprocess is bound to
        the event loop running this coroutine.
    :return: ``True`` if the process exited with return code 0,
        ``False`` otherwise.

    '''
    p = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # Wait for the process to exit while collecting both output streams.
    stdout, stderr = await p.communicate()
    if log:
        log.debug("Exec %s -> %d", cmd, p.returncode)
        # errors='replace': output that is not valid UTF-8 must not turn
        # a debug log call into a UnicodeDecodeError for the caller.
        if stdout:
            log.debug(stdout.decode('utf-8', errors='replace'))
        if stderr:
            log.debug(stderr.decode('utf-8', errors='replace'))
    return p.returncode == 0
27
28
def _read_ssh_key():
    '''
    Inner function for read_ssh_key, suitable for passing to our
    Executor.

    Reads ``ssh/juju_id_rsa.pub`` from the directory named by the
    ``JUJU_DATA`` environment variable, falling back to
    ``~/.local/share/juju`` when the variable is unset or empty.

    :return: The first line of the public key file, stripped of
        surrounding whitespace.
    :raises OSError: If the key file cannot be opened (for example
        ``FileNotFoundError`` when it does not exist).
    :raises IndexError: If the key file is empty.

    '''
    default_data_dir = Path(Path.home(), ".local", "share", "juju")
    # `or` rather than a .get() default: an empty JUJU_DATA would
    # otherwise yield a path relative to the current working directory.
    juju_data = os.environ.get("JUJU_DATA") or default_data_dir
    ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
    with ssh_key_path.open('r') as ssh_key_file:
        ssh_key = ssh_key_file.readlines()[0].strip()
    return ssh_key
41
42
async def read_ssh_key(loop):
    '''
    Fetch the local juju admin's public ssh key so that it can be
    handed on to a model.

    The file read is delegated to the loop's default executor via
    ``loop.run_in_executor`` rather than being performed inline in
    this coroutine.

    :param loop: Event loop whose default executor runs the read.
    :return: Whatever ``_read_ssh_key`` returns.

    '''
    pending_read = loop.run_in_executor(None, _read_ssh_key)
    public_key = await pending_read
    return public_key
50
51
class IdQueue:
    """
    Wrapper around asyncio.Queue that maintains a separate queue for each ID.

    A queue is created the first time an ID is used by `get` or `put`,
    and is dropped again as soon as `get` has retrieved a value for that ID.
    """
    def __init__(self, maxsize=0, *, loop=None):
        """
        :param maxsize: ``maxsize`` given to every per-ID ``asyncio.Queue``.
        :param loop: Accepted for backward compatibility only. It is not
            forwarded, because ``asyncio.Queue`` lost its ``loop``
            argument in Python 3.10.
        """
        self._queues = defaultdict(partial(asyncio.Queue, maxsize))

    async def get(self, id):
        """
        Wait for the next value queued under `id` and return it.

        The queue for `id` is removed once a value has been received.

        :raises Exception: If the received value is an ``Exception``
            instance, it is raised instead of being returned.
        """
        value = await self._queues[id].get()
        del self._queues[id]
        if isinstance(value, Exception):
            raise value
        return value

    async def put(self, id, value):
        """
        Queue `value` under `id`, creating the queue if needed.
        """
        await self._queues[id].put(value)

    async def put_all(self, value):
        """
        Queue `value` on every queue that currently exists.
        """
        # Iterate over a snapshot: a put on a full bounded queue suspends
        # this coroutine, and get()/put() elsewhere may then add or remove
        # queues, which would break iteration over the live dict.
        for queue in list(self._queues.values()):
            await queue.put(value)
72
73
async def run_with_interrupt(task, event, loop=None):
    """
    Awaits a task while allowing it to be interrupted by an `asyncio.Event`.

    If the task finishes without the event becoming set, the results of the
    task will be returned.  If the event becomes set, the task will be
    cancelled and ``None`` will be returned.  If the task finished with an
    exception, that exception is raised.

    :param task: Task (or other awaitable) to run
    :param event: An `asyncio.Event` which, if set, will interrupt `task`
        and cause it to be cancelled.
    :param loop: Optional event loop to use other than the default.  It is
        used to schedule the helper tasks but is not passed to
        ``asyncio.wait``, which lost its ``loop`` argument in Python 3.10.
    """
    loop = loop or asyncio.get_event_loop()
    # Wrap up front: asyncio.wait() rejects bare coroutines on Python 3.11+,
    # and we need a handle on the actual task to read its result below.
    task = asyncio.ensure_future(task, loop=loop)
    event_task = loop.create_task(event.wait())
    try:
        done, pending = await asyncio.wait(
            [task, event_task],
            return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Also runs when this coroutine is itself cancelled, so the helper
        # waiting on the event is never left pending.  Cancelling an
        # already finished task is a no-op.
        event_task.cancel()
    for f in pending:
        f.cancel()
    if task in done:
        # result() re-raises the task's exception, if it failed.
        return task.result()
    return None