import asyncio
import base64
import os
from collections import defaultdict
from functools import partial
from pathlib import Path

from pyasn1.codec.der.encoder import encode
from pyasn1.type import char, univ
async def execute_process(*cmd, log=None, loop=None):
    """
    Wrapper around asyncio.create_subprocess_exec.

    Runs ``cmd`` with stdin, stdout and stderr connected to pipes, waits for
    the process to finish and, when a logger is supplied, logs the command,
    its exit status and any captured output at debug level.

    :param cmd: Program and arguments, passed through unchanged.
    :param log: Optional logger-like object exposing ``debug``; nothing is
        logged when it is ``None``.
    :param loop: Accepted for backward compatibility and ignored. The
        explicit ``loop`` argument was removed from the asyncio subprocess
        API in Python 3.10; the running loop is always used.
    :return: ``True`` if the process exited with status 0, else ``False``.
    """
    p = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await p.communicate()
    if log is not None:
        log.debug("Exec %s -> %d", cmd, p.returncode)
        # Only log streams that actually produced output.  Undecodable bytes
        # are replaced so that logging can never mask the exit status.
        if stdout:
            log.debug(stdout.decode('utf-8', errors='replace'))
        if stderr:
            log.debug(stderr.decode('utf-8', errors='replace'))
    return p.returncode == 0
def _read_ssh_key():
    """
    Inner function for read_ssh_key, suitable for passing to our
    executor.

    Looks for ``ssh/juju_id_rsa.pub`` under the directory named by the
    ``JUJU_DATA`` environment variable, falling back to
    ``~/.local/share/juju`` when the variable is unset.

    :return: The first line of the key file with surrounding whitespace
        stripped.
    :raises OSError: If the key file cannot be opened.
    :raises IndexError: If the key file is empty.
    """
    default_data_dir = Path(Path.home(), ".local", "share", "juju")
    juju_data = os.environ.get("JUJU_DATA", default_data_dir)
    ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
    with ssh_key_path.open('r') as ssh_key_file:
        ssh_key = ssh_key_file.readlines()[0].strip()
    return ssh_key
async def read_ssh_key(loop=None):
    """
    Attempt to read the local juju admin's public ssh key, so that it
    can be passed on to a model.

    The file read happens in the loop's default executor so the event loop
    is not blocked while it runs.

    :param loop: Event loop whose executor should run the read; defaults to
        the current event loop when omitted or falsy.
    :return: Whatever ``_read_ssh_key`` returns.
    """
    loop = loop or asyncio.get_event_loop()
    return await loop.run_in_executor(None, _read_ssh_key)
# NOTE(review): the ``class`` header line is missing from the extracted
# source; the name ``IdQueue`` is restored by inference - confirm against
# callers before merging.
class IdQueue:
    """
    Wrapper around asyncio.Queue that maintains a separate queue for each ID.

    Queues are created lazily the first time an ID is used by ``get`` or
    ``put``.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """
        :param maxsize: Maximum size handed to each per-ID ``asyncio.Queue``.
        :param loop: Accepted for backward compatibility and ignored;
            ``asyncio.Queue`` rejects an explicit ``loop`` argument since
            Python 3.10 and binds to the running loop on its own.
        """
        self._queues = defaultdict(partial(asyncio.Queue, maxsize))

    async def get(self, id):
        """
        Wait for the next value queued under ``id`` and return it.

        :raises Exception: If the queued value is itself an exception
            instance, it is raised instead of being returned.
        """
        value = await self._queues[id].get()
        # NOTE(review): one statement is missing from the extracted source
        # between the ``get`` and the ``isinstance`` check; dropping the
        # per-ID queue here is assumed - confirm.
        del self._queues[id]
        if isinstance(value, Exception):
            raise value
        return value

    async def put(self, id, value):
        """Queue ``value`` under ``id``, creating that queue if needed."""
        await self._queues[id].put(value)

    async def put_all(self, value):
        """Queue ``value`` on every per-ID queue that currently exists."""
        for queue in self._queues.values():
            await queue.put(value)
async def block_until(*conditions, timeout=None, wait_period=0.5, loop=None):
    """Return only after all conditions are true.

    Each condition is called with no arguments; they are re-evaluated every
    ``wait_period`` seconds until all of them return a truthy value.

    :param conditions: Zero-argument callables to poll.
    :param timeout: Maximum number of seconds to wait, or ``None`` to wait
        indefinitely.
    :param wait_period: Seconds to sleep between polls.
    :param loop: Accepted for backward compatibility and ignored;
        ``asyncio.sleep`` and ``asyncio.wait_for`` reject an explicit
        ``loop`` argument since Python 3.10.
    :raises asyncio.TimeoutError: If ``timeout`` elapses first.
    """
    async def _block():
        while not all(c() for c in conditions):
            await asyncio.sleep(wait_period)

    await asyncio.wait_for(_block(), timeout)
async def run_with_interrupt(task, *events, loop=None):
    """
    Awaits a task while allowing it to be interrupted by one or more
    `asyncio.Event`s.

    If the task finishes without the events becoming set, the results of the
    task will be returned.  If one of the events becomes set first, the task
    will be cancelled and ``None`` will be returned.

    :param task: Task to run
    :param events: One or more `asyncio.Event`s which, if set, will interrupt
        `task` and cause it to be cancelled.
    :param loop: Optional event loop to use other than the default.
    """
    loop = loop or asyncio.get_event_loop()
    task = asyncio.ensure_future(task, loop=loop)
    event_tasks = [loop.create_task(event.wait()) for event in events]
    done, pending = await asyncio.wait([task] + event_tasks,
                                       return_when=asyncio.FIRST_COMPLETED)
    for f in pending:
        f.cancel()  # cancel unfinished tasks
    for f in done:
        f.exception()  # prevent "exception was not retrieved" errors
    if task in done:
        return task.result()  # may raise exception
    return None
class Addrs(univ.SequenceOf):
    """ASN.1 ``SEQUENCE OF`` whose elements are ``PrintableString`` values."""

    componentType = char.PrintableString()
class RegistrationInfo(univ.Sequence):
    """
    ASN.1 representation of:

    type RegistrationInfo struct {
        ...
        ControllerName string
    }

    NOTE(review): only the opening line and the ``ControllerName`` field of
    the Go struct are present in the extracted source; the remaining fields
    are elided here rather than guessed - restore them from the original
    file.

    ``generate_user_controller_access_token`` fills the sequence by
    position: 0 is the user name (PrintableString), 1 the controller
    addresses, 2 the secret key (OctetString) and 3 the controller name
    (PrintableString).
    """
def generate_user_controller_access_token(username, controller_endpoints, secret_key, controller_name):
    """Implement in python what is currently done in GO
    https://github.com/juju/juju/blob/a5ab92ec9b7f5da3678d9ac603fe52d45af24412/cmd/juju/user/utils.go#L16

    :param username: name of the user to register
    :param controller_endpoints: juju controller endpoints list in the format <ip>:<port>
    :param secret_key: base64 encoded string of the secret-key generated by juju
    :param controller_name: name of the controller to register to.
    :return: URL-safe base64 encoding (as ``bytes``) of the DER-encoded
        registration info, zero-padded to a multiple of three bytes.
    """
    # Secret key is returned as base64 encoded string in:
    # https://websockets.readthedocs.io/en/stable/_modules/websockets/protocol.html#WebSocketCommonProtocol.recv
    # Decoding it before marshalling into the ASN.1 message
    secret_key = base64.b64decode(secret_key)

    # NOTE(review): the statement creating ``addr`` is missing from the
    # extracted source; an empty ``Addrs`` sequence is assumed - confirm.
    addr = Addrs()
    for endpoint in controller_endpoints:
        addr.append(endpoint)

    registration_info = RegistrationInfo()
    registration_info.setComponentByPosition(0, char.PrintableString(username))
    registration_info.setComponentByPosition(1, addr)
    registration_info.setComponentByPosition(2, univ.OctetString(secret_key))
    registration_info.setComponentByPosition(3, char.PrintableString(controller_name))
    registration_string = encode(registration_info)

    # A length that is a multiple of three base64-encodes without '='
    # padding.  Pad only when the data is not already aligned: the former
    # ``3 - remainder`` appended three superfluous NUL bytes in that case.
    padding = -len(registration_string) % 3
    registration_string += b"\0" * padding
    return base64.urlsafe_b64encode(registration_string)