blob: 3565fd630c6a3dc3839fe3f382bdf5e1aab0fdc2 [file] [log] [blame]
Adam Israeldcdf82b2017-08-15 15:26:43 -04001import asyncio
2import os
3from collections import defaultdict
4from functools import partial
5from pathlib import Path
6
7
8async def execute_process(*cmd, log=None, loop=None):
9 '''
10 Wrapper around asyncio.create_subprocess_exec.
11
12 '''
13 p = await asyncio.create_subprocess_exec(
Adam Israel1a15d1c2017-10-23 12:00:49 -040014 *cmd,
15 stdin=asyncio.subprocess.PIPE,
16 stdout=asyncio.subprocess.PIPE,
17 stderr=asyncio.subprocess.PIPE,
18 loop=loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -040019 stdout, stderr = await p.communicate()
20 if log:
21 log.debug("Exec %s -> %d", cmd, p.returncode)
22 if stdout:
23 log.debug(stdout.decode('utf-8'))
24 if stderr:
25 log.debug(stderr.decode('utf-8'))
26 return p.returncode == 0
27
28
29def _read_ssh_key():
30 '''
31 Inner function for read_ssh_key, suitable for passing to our
32 Executor.
33
34 '''
35 default_data_dir = Path(Path.home(), ".local", "share", "juju")
36 juju_data = os.environ.get("JUJU_DATA", default_data_dir)
37 ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
38 with ssh_key_path.open('r') as ssh_key_file:
39 ssh_key = ssh_key_file.readlines()[0].strip()
40 return ssh_key
41
42
43async def read_ssh_key(loop):
44 '''
45 Attempt to read the local juju admin's public ssh key, so that it
46 can be passed on to a model.
47
48 '''
Adam Israelc3e6c2e2018-03-01 09:31:50 -050049 loop = loop or asyncio.get_event_loop()
Adam Israeldcdf82b2017-08-15 15:26:43 -040050 return await loop.run_in_executor(None, _read_ssh_key)
51
52
53class IdQueue:
54 """
55 Wrapper around asyncio.Queue that maintains a separate queue for each ID.
56 """
57 def __init__(self, maxsize=0, *, loop=None):
58 self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
59
60 async def get(self, id):
61 value = await self._queues[id].get()
62 del self._queues[id]
63 if isinstance(value, Exception):
64 raise value
65 return value
66
67 async def put(self, id, value):
68 await self._queues[id].put(value)
69
70 async def put_all(self, value):
71 for queue in self._queues.values():
72 await queue.put(value)
73
74
Adam Israelc3e6c2e2018-03-01 09:31:50 -050075async def block_until(*conditions, timeout=None, wait_period=0.5, loop=None):
76 """Return only after all conditions are true.
77
78 """
79 async def _block():
80 while not all(c() for c in conditions):
81 await asyncio.sleep(wait_period, loop=loop)
82 await asyncio.wait_for(_block(), timeout, loop=loop)
83
84
Adam Israeldcdf82b2017-08-15 15:26:43 -040085async def run_with_interrupt(task, event, loop=None):
86 """
87 Awaits a task while allowing it to be interrupted by an `asyncio.Event`.
88
89 If the task finishes without the event becoming set, the results of the
90 task will be returned. If the event becomes set, the task will be
91 cancelled ``None`` will be returned.
92
93 :param task: Task to run
94 :param event: An `asyncio.Event` which, if set, will interrupt `task`
95 and cause it to be cancelled.
96 :param loop: Optional event loop to use other than the default.
97 """
98 loop = loop or asyncio.get_event_loop()
99 event_task = loop.create_task(event.wait())
100 done, pending = await asyncio.wait([task, event_task],
101 loop=loop,
102 return_when=asyncio.FIRST_COMPLETED)
103 for f in pending:
104 f.cancel()
Adam Israelc3e6c2e2018-03-01 09:31:50 -0500105 exception = [f.exception() for f in done
106 if f is not event_task and f.exception()]
107 if exception:
108 raise exception[0]
Adam Israeldcdf82b2017-08-15 15:26:43 -0400109 result = [f.result() for f in done if f is not event_task]
110 if result:
111 return result[0]
112 else:
113 return None