import asyncio
import os
from collections import defaultdict
from functools import partial
from pathlib import Path
async def execute_process(*cmd, log=None, loop=None):
    """
    Wrapper around asyncio.create_subprocess_exec.

    Runs ``cmd`` with stdin, stdout and stderr attached to pipes, waits for
    the process to exit and reports whether it succeeded.

    :param cmd: Program and its arguments, forwarded positionally.
    :param log: Optional logger-like object exposing ``debug``. When given,
        the command, its return code and any captured output are logged.
    :param loop: Accepted for backward compatibility and ignored; the
        ``loop`` argument of ``asyncio.create_subprocess_exec`` was removed
        in Python 3.10, and the process is created on the running loop.
    :return: ``True`` if the process exited with return code 0.
    """
    p = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await p.communicate()
    # ``log`` defaults to None, so every logging call has to be guarded.
    if log is not None:
        log.debug("Exec %s -> %d", cmd, p.returncode)
        # Skip empty streams, and never let undecodable output break logging.
        if stdout:
            log.debug(stdout.decode('utf-8', errors='replace'))
        if stderr:
            log.debug(stderr.decode('utf-8', errors='replace'))
    return p.returncode == 0
def _read_ssh_key():
    """
    Inner function for read_ssh_key, suitable for passing to an executor.

    Reads ``ssh/juju_id_rsa.pub`` below the directory named by the
    ``JUJU_DATA`` environment variable, falling back to
    ``~/.local/share/juju`` when that variable is not set.

    :return: The first line of the key file with surrounding whitespace
        stripped.
    :raises OSError: If the key file cannot be opened.
    :raises IndexError: If the key file is empty.
    """
    default_data_dir = Path(Path.home(), ".local", "share", "juju")
    juju_data = os.environ.get("JUJU_DATA", default_data_dir)
    ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
    with ssh_key_path.open('r') as ssh_key_file:
        # Only the first line is used; any further lines are ignored.
        ssh_key = ssh_key_file.readlines()[0].strip()
    return ssh_key
async def read_ssh_key(loop):
    """
    Attempt to read the local juju admin's public ssh key, so that it
    can be passed on to a model.

    The blocking file read is handed to the loop's default executor so that
    it does not stall the event loop.

    :param loop: Event loop whose default executor performs the read. A
        falsy value selects the current event loop.
    :return: The value returned by ``_read_ssh_key``.
    """
    loop = loop or asyncio.get_event_loop()
    return await loop.run_in_executor(None, _read_ssh_key)
# NOTE(review): the ``class`` line was missing from the extracted source; the
# name ``IdQueue`` is reconstructed -- confirm it against the callers.
class IdQueue:
    """
    Wrapper around asyncio.Queue that maintains a separate queue for each ID.

    Queues are created lazily, the first time an ID is used with ``get`` or
    ``put``.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """
        :param maxsize: Forwarded to every ``asyncio.Queue`` that is created.
        :param loop: Accepted for backward compatibility and ignored;
            ``asyncio.Queue`` rejects a ``loop`` argument since Python 3.10.
        """
        self._queues = defaultdict(partial(asyncio.Queue, maxsize))

    async def get(self, id):
        """
        Wait for and return the next value queued for ``id``.

        :raises Exception: A queued ``Exception`` instance is raised rather
            than returned.
        """
        value = await self._queues[id].get()
        # NOTE(review): lines of this method were missing from the extracted
        # source; the raise/return below is reconstructed -- confirm.
        if isinstance(value, Exception):
            raise value
        return value

    async def put(self, id, value):
        """Queue ``value`` for ``id``, creating that ID's queue if needed."""
        await self._queues[id].put(value)

    async def put_all(self, value):
        """Queue ``value`` on every queue that currently exists."""
        for queue in self._queues.values():
            await queue.put(value)
74 async def run_with_interrupt(task
, event
, loop
=None):
76 Awaits a task while allowing it to be interrupted by an `asyncio.Event`.
78 If the task finishes without the event becoming set, the results of the
79 task will be returned. If the event becomes set, the task will be
80 cancelled and ``None`` will be returned.
82 :param task: Task to run
83 :param event: An `asyncio.Event` which, if set, will interrupt `task`
84 and cause it to be cancelled.
85 :param loop: Optional event loop to use other than the default.
87 loop
= loop
or asyncio
.get_event_loop()
88 event_task
= loop
.create_task(event
.wait())
89 done
, pending
= await asyncio
.wait([task
, event_task
],
91 return_when
=asyncio
.FIRST_COMPLETED
)
94 result
= [f
.result() for f
in done
if f
is not event_task
]