X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Fmodel.py;h=9a14add9d0289e58cb27f6578a698a5be16eaa71;hp=fc8d5e98dd83880cb00cc3ef92825db7fcd32ead;hb=34cc6609cad010420aee843c15c0ded8fa608835;hpb=1a15d1c84fc826fa7996c1c9d221a324edd33432 diff --git a/modules/libjuju/juju/model.py b/modules/libjuju/juju/model.py index fc8d5e9..9a14add 100644 --- a/modules/libjuju/juju/model.py +++ b/modules/libjuju/juju/model.py @@ -14,26 +14,28 @@ from concurrent.futures import CancelledError from functools import partial from pathlib import Path -import websockets -import yaml import theblues.charmstore import theblues.errors +import websockets +import yaml from . import tag, utils -from .client import client -from .client import connection +from .client import client, connector from .client.client import ConfigValue -from .constraints import parse as parse_constraints, normalize_key -from .delta import get_entity_delta -from .delta import get_entity_class +from .client.client import Value +from .constraints import parse as parse_constraints +from .constraints import normalize_key +from .delta import get_entity_class, get_entity_delta +from .errors import JujuAPIError, JujuError from .exceptions import DeadEntityException -from .errors import JujuError, JujuAPIError from .placement import parse as parse_placement +from . import provisioner + log = logging.getLogger(__name__) -class _Observer(object): +class _Observer: """Wrapper around an observer callable. This wrapper allows filter criteria to be associated with the @@ -77,7 +79,7 @@ class _Observer(object): return True -class ModelObserver(object): +class ModelObserver: """ Base class for creating observers that react to changes in a model. """ @@ -100,7 +102,7 @@ class ModelObserver(object): pass -class ModelState(object): +class ModelState: """Holds the state of the model, including the delta history of all entities in the model. @@ -144,6 +146,14 @@ class ModelState(object): """ return self._live_entity_map('unit') + @property + def relations(self): + """Return a map of relation-id:Relation for all relations currently in + the model. + + """ + return self._live_entity_map('relation') + def entity_history(self, entity_type, entity_id): """Return the history deque for an entity. @@ -209,7 +219,7 @@ class ModelState(object): connected=connected) -class ModelEntity(object): +class ModelEntity: """An object in the Model tree""" def __init__(self, entity_id, model, history_index=-1, connected=True): @@ -228,7 +238,7 @@ class ModelEntity(object): self.model = model self._history_index = history_index self.connected = connected - self.connection = model.connection + self.connection = model.connection() def __repr__(self): return '<{} entity_id="{}">'.format(type(self).__name__, @@ -380,90 +390,207 @@ class ModelEntity(object): return self.model.state.get_entity(self.entity_type, self.entity_id) -class Model(object): +class Model: """ The main API for interacting with a Juju model. """ - def __init__(self, loop=None, - max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE): - """Instantiate a new connected Model. + def __init__( + self, + loop=None, + max_frame_size=None, + bakery_client=None, + jujudata=None, + ): + """Instantiate a new Model. + + The connect method will need to be called before this + object can be used for anything interesting. + + If jujudata is None, jujudata.FileJujuData will be used. :param loop: an asyncio event loop :param max_frame_size: See `juju.client.connection.Connection.MAX_FRAME_SIZE` + :param bakery_client httpbakery.Client: The bakery client to use + for macaroon authorization. + :param jujudata JujuData: The source for current controller information + """ + self._connector = connector.Connector( + loop=loop, + max_frame_size=max_frame_size, + bakery_client=bakery_client, + jujudata=jujudata, + ) + self._observers = weakref.WeakValueDictionary() + self.state = ModelState(self) + self._info = None + self._watch_stopping = asyncio.Event(loop=self._connector.loop) + self._watch_stopped = asyncio.Event(loop=self._connector.loop) + self._watch_received = asyncio.Event(loop=self._connector.loop) + self._watch_stopped.set() + self._charmstore = CharmStore(self._connector.loop) + def is_connected(self): + """Reports whether the Model is currently connected.""" + return self._connector.is_connected() + + @property + def loop(self): + return self._connector.loop + + def connection(self): + """Return the current Connection object. It raises an exception + if the Model is disconnected""" + return self._connector.connection() + + async def get_controller(self): + """Return a Controller instance for the currently connected model. + :return Controller: """ - self.loop = loop or asyncio.get_event_loop() - self.max_frame_size = max_frame_size - self.connection = None - self.observers = weakref.WeakValueDictionary() - self.state = ModelState(self) - self.info = None - self._watch_stopping = asyncio.Event(loop=self.loop) - self._watch_stopped = asyncio.Event(loop=self.loop) - self._watch_received = asyncio.Event(loop=self.loop) - self._charmstore = CharmStore(self.loop) + from juju.controller import Controller + controller = Controller(jujudata=self._connector.jujudata) + kwargs = self.connection().connect_params() + kwargs.pop('uuid') + await controller._connect_direct(**kwargs) + return controller async def __aenter__(self): - await self.connect_current() + await self.connect() return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect() - if exc_type is not None: - return False + async def connect(self, *args, **kwargs): + """Connect to a juju model. - async def connect(self, *args, **kw): - """Connect to an arbitrary Juju model. + This supports two calling conventions: - args and kw are passed through to Connection.connect() + The model and (optionally) authentication information can be taken + from the data files created by the Juju CLI. This convention will + be used if a ``model_name`` is specified, or if the ``endpoint`` + and ``uuid`` are not. - """ - if 'loop' not in kw: - kw['loop'] = self.loop - if 'max_frame_size' not in kw: - kw['max_frame_size'] = self.max_frame_size - self.connection = await connection.Connection.connect(*args, **kw) - await self._after_connect() + Otherwise, all of the ``endpoint``, ``uuid``, and authentication + information (``username`` and ``password``, or ``bakery_client`` and/or + ``macaroons``) are required. - async def connect_current(self): - """Connect to the current Juju model. + If a single positional argument is given, it will be assumed to be + the ``model_name``. Otherwise, the first positional argument, if any, + must be the ``endpoint``. + + Available parameters are: + :param model_name: Format [controller:][user/]model + :param str endpoint: The hostname:port of the controller to connect to. + :param str uuid: The model UUID to connect to. + :param str username: The username for controller-local users (or None + to use macaroon-based login.) + :param str password: The password for controller-local users. + :param str cacert: The CA certificate of the controller + (PEM formatted). + :param httpbakery.Client bakery_client: The macaroon bakery client to + to use when performing macaroon-based login. Macaroon tokens + acquired when logging will be saved to bakery_client.cookies. + If this is None, a default bakery_client will be used. + :param list macaroons: List of macaroons to load into the + ``bakery_client``. + :param asyncio.BaseEventLoop loop: The event loop to use for async + operations. + :param int max_frame_size: The maximum websocket frame size to allow. """ - self.connection = await connection.Connection.connect_current( - self.loop, max_frame_size=self.max_frame_size) + await self.disconnect() + if 'endpoint' not in kwargs and len(args) < 2: + if args and 'model_name' in kwargs: + raise TypeError('connect() got multiple values for model_name') + elif args: + model_name = args[0] + else: + model_name = kwargs.pop('model_name', None) + await self._connector.connect_model(model_name, **kwargs) + else: + if 'model_name' in kwargs: + raise TypeError('connect() got values for both ' + 'model_name and endpoint') + if args and 'endpoint' in kwargs: + raise TypeError('connect() got multiple values for endpoint') + if len(args) < 2 and 'uuid' not in kwargs: + raise TypeError('connect() missing value for uuid') + has_userpass = (len(args) >= 4 or + {'username', 'password'}.issubset(kwargs)) + has_macaroons = (len(args) >= 6 or not + {'bakery_client', 'macaroons'}.isdisjoint(kwargs)) + if not (has_userpass or has_macaroons): + raise TypeError('connect() missing auth params') + arg_names = [ + 'endpoint', + 'uuid', + 'username', + 'password', + 'cacert', + 'bakery_client', + 'macaroons', + 'loop', + 'max_frame_size', + ] + for i, arg in enumerate(args): + kwargs[arg_names[i]] = arg + if not {'endpoint', 'uuid'}.issubset(kwargs): + raise ValueError('endpoint and uuid are required ' + 'if model_name not given') + if not ({'username', 'password'}.issubset(kwargs) or + {'bakery_client', 'macaroons'}.intersection(kwargs)): + raise ValueError('Authentication parameters are required ' + 'if model_name not given') + await self._connector.connect(**kwargs) await self._after_connect() async def connect_model(self, model_name): - """Connect to a specific Juju model by name. - - :param model_name: Format [controller:][user/]model + """ + .. deprecated:: 0.6.2 + Use ``connect(model_name=model_name)`` instead. + """ + return await self.connect(model_name=model_name) + async def connect_current(self): + """ + .. deprecated:: 0.6.2 + Use ``connect()`` instead. """ - self.connection = await connection.Connection.connect_model( - model_name, self.loop, self.max_frame_size) + return await self.connect() + + async def _connect_direct(self, **kwargs): + await self.disconnect() + await self._connector.connect(**kwargs) await self._after_connect() async def _after_connect(self): - """Run initialization steps after connecting to websocket. - - """ self._watch() + + # Wait for the first packet of data from the AllWatcher, + # which contains all information on the model. + # TODO this means that we can't do anything until + # we've received all the model data, which might be + # a whole load of unneeded data if all the client wants + # to do is make one RPC call. await self._watch_received.wait() + await self.get_info() async def disconnect(self): """Shut down the watcher task and close websockets. """ - if self.connection and self.connection.is_open: + if not self._watch_stopped.is_set(): log.debug('Stopping watcher task') self._watch_stopping.set() await self._watch_stopped.wait() + self._watch_stopping.clear() + + if self.is_connected(): log.debug('Closing model connection') - await self.connection.close() - self.connection = None + await self._connector.disconnect() + self._info = None async def add_local_charm_dir(self, charm_dir, series): """Upload a local charm to the model. @@ -480,7 +607,7 @@ class Model(object): with fh: func = partial( self.add_local_charm, fh, series, os.stat(fh.name).st_size) - charm_url = await self.loop.run_in_executor(None, func) + charm_url = await self._connector.loop.run_in_executor(None, func) log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url) return charm_url @@ -505,7 +632,7 @@ class Model(object): instead. """ - conn, headers, path_prefix = self.connection.https_connection() + conn, headers, path_prefix = self.connection().https_connection() path = "%s/charms?series=%s" % (path_prefix, series) headers['Content-Type'] = 'application/zip' if size: @@ -549,13 +676,20 @@ class Model(object): async def block_until(self, *conditions, timeout=None, wait_period=0.5): """Return only after all conditions are true. + Raises `websockets.ConnectionClosed` if disconnected. """ - async def _block(): - while not all(c() for c in conditions): - if not (self.connection and self.connection.is_open): - raise websockets.ConnectionClosed(1006, 'no reason') - await asyncio.sleep(wait_period, loop=self.loop) - await asyncio.wait_for(_block(), timeout, loop=self.loop) + def _disconnected(): + return not (self.is_connected() and self.connection().is_open) + + def done(): + return _disconnected() or all(c() for c in conditions) + + await utils.block_until(done, + timeout=timeout, + wait_period=wait_period, + loop=self.loop) + if _disconnected(): + raise websockets.ConnectionClosed(1006, 'no reason') @property def applications(self): @@ -581,6 +715,13 @@ class Model(object): """ return self.state.units + @property + def relations(self): + """Return a list of all Relations currently in the model. + + """ + return list(self.state.relations.values()) + async def get_info(self): """Return a client.ModelInfo object for this Model. @@ -594,13 +735,21 @@ class Model(object): explicit call to this method. """ - facade = client.ClientFacade.from_connection(self.connection) + facade = client.ClientFacade.from_connection(self.connection()) - self.info = await facade.ModelInfo() + self._info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) return self.info + @property + def info(self): + """Return the cached client.ModelInfo object for this Model. + + If Model.get_info() has not been called, this will return None. + """ + return self._info + def add_observer( self, callable_, entity_type=None, action=None, entity_id=None, predicate=None): @@ -639,7 +788,7 @@ class Model(object): """ observer = _Observer( callable_, entity_type, action, entity_id, predicate) - self.observers[observer] = callable_ + self._observers[observer] = callable_ def _watch(self): """Start an asynchronous watch against this model. @@ -650,13 +799,13 @@ class Model(object): async def _all_watcher(): try: allwatcher = client.AllWatcherFacade.from_connection( - self.connection) + self.connection()) while not self._watch_stopping.is_set(): try: results = await utils.run_with_interrupt( allwatcher.Next(), self._watch_stopping, - self.loop) + loop=self._connector.loop) except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise @@ -673,24 +822,35 @@ class Model(object): del allwatcher.Id continue except websockets.ConnectionClosed: - monitor = self.connection.monitor + monitor = self.connection().monitor if monitor.status == monitor.ERROR: # closed unexpectedly, try to reopen log.warning( 'Watcher: connection closed, reopening') - await self.connection.reconnect() + await self.connection().reconnect() + if monitor.status != monitor.CONNECTED: + # reconnect failed; abort and shutdown + log.error('Watcher: automatic reconnect ' + 'failed; stopping watcher') + break del allwatcher.Id continue else: # closed on request, go ahead and shutdown break if self._watch_stopping.is_set(): - await allwatcher.Stop() + try: + await allwatcher.Stop() + except websockets.ConnectionClosed: + pass # can't stop on a closed conn break for delta in results.deltas: - delta = get_entity_delta(delta) - old_obj, new_obj = self.state.apply_delta(delta) - await self._notify_observers(delta, old_obj, new_obj) + try: + delta = get_entity_delta(delta) + old_obj, new_obj = self.state.apply_delta(delta) + await self._notify_observers(delta, old_obj, new_obj) + except KeyError as e: + log.debug("unknown delta type: %s", e.args[0]) self._watch_received.set() except CancelledError: pass @@ -704,7 +864,7 @@ class Model(object): self._watch_received.clear() self._watch_stopping.clear() self._watch_stopped.clear() - self.loop.create_task(_all_watcher()) + self._connector.loop.create_task(_all_watcher()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -724,10 +884,10 @@ class Model(object): 'Model changed: %s %s %s', delta.entity, delta.type, delta.get_id()) - for o in self.observers: + for o in self._observers: if o.cares_about(delta): asyncio.ensure_future(o(delta, old_obj, new_obj, self), - loop=self.loop) + loop=self._connector.loop) async def _wait(self, entity_type, entity_id, action, predicate=None): """ @@ -744,7 +904,7 @@ class Model(object): has a 'completed' status. See the _Observer class for details. """ - q = asyncio.Queue(loop=self.loop) + q = asyncio.Queue(loop=self._connector.loop) async def callback(delta, old, new, model): await q.put(delta.get_id()) @@ -755,24 +915,19 @@ class Model(object): # 'remove' action return self.state._live_entity_map(entity_type).get(entity_id) - async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): + async def _wait_for_new(self, entity_type, entity_id): """Wait for a new object to appear in the Model and return it. - Waits for an object of type ``entity_type`` with id ``entity_id``. - If ``entity_id`` is ``None``, it will wait for the first new entity - of the correct type. - - This coroutine blocks until the new object appears in the model. + Waits for an object of type ``entity_type`` with id ``entity_id`` + to appear in the model. This is similar to watching for the + object using ``block_until``, but uses the watcher rather than + polling. """ # if the entity is already in the model, just return it if entity_id in self.state._live_entity_map(entity_type): return self.state._live_entity_map(entity_type)[entity_id] - # if we know the entity_id, we can trigger on any action that puts - # the enitty into the model; otherwise, we have to watch for the - # next "add" action on that entity_type - action = 'add' if entity_id is None else None - return await self._wait(entity_type, entity_id, action, predicate) + return await self._wait(entity_type, entity_id, None) async def wait_for_action(self, action_id): """Given an action, wait for it to complete.""" @@ -785,7 +940,7 @@ class Model(object): def predicate(delta): return delta.data['status'] in ('completed', 'failed') - return await self._wait('action', action_id, 'change', predicate) + return await self._wait('action', action_id, None, predicate) async def add_machine( self, spec=None, constraints=None, disks=None, series=None): @@ -798,7 +953,8 @@ class Model(object): (None) - starts a new machine 'lxd' - starts a new machine with one lxd container 'lxd:4' - starts a new lxd container on machine 4 - 'ssh:user@10.10.0.3' - manually provisions a machine with ssh + 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision + a machine with ssh and the private key used for authentication 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS @@ -847,12 +1003,25 @@ class Model(object): """ params = client.AddMachineParams() - params.jobs = ['JobHostUnits'] if spec: - placement = parse_placement(spec) - if placement: - params.placement = placement[0] + if spec.startswith("ssh:"): + placement, target, private_key_path = spec.split(":") + user, host = target.split("@") + + sshProvisioner = provisioner.SSHProvisioner( + host=host, + user=user, + private_key_path=private_key_path, + ) + + params = sshProvisioner.provision_machine() + else: + placement = parse_placement(spec) + if placement: + params.placement = placement[0] + + params.jobs = ['JobHostUnits'] if constraints: params.constraints = client.Value.from_json(constraints) @@ -865,12 +1034,23 @@ class Model(object): params.series = series # Submit the request. - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) results = await client_facade.AddMachines([params]) error = results.machines[0].error if error: raise ValueError("Error adding machine: %s" % error.message) machine_id = results.machines[0].machine + + if spec: + if spec.startswith("ssh:"): + # Need to run this after AddMachines has been called, + # as we need the machine_id + await sshProvisioner.install_agent( + self.connection(), + params.nonce, + machine_id, + ) + log.debug('Added new machine %s', machine_id) return await self._wait_for_new('machine', machine_id) @@ -881,29 +1061,34 @@ class Model(object): :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade.from_connection(self.connection) + connection = self.connection() + app_facade = client.ApplicationFacade.from_connection(connection) log.debug( 'Adding relation %s <-> %s', relation1, relation2) + def _find_relation(*specs): + for rel in self.relations: + if rel.matches(*specs): + return rel + return None + try: result = await app_facade.AddRelation([relation1, relation2]) except JujuAPIError as e: if 'relation already exists' not in e.message: raise - log.debug( - 'Relation %s <-> %s already exists', relation1, relation2) - # TODO: if relation already exists we should return the - # Relation ModelEntity here - return None + rel = _find_relation(relation1, relation2) + if rel: + return rel + raise JujuError('Relation {} {} exists but not in model'.format( + relation1, relation2)) - def predicate(delta): - endpoints = {} - for endpoint in delta.data['endpoints']: - endpoints[endpoint['application-name']] = endpoint['relation'] - return endpoints == result.endpoints + specs = ['{}:{}'.format(app, data['name']) + for app, data in result.endpoints.items()] - return await self._wait_for_new('relation', None, predicate) + await self.block_until(lambda: _find_relation(*specs) is not None) + return _find_relation(*specs) def add_space(self, name, *cidrs): """Add a new network space. @@ -912,7 +1097,7 @@ class Model(object): (optional) list of existing subnet CIDRs with it. :param str name: Name of the space - :param \*cidrs: Optional list of existing subnet CIDRs + :param *cidrs: Optional list of existing subnet CIDRs """ raise NotImplementedError() @@ -924,7 +1109,7 @@ class Model(object): :param str key: The public ssh key """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) return await key_facade.AddKeys([key], user) add_ssh_keys = add_ssh_key @@ -933,7 +1118,7 @@ class Model(object): :param str cidr_or_id: CIDR or provider ID of the existing subnet :param str space: Network space with which to associate - :param str \*zones: Zone(s) in which the subnet resides + :param str *zones: Zone(s) in which the subnet resides """ raise NotImplementedError() @@ -947,7 +1132,7 @@ class Model(object): def block(self, *commands): """Add a new block to this model. - :param str \*commands: The commands to block. Valid values are + :param str *commands: The commands to block. Valid values are 'all-changes', 'destroy-model', 'remove-object' """ @@ -984,7 +1169,7 @@ class Model(object): :param str name: Name to give the storage pool :param str provider_type: Pool provider type - :param \*\*pool_config: key/value pool configuration pairs + :param **pool_config: key/value pool configuration pairs """ raise NotImplementedError() @@ -1033,7 +1218,7 @@ class Model(object): self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, num_units=1, plan=None, resources=None, series=None, storage=None, - to=None): + to=None, devices=None): """Deploy a new service or bundle. :param str entity_url: Charm or bundle url @@ -1072,20 +1257,27 @@ class Model(object): for k, v in storage.items() } + entity_path = Path(entity_url.replace('local:', '')) + bundle_path = entity_path / 'bundle.yaml' + metadata_path = entity_path / 'metadata.yaml' + is_local = ( entity_url.startswith('local:') or - os.path.isdir(entity_url) + entity_path.is_dir() or + entity_path.is_file() ) if is_local: entity_id = entity_url.replace('local:', '') else: - entity = await self.charmstore.entity(entity_url, channel=channel) + entity = await self.charmstore.entity(entity_url, channel=channel, + include_stats=False) entity_id = entity['Id'] - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) is_bundle = ((is_local and - (Path(entity_id) / 'bundle.yaml').exists()) or + (entity_id.endswith('.yaml') and entity_path.exists()) or + bundle_path.exists()) or (not is_local and 'bundle/' in entity_id)) if is_bundle: @@ -1100,9 +1292,9 @@ class Model(object): await asyncio.gather(*[ asyncio.ensure_future( self._wait_for_new('application', app_name), - loop=self.loop) + loop=self._connector.loop) for app_name in pending_apps - ], loop=self.loop) + ], loop=self._connector.loop) return [app for name, app in self.applications.items() if name in handler.applications] else: @@ -1116,8 +1308,11 @@ class Model(object): # actually support them yet anyway resources = await self._add_store_resources(application_name, entity_id, - entity) + entity=entity) else: + if not application_name: + metadata = yaml.load(metadata_path.read_text()) + application_name = metadata['name'] # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath( os.path.expanduser(entity_id)) @@ -1139,13 +1334,16 @@ class Model(object): storage=storage, channel=channel, num_units=num_units, - placement=parse_placement(to) + placement=parse_placement(to), + devices=devices, ) - async def _add_store_resources(self, application, entity_url, entity=None): + async def _add_store_resources(self, application, entity_url, + overrides=None, entity=None): if not entity: # avoid extra charm store call if one was already made - entity = await self.charmstore.entity(entity_url) + entity = await self.charmstore.entity(entity_url, + include_stats=False) resources = [ { 'description': resource['Description'], @@ -1159,11 +1357,22 @@ class Model(object): } for resource in entity['Meta']['resources'] ] + if overrides: + names = {r['name'] for r in resources} + unknown = overrides.keys() - names + if unknown: + raise JujuError('Unrecognized resource{}: {}'.format( + 's' if len(unknown) > 1 else '', + ', '.join(unknown))) + for resource in resources: + if resource['name'] in overrides: + resource['revision'] = overrides[resource['name']] + if not resources: return None resources_facade = client.ResourcesFacade.from_connection( - self.connection) + self.connection()) response = await resources_facade.AddPendingResources( tag.application(application), entity_url, @@ -1175,7 +1384,8 @@ class Model(object): async def _deploy(self, charm_url, application, series, config, constraints, endpoint_bindings, resources, storage, - channel=None, num_units=None, placement=None): + channel=None, num_units=None, placement=None, + devices=None): """Logic shared between `Model.deploy` and `BundleHandler.deploy`. """ log.info('Deploying %s', charm_url) @@ -1186,7 +1396,7 @@ class Model(object): default_flow_style=False) app_facade = client.ApplicationFacade.from_connection( - self.connection) + self.connection()) app = client.ApplicationDeploy( charm_url=charm_url, @@ -1199,9 +1409,9 @@ class Model(object): num_units=num_units, resources=resources, storage=storage, - placement=placement + placement=placement, + devices=devices, ) - result = await app_facade.Deploy([app]) errors = [r.error.message for r in result.results if r.error] if errors: @@ -1218,7 +1428,8 @@ class Model(object): """Destroy units by name. """ - app_facade = client.ApplicationFacade.from_connection(self.connection) + connection = self.connection() + app_facade = client.ApplicationFacade.from_connection(connection) log.debug( 'Destroying unit%s %s', @@ -1263,7 +1474,7 @@ class Model(object): which have `source` and `value` attributes. """ config_facade = client.ModelConfigFacade.from_connection( - self.connection + self.connection() ) result = await config_facade.ModelGet() config = result.config @@ -1271,27 +1482,30 @@ class Model(object): config[key] = ConfigValue.from_json(value) return config - def get_constraints(self): + async def get_constraints(self): """Return the machine constraints for this model. - """ - raise NotImplementedError() - - async def grant(self, username, acl='read'): - """Grant a user access to this model. - - :param str username: Username - :param str acl: Access control ('read' or 'write') - - """ - controller_conn = await self.connection.controller() - model_facade = client.ModelManagerFacade.from_connection( - controller_conn) - user = tag.user(username) - model = tag.model(self.info.uuid) - changes = client.ModifyModelAccess(acl, 'grant', model, user) - await self.revoke(username) - return await model_facade.ModifyModelAccess([changes]) + :returns: A ``dict`` of constraints. + """ + constraints = {} + client_facade = client.ClientFacade.from_connection(self.connection()) + result = await client_facade.GetModelConstraints() + + # GetModelConstraints returns GetConstraintsResults which has a + # 'constraints' attribute. If no constraints have been set + # GetConstraintsResults.constraints is None. Otherwise + # GetConstraintsResults.constraints has an attribute for each possible + # constraint, each of these in turn will be None if they have not been + # set. + if result.constraints: + constraint_types = [a for a in dir(result.constraints) + if a in Value._toSchema.keys()] + for constraint in constraint_types: + value = getattr(result.constraints, constraint) + if value is not None: + constraints[constraint] = getattr(result.constraints, + constraint) + return constraints def import_ssh_key(self, identity): """Add a public SSH key from a trusted indentity source to this model. @@ -1326,7 +1540,7 @@ class Model(object): else it's fingerprint """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) entity = {'tag': tag.model(self.info.uuid)} entities = client.Entities([entity]) return await key_facade.ListKeys(entities, raw_ssh) @@ -1386,7 +1600,7 @@ class Model(object): def remove_machine(self, *machine_ids): """Remove a machine from this model. - :param str \*machine_ids: Ids of the machines to remove + :param str *machine_ids: Ids of the machines to remove """ raise NotImplementedError() @@ -1399,7 +1613,7 @@ class Model(object): :param str user: Juju user to which the key is registered """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii'))) key = hashlib.md5(key).hexdigest() key = ':'.join(a + b for a, b in zip(key[::2], key[1::2])) @@ -1427,20 +1641,6 @@ class Model(object): """ raise NotImplementedError() - async def revoke(self, username): - """Revoke a user's access to this model. - - :param str username: Username to revoke - - """ - controller_conn = await self.connection.controller() - model_facade = client.ModelManagerFacade.from_connection( - controller_conn) - user = tag.user(username) - model = tag.model(self.info.uuid) - changes = client.ModifyModelAccess('read', 'revoke', model, user) - return await model_facade.ModifyModelAccess([changes]) - def run(self, command, timeout=None): """Run command on all machines in this model. @@ -1457,38 +1657,86 @@ class Model(object): `ConfigValue` instances, as returned by `get_config`. """ config_facade = client.ModelConfigFacade.from_connection( - self.connection + self.connection() ) for key, value in config.items(): if isinstance(value, ConfigValue): config[key] = value.value await config_facade.ModelSet(config) - def set_constraints(self, constraints): + async def set_constraints(self, constraints): """Set machine constraints on this model. - :param :class:`juju.Constraints` constraints: Machine constraints - + :param dict config: Mapping of model constraints """ - raise NotImplementedError() + client_facade = client.ClientFacade.from_connection(self.connection()) + await client_facade.SetModelConstraints( + application='', + constraints=constraints) - def get_action_output(self, action_uuid, wait=-1): + async def get_action_output(self, action_uuid, wait=None): """Get the results of an action by ID. :param str action_uuid: Id of the action - :param int wait: Time in seconds to wait for action to complete - + :param int wait: Time in seconds to wait for action to complete. + :return dict: Output from action + :raises: :class:`JujuError` if invalid action_uuid """ - raise NotImplementedError() + action_facade = client.ActionFacade.from_connection( + self.connection() + ) + entity = [{'tag': tag.action(action_uuid)}] + # Cannot use self.wait_for_action as the action event has probably + # already happened and self.wait_for_action works by processing + # model deltas and checking if they match our type. If the action + # has already occured then the delta has gone. + + async def _wait_for_action_status(): + while True: + action_output = await action_facade.Actions(entity) + if action_output.results[0].status in ('completed', 'failed'): + return + else: + await asyncio.sleep(1) + await asyncio.wait_for( + _wait_for_action_status(), + timeout=wait) + action_output = await action_facade.Actions(entity) + # ActionResult.output is None if the action produced no output + if action_output.results[0].output is None: + output = {} + else: + output = action_output.results[0].output + return output - def get_action_status(self, uuid_or_prefix=None, name=None): - """Get the status of all actions, filtered by ID, ID prefix, or action name. + async def get_action_status(self, uuid_or_prefix=None, name=None): + """Get the status of all actions, filtered by ID, ID prefix, or name. :param str uuid_or_prefix: Filter by action uuid or prefix :param str name: Filter by action name """ - raise NotImplementedError() + results = {} + action_results = [] + action_facade = client.ActionFacade.from_connection( + self.connection() + ) + if name: + name_results = await action_facade.FindActionsByNames([name]) + action_results.extend(name_results.actions[0].actions) + if uuid_or_prefix: + # Collect list of actions matching uuid or prefix + matching_actions = await action_facade.FindActionTagsByPrefix( + [uuid_or_prefix]) + entities = [] + for actions in matching_actions.matches.values(): + entities = [{'tag': a.tag} for a in actions] + # Get action results matching action tags + uuid_results = await action_facade.Actions(entities) + action_results.extend(uuid_results.results) + for a in action_results: + results[tag.untag('action-', a.action.tag)] = a.status + return results def get_budget(self, budget_name): """Get budget usage info. @@ -1506,7 +1754,7 @@ class Model(object): :param bool utc: Display time as UTC in RFC3339 format """ - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) return await client_facade.FullStatus(filters) def sync_tools( @@ -1529,7 +1777,7 @@ class Model(object): def unblock(self, *commands): """Unblock an operation that would alter this model. - :param str \*commands: The commands to unblock. Valid values are + :param str *commands: The commands to unblock. Valid values are 'all-changes', 'destroy-model', 'remove-object' """ @@ -1538,7 +1786,7 @@ class Model(object): def unset_config(self, *keys): """Unset configuration on this model. - :param str \*keys: The keys to unset + :param str *keys: The keys to unset """ raise NotImplementedError() @@ -1578,7 +1826,7 @@ class Model(object): async def get_metrics(self, *tags): """Retrieve metrics. - :param str \*tags: Tags of entities from which to retrieve metrics. + :param str *tags: Tags of entities from which to retrieve metrics. No tags retrieves the metrics of all units in the model. :return: Dictionary of unit_name:metrics @@ -1587,7 +1835,7 @@ class Model(object): ', '.join(tags) if tags else "all units") metrics_facade = client.MetricsDebugFacade.from_connection( - self.connection) + self.connection()) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities) @@ -1623,7 +1871,7 @@ def get_charm_series(path): return series[0] if series else None -class BundleHandler(object): +class BundleHandler: """ Handle bundles by using the API to translate bundle YAML into a plan of steps and then dispatching each of those using the API. @@ -1637,12 +1885,14 @@ class BundleHandler(object): for unit_name, unit in model.units.items(): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) + self.bundle_facade = client.BundleFacade.from_connection( + model.connection()) self.client_facade = client.ClientFacade.from_connection( - model.connection) + model.connection()) self.app_facade = client.ApplicationFacade.from_connection( - model.connection) + model.connection()) self.ann_facade = client.AnnotationsFacade.from_connection( - model.connection) + model.connection()) async def _handle_local_charms(self, bundle): """Search for references to local charms (i.e. filesystem paths) @@ -1694,8 +1944,11 @@ class BundleHandler(object): return bundle async def fetch_plan(self, entity_id): - is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id) - if is_local: + is_store_url = entity_id.startswith('cs:') + + if not is_store_url and os.path.isfile(entity_id): + bundle_yaml = Path(entity_id).read_text() + elif not is_store_url and os.path.isdir(entity_id): bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text() else: bundle_yaml = await self.charmstore.files(entity_id, @@ -1704,9 +1957,12 @@ class BundleHandler(object): self.bundle = yaml.safe_load(bundle_yaml) self.bundle = await self._handle_local_charms(self.bundle) - self.plan = await self.client_facade.GetBundleChanges( + self.plan = await self.bundle_facade.GetChanges( yaml.dump(self.bundle)) + if self.plan.errors: + raise JujuError(self.plan.errors) + async def execute_plan(self): for step in self.plan.changes: method = getattr(self, step.method) @@ -1772,7 +2028,11 @@ class BundleHandler(object): # Fix up values, as necessary. if 'parent_id' in params: - params['parent_id'] = self.resolve(params['parent_id']) + if params['parent_id'].startswith('$addUnit'): + unit = self.resolve(params['parent_id'])[0] + params['parent_id'] = unit.machine.entity_id + else: + params['parent_id'] = self.resolve(params['parent_id']) params['constraints'] = parse_constraints( params.get('constraints')) @@ -1813,7 +2073,7 @@ class BundleHandler(object): return await self.model.add_relation(*endpoints) async def deploy(self, charm, series, application, options, constraints, - storage, endpoint_bindings, resources): + storage, endpoint_bindings, *args): """ :param charm string: Charm holds the URL of the charm to be used to deploy this @@ -1838,17 +2098,35 @@ class BundleHandler(object): :param endpoint_bindings map[string]string: EndpointBindings holds the optional endpoint bindings + :param devices map[string]string: + Devices holds the optional devices constraints. + (Only given on Juju 2.5+) + :param resources map[string]int: Resources identifies the revision to use for each resource of the application's charm. + + :param num_units int: + NumUnits holds the number of units required. For IAAS models, this + will be 0 and separate AddUnitChanges will be used. For Kubernetes + models, this will be used to scale the application. + (Only given on Juju 2.5+) """ # resolve indirect references charm = self.resolve(charm) - # the bundle plan doesn't actually do anything with resources, even - # though it ostensibly gives us something (None) for that param + + if len(args) == 1: + # Juju 2.4 and below only sends the resources + resources = args[0] + devices, num_units = None, None + else: + # Juju 2.5+ sends devices before resources, as well as num_units + # There might be placement but we need to ignore that. + devices, resources, num_units = args[:3] + if not charm.startswith('local:'): - resources = await self.model._add_store_resources(application, - charm) + resources = await self.model._add_store_resources( + application, charm, overrides=resources) await self.model._deploy( charm_url=charm, application=application, @@ -1858,6 +2136,8 @@ class BundleHandler(object): endpoint_bindings=endpoint_bindings, resources=resources, storage=storage, + devices=devices, + num_units=num_units, ) return application @@ -1889,6 +2169,20 @@ class BundleHandler(object): to=placement, ) + async def scale(self, application, scale): + """ + Handle a change of scale to a k8s application. + + :param string application: + Application holds the application placeholder name for which a unit + is added. + + :param int scale: + New scale value to use. + """ + application = self.resolve(application) + return await self.model.applications[application].scale(scale=scale) + async def expose(self, application): """ :param application string: @@ -1920,13 +2214,13 @@ class BundleHandler(object): return await entity.set_annotations(annotations) -class CharmStore(object): +class CharmStore: """ Async wrapper around theblues.charmstore.CharmStore """ - def __init__(self, loop): + def __init__(self, loop, cs_timeout=20): self.loop = loop - self._cs = theblues.charmstore.CharmStore(timeout=5) + self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout) def __getattr__(self, name): """ @@ -1952,7 +2246,7 @@ class CharmStore(object): return wrapper -class CharmArchiveGenerator(object): +class CharmArchiveGenerator: """ Create a Zip archive of a local charm directory for upload to a controller. @@ -1969,9 +2263,9 @@ class CharmArchiveGenerator(object): Ignored:: - * build/\* - This is used for packing the charm itself and any + * build/* - This is used for packing the charm itself and any similar tasks. - * \*/.\* - Hidden files are all ignored for now. This will most + * */.* - Hidden files are all ignored for now. This will most likely be changed into a specific ignore list (.bzr, etc)