Fix bug 601
[osm/N2VC.git] / modules / libjuju / juju / utils.py
1 import asyncio
2 import os
3 from collections import defaultdict
4 from functools import partial
5 from pathlib import Path
6
7
async def execute_process(*cmd, log=None, loop=None):
    '''
    Wrapper around asyncio.create_subprocess_exec.

    Runs ``cmd`` to completion with stdin, stdout and stderr attached to
    pipes, optionally logging the command, its exit code and any captured
    output at debug level.

    :param cmd: Program and arguments to execute.
    :param log: Optional logger-like object exposing ``debug``; nothing is
        logged when it is falsy.
    :param loop: Accepted for backward compatibility and ignored. The
        subprocess is always created on the running event loop, because
        ``asyncio.create_subprocess_exec`` no longer accepts a ``loop``
        argument (removed in Python 3.10).
    :return: ``True`` if the process exited with return code 0, otherwise
        ``False``.

    '''
    p = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # communicate() closes stdin (no input is supplied) and drains both
    # output pipes until the process exits.
    stdout, stderr = await p.communicate()
    if log:
        log.debug("Exec %s -> %d", cmd, p.returncode)
        # The output is only used for debug logging, so undecodable bytes
        # are replaced instead of raising UnicodeDecodeError.
        if stdout:
            log.debug(stdout.decode('utf-8', errors='replace'))
        if stderr:
            log.debug(stderr.decode('utf-8', errors='replace'))
    return p.returncode == 0
27
28
def _read_ssh_key():
    '''
    Blocking helper behind read_ssh_key, intended to be handed to an
    Executor rather than called directly on the event loop.

    Looks for ``ssh/juju_id_rsa.pub`` under the directory named by the
    ``JUJU_DATA`` environment variable (falling back to
    ``~/.local/share/juju``) and returns its first line with surrounding
    whitespace removed.

    '''
    fallback_dir = Path(Path.home(), ".local", "share", "juju")
    data_dir = os.environ.get("JUJU_DATA", fallback_dir)
    key_file = Path(data_dir, 'ssh', 'juju_id_rsa.pub')
    with key_file.open('r') as handle:
        lines = handle.readlines()
    # Only the first line holds the key; an empty file raises IndexError.
    return lines[0].strip()
41
42
async def read_ssh_key(loop):
    '''
    Fetch the local juju admin's public ssh key so it can be handed to
    a model.

    The file read is blocking, so it is delegated to the loop's default
    executor via _read_ssh_key.

    :param loop: Event loop whose executor performs the read; when falsy,
        ``asyncio.get_event_loop()`` is used instead.

    '''
    if not loop:
        loop = asyncio.get_event_loop()
    public_key = await loop.run_in_executor(None, _read_ssh_key)
    return public_key
51
52
class IdQueue:
    """
    Wrapper around asyncio.Queue that maintains a separate queue for each ID.

    A queue is created the first time an ID is used (by either ``get`` or
    ``put``) and is discarded as soon as one value has been read for it.
    """
    def __init__(self, maxsize=0, *, loop=None):
        """
        :param maxsize: Capacity of each per-ID queue (0 means unbounded).
        :param loop: Accepted for backward compatibility and ignored;
            ``asyncio.Queue`` no longer takes a ``loop`` argument
            (removed in Python 3.10) and binds to the running loop.
        """
        self._queues = defaultdict(partial(asyncio.Queue, maxsize))

    async def get(self, id):
        """
        Wait for the next value queued for ``id`` and return it.

        The queue for ``id`` is removed once a value arrives. If the value
        is an ``Exception`` instance it is raised instead of returned.
        """
        value = await self._queues[id].get()
        del self._queues[id]
        if isinstance(value, Exception):
            raise value
        return value

    async def put(self, id, value):
        """Queue ``value`` for ``id``, creating the queue if needed."""
        await self._queues[id].put(value)

    async def put_all(self, value):
        """Queue ``value`` on every per-ID queue that currently exists."""
        # Iterate over a snapshot: with a bounded queue ``put`` may suspend,
        # and a concurrent get()/put() would then resize the dict mid-loop.
        for queue in list(self._queues.values()):
            await queue.put(value)
73
74
async def block_until(*conditions, timeout=None, wait_period=0.5, loop=None):
    """Return only after all conditions are true.

    :param conditions: Zero-argument callables; each is polled until all
        of them return a truthy value in the same pass.
    :param timeout: Maximum number of seconds to wait, or ``None`` to wait
        indefinitely.
    :param wait_period: Seconds to sleep between polling passes.
    :param loop: Accepted for backward compatibility and ignored;
        ``asyncio.sleep`` and ``asyncio.wait_for`` no longer take a
        ``loop`` argument (removed in Python 3.10).
    :raises asyncio.TimeoutError: If ``timeout`` elapses first.

    """
    async def _block():
        while not all(c() for c in conditions):
            await asyncio.sleep(wait_period)
    await asyncio.wait_for(_block(), timeout)
83
84
async def run_with_interrupt(task, event, loop=None):
    """
    Awaits a task while allowing it to be interrupted by an `asyncio.Event`.

    If the task finishes without the event becoming set, the results of the
    task will be returned (or its exception raised).  If the event becomes
    set first, the task will be cancelled and ``None`` will be returned.

    :param task: Task (or coroutine) to run
    :param event: An `asyncio.Event` which, if set, will interrupt `task`
        and cause it to be cancelled.
    :param loop: Optional event loop to use other than the default.
    """
    loop = loop or asyncio.get_event_loop()
    # asyncio.wait() no longer wraps bare coroutines (Python 3.11+), so
    # make sure we hold a real future; existing tasks are returned as-is.
    task = asyncio.ensure_future(task, loop=loop)
    event_task = loop.create_task(event.wait())
    try:
        # No ``loop=`` here: asyncio.wait() dropped it in Python 3.10.
        done, pending = await asyncio.wait(
            {task, event_task},
            return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Never leave the internal waiter behind, including when this
        # coroutine itself is cancelled.  A no-op if it already finished.
        event_task.cancel()
    for f in pending:
        f.cancel()
    if task not in done:
        # The event fired first; the task has just been cancelled.
        return None
    # Returns the task's result, or re-raises the exception it ended with.
    return task.result()