3 from collections
import defaultdict
4 from functools
import partial
5 from pathlib
import Path
8 async def execute_process(*cmd
, log
=None, loop
=None):
10 Wrapper around asyncio.create_subprocess_exec.
13 p
= await asyncio
.create_subprocess_exec(
15 stdin
=asyncio
.subprocess
.PIPE
,
16 stdout
=asyncio
.subprocess
.PIPE
,
17 stderr
=asyncio
.subprocess
.PIPE
,
19 stdout
, stderr
= await p
.communicate()
21 log
.debug("Exec %s -> %d", cmd
, p
.returncode
)
23 log
.debug(stdout
.decode('utf-8'))
25 log
.debug(stderr
.decode('utf-8'))
26 return p
.returncode
== 0
31 Inner function for read_ssh_key, suitable for passing to our
35 default_data_dir
= Path(Path
.home(), ".local", "share", "juju")
36 juju_data
= os
.environ
.get("JUJU_DATA", default_data_dir
)
37 ssh_key_path
= Path(juju_data
, 'ssh', 'juju_id_rsa.pub')
38 with ssh_key_path
.open('r') as ssh_key_file
:
39 ssh_key
= ssh_key_file
.readlines()[0].strip()
43 async def read_ssh_key(loop
):
45 Attempt to read the local juju admin's public ssh key, so that it
46 can be passed on to a model.
49 loop
= loop
or asyncio
.get_event_loop()
50 return await loop
.run_in_executor(None, _read_ssh_key
)
55 Wrapper around asyncio.Queue that maintains a separate queue for each ID.
57 def __init__(self
, maxsize
=0, *, loop
=None):
58 self
._queues
= defaultdict(partial(asyncio
.Queue
, maxsize
, loop
=loop
))
60 async def get(self
, id):
61 value
= await self
._queues
[id].get()
63 if isinstance(value
, Exception):
67 async def put(self
, id, value
):
68 await self
._queues
[id].put(value
)
70 async def put_all(self
, value
):
71 for queue
in self
._queues
.values():
72 await queue
.put(value
)
75 async def block_until(*conditions
, timeout
=None, wait_period
=0.5, loop
=None):
76 """Return only after all conditions are true.
80 while not all(c() for c
in conditions
):
81 await asyncio
.sleep(wait_period
, loop
=loop
)
82 await asyncio
.wait_for(_block(), timeout
, loop
=loop
)
85 async def run_with_interrupt(task
, event
, loop
=None):
87 Awaits a task while allowing it to be interrupted by an `asyncio.Event`.
89 If the task finishes without the event becoming set, the results of the
90 task will be returned. If the event becomes set, the task will be
91 cancelled ``None`` will be returned.
93 :param task: Task to run
94 :param event: An `asyncio.Event` which, if set, will interrupt `task`
95 and cause it to be cancelled.
96 :param loop: Optional event loop to use other than the default.
98 loop
= loop
or asyncio
.get_event_loop()
99 event_task
= loop
.create_task(event
.wait())
100 done
, pending
= await asyncio
.wait([task
, event_task
],
102 return_when
=asyncio
.FIRST_COMPLETED
)
105 exception
= [f
.exception() for f
in done
106 if f
is not event_task
and f
.exception()]
109 result
= [f
.result() for f
in done
if f
is not event_task
]