+++ /dev/null
-import asyncio
-import os
-from collections import defaultdict
-from functools import partial
-from pathlib import Path
-import base64
-from pyasn1.type import univ, char
-from pyasn1.codec.der.encoder import encode
-
-
-async def execute_process(*cmd, log=None, loop=None):
- '''
- Wrapper around asyncio.create_subprocess_exec.
-
- '''
- p = await asyncio.create_subprocess_exec(
- *cmd,
- stdin=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- loop=loop)
- stdout, stderr = await p.communicate()
- if log:
- log.debug("Exec %s -> %d", cmd, p.returncode)
- if stdout:
- log.debug(stdout.decode('utf-8'))
- if stderr:
- log.debug(stderr.decode('utf-8'))
- return p.returncode == 0
-
-
-def _read_ssh_key():
- '''
- Inner function for read_ssh_key, suitable for passing to our
- Executor.
-
- '''
- default_data_dir = Path(Path.home(), ".local", "share", "juju")
- juju_data = os.environ.get("JUJU_DATA", default_data_dir)
- ssh_key_path = Path(juju_data, 'ssh', 'juju_id_rsa.pub')
- with ssh_key_path.open('r') as ssh_key_file:
- ssh_key = ssh_key_file.readlines()[0].strip()
- return ssh_key
-
-
-async def read_ssh_key(loop):
- '''
- Attempt to read the local juju admin's public ssh key, so that it
- can be passed on to a model.
-
- '''
- loop = loop or asyncio.get_event_loop()
- return await loop.run_in_executor(None, _read_ssh_key)
-
-
-class IdQueue:
- """
- Wrapper around asyncio.Queue that maintains a separate queue for each ID.
- """
- def __init__(self, maxsize=0, *, loop=None):
- self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop))
-
- async def get(self, id):
- value = await self._queues[id].get()
- del self._queues[id]
- if isinstance(value, Exception):
- raise value
- return value
-
- async def put(self, id, value):
- await self._queues[id].put(value)
-
- async def put_all(self, value):
- for queue in self._queues.values():
- await queue.put(value)
-
-
-async def block_until(*conditions, timeout=None, wait_period=0.5, loop=None):
- """Return only after all conditions are true.
-
- """
- async def _block():
- while not all(c() for c in conditions):
- await asyncio.sleep(wait_period, loop=loop)
- await asyncio.wait_for(_block(), timeout, loop=loop)
-
-
-async def run_with_interrupt(task, *events, loop=None):
- """
- Awaits a task while allowing it to be interrupted by one or more
- `asyncio.Event`s.
-
- If the task finishes without the events becoming set, the results of the
- task will be returned. If the event become set, the task will be cancelled
- ``None`` will be returned.
-
- :param task: Task to run
- :param events: One or more `asyncio.Event`s which, if set, will interrupt
- `task` and cause it to be cancelled.
- :param loop: Optional event loop to use other than the default.
- """
- loop = loop or asyncio.get_event_loop()
- task = asyncio.ensure_future(task, loop=loop)
- event_tasks = [loop.create_task(event.wait()) for event in events]
- done, pending = await asyncio.wait([task] + event_tasks,
- loop=loop,
- return_when=asyncio.FIRST_COMPLETED)
- for f in pending:
- f.cancel() # cancel unfinished tasks
- for f in done:
- f.exception() # prevent "exception was not retrieved" errors
- if task in done:
- return task.result() # may raise exception
- else:
- return None
-
-
-class Addrs(univ.SequenceOf):
- componentType = char.PrintableString()
-
-
-class RegistrationInfo(univ.Sequence):
- """
- ASN.1 representation of:
-
- type RegistrationInfo struct {
- User string
-
- Addrs []string
-
- SecretKey []byte
-
- ControllerName string
- }
- """
- pass
-
-
-def generate_user_controller_access_token(username, controller_endpoints, secret_key, controller_name):
- """" Implement in python what is currently done in GO
- https://github.com/juju/juju/blob/a5ab92ec9b7f5da3678d9ac603fe52d45af24412/cmd/juju/user/utils.go#L16
-
- :param username: name of the user to register
- :param controller_endpoints: juju controller endpoints list in the format <ip>:<port>
- :param secret_key: base64 encoded string of the secret-key generated by juju
- :param controller_name: name of the controller to register to.
- """
-
- # Secret key is returned as base64 encoded string in:
- # https://websockets.readthedocs.io/en/stable/_modules/websockets/protocol.html#WebSocketCommonProtocol.recv
- # Deconding it before marshalling into the ASN.1 message
- secret_key = base64.b64decode(secret_key)
- addr = Addrs()
- for endpoint in controller_endpoints:
- addr.append(endpoint)
-
- registration_string = RegistrationInfo()
- registration_string.setComponentByPosition(0, char.PrintableString(username))
- registration_string.setComponentByPosition(1, addr)
- registration_string.setComponentByPosition(2, univ.OctetString(secret_key))
- registration_string.setComponentByPosition(3, char.PrintableString(controller_name))
- registration_string = encode(registration_string)
- remainder = len(registration_string) % 3
- registration_string += b"\0" * (3 - remainder)
- return base64.urlsafe_b64encode(registration_string)