X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=db65b7d1eca5fa1af1e0c988ba7d2cea88faedf6;hb=c8d7bcea7e1fd4b174e3be2a29e382df66e88b66;hp=480813c78a86298e1fde7ccaa704af0b47bf2ee0;hpb=2ed7314a9ea1240883655bc521b6e27f149aa485;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 480813c..db65b7d 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,78 +1,442 @@ +import asyncio +import collections import logging +import re +import weakref +from concurrent.futures import CancelledError +from functools import partial + +import yaml +from theblues import charmstore from .client import client from .client import watcher +from .client import connection from .delta import get_entity_delta +from .delta import get_entity_class +from .exceptions import DeadEntityException +from .errors import JujuAPIError log = logging.getLogger(__name__) +class _Observer(object): + """Wrapper around an observer callable. + + This wrapper allows filter criteria to be associated with the + callable so that it's only called for changes that meet the criteria. + + """ + def __init__(self, callable_, entity_type, action, entity_id, predicate): + self.callable_ = callable_ + self.entity_type = entity_type + self.action = action + self.entity_id = entity_id + self.predicate = predicate + if self.entity_id: + self.entity_id = str(self.entity_id) + if not self.entity_id.startswith('^'): + self.entity_id = '^' + self.entity_id + if not self.entity_id.endswith('$'): + self.entity_id += '$' + + async def __call__(self, delta, old, new, model): + await self.callable_(delta, old, new, model) + + def cares_about(self, delta): + """Return True if this observer "cares about" (i.e. wants to be + called) for a this delta. + + """ + if (self.entity_id and delta.get_id() and + not re.match(self.entity_id, str(delta.get_id()))): + return False + + if self.entity_type and self.entity_type != delta.entity: + return False + + if self.action and self.action != delta.type: + return False + + if self.predicate and not self.predicate(delta): + return False + + return True + + class ModelObserver(object): - def __call__(self, delta, old, new, model): - if old is None and new is not None: - type_ = 'add' - else: - type_ = delta.type - handler_name = 'on_{}_{}'.format(delta.entity, type_) + async def __call__(self, delta, old, new, model): + handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) - log.debug( - 'Model changed: %s %s %s', - delta.entity, delta.type, delta.get_id()) - method(delta, old, new, model) + await method(delta, old, new, model) - def on_change(self, delta, old, new, model): + async def on_change(self, delta, old, new, model): pass +class ModelState(object): + """Holds the state of the model, including the delta history of all + entities in the model. + + """ + def __init__(self, model): + self.model = model + self.state = dict() + + def _live_entity_map(self, entity_type): + """Return an id:Entity map of all the living entities of + type ``entity_type``. + + """ + return { + entity_id: self.get_entity(entity_type, entity_id) + for entity_id, history in self.state.get(entity_type, {}).items() + if history[-1] is not None + } + + @property + def applications(self): + """Return a map of application-name:Application for all applications + currently in the model. + + """ + return self._live_entity_map('application') + + @property + def machines(self): + """Return a map of machine-id:Machine for all machines currently in + the model. + + """ + return self._live_entity_map('machine') + + @property + def units(self): + """Return a map of unit-id:Unit for all units currently in + the model. + + """ + return self._live_entity_map('unit') + + def entity_history(self, entity_type, entity_id): + """Return the history deque for an entity. + + """ + return self.state[entity_type][entity_id] + + def entity_data(self, entity_type, entity_id, history_index): + """Return the data dict for an entity at a specific index of its + history. + + """ + return self.entity_history(entity_type, entity_id)[history_index] + + def apply_delta(self, delta): + """Apply delta to our state and return a copy of the + affected object as it was before and after the update, e.g.: + + old_obj, new_obj = self.apply_delta(delta) + + old_obj may be None if the delta is for the creation of a new object, + e.g. a new application or unit is deployed. + + new_obj will never be None, but may be dead (new_obj.dead == True) + if the object was deleted as a result of the delta being applied. + + """ + history = ( + self.state + .setdefault(delta.entity, {}) + .setdefault(delta.get_id(), collections.deque()) + ) + + history.append(delta.data) + if delta.type == 'remove': + history.append(None) + + entity = self.get_entity(delta.entity, delta.get_id()) + return entity.previous(), entity + + def get_entity( + self, entity_type, entity_id, history_index=-1, connected=True): + """Return an object instance for the given entity_type and id. + + By default the object state matches the most recent state from + Juju. To get an instance of the object in an older state, pass + history_index, an index into the history deque for the entity. + + """ + + if history_index < 0 and history_index != -1: + history_index += len(self.entity_history(entity_type, entity_id)) + if history_index < 0: + return None + + try: + self.entity_data(entity_type, entity_id, history_index) + except IndexError: + return None + + entity_class = get_entity_class(entity_type) + return entity_class( + entity_id, self.model, history_index=history_index, + connected=connected) + + class ModelEntity(object): """An object in the Model tree""" - def __init__(self, data, model): + def __init__(self, entity_id, model, history_index=-1, connected=True): """Initialize a new entity - :param data: dict of data from a watcher delta + :param entity_id str: The unique id of the object in the model :param model: The model instance in whose object tree this entity resides + :history_index int: The index of this object's state in the model's + history deque for this entity + :connected bool: Flag indicating whether this object gets live updates + from the model. """ - self.data = data + self.entity_id = entity_id self.model = model + self._history_index = history_index + self.connected = connected self.connection = model.connection def __getattr__(self, name): + """Fetch object attributes from the underlying data dict held in the + model. + + """ + if self.data is None: + raise DeadEntityException( + "Entity {}:{} is dead - its attributes can no longer be " + "accessed. Use the .previous() method on this object to get " + "a copy of the object at its previous state.".format( + self.entity_type, self.entity_id)) return self.data[name] + def __bool__(self): + return bool(self.data) + + def on_change(self, callable_): + """Add a change observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'change', self.entity_id) + + def on_remove(self, callable_): + """Add a remove observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'remove', self.entity_id) + + @property + def entity_type(self): + """A string identifying the entity type of this object, e.g. + 'application' or 'unit', etc. + + """ + return self.__class__.__name__.lower() + + @property + def current(self): + """Return True if this object represents the current state of the + entity in the underlying model. + + This will be True except when the object represents an entity at a + non-latest state in history, e.g. if the object was obtained by calling + .previous() on another object. + + """ + return self._history_index == -1 + + @property + def dead(self): + """Returns True if this entity no longer exists in the underlying + model. + + """ + return ( + self.data is None or + self.model.state.entity_data( + self.entity_type, self.entity_id, -1) is None + ) + + @property + def alive(self): + """Returns True if this entity still exists in the underlying + model. + + """ + return not self.dead + + @property + def data(self): + """The data dictionary for this entity. + + """ + return self.model.state.entity_data( + self.entity_type, self.entity_id, self._history_index) + + def previous(self): + """Return a copy of this object as was at its previous state in + history. + + Returns None if this object is new (and therefore has no history). + + The returned object is always "disconnected", i.e. does not receive + live updates. + + """ + return self.model.state.get_entity( + self.entity_type, self.entity_id, self._history_index - 1, + connected=False) + + def next(self): + """Return a copy of this object at its next state in + history. + + Returns None if this object is already the latest. + + The returned object is "disconnected", i.e. does not receive + live updates, unless it is current (latest). + + """ + if self._history_index == -1: + return None + + new_index = self._history_index + 1 + connected = ( + new_index == len(self.model.state.entity_history( + self.entity_type, self.entity_id)) - 1 + ) + return self.model.state.get_entity( + self.entity_type, self.entity_id, self._history_index - 1, + connected=connected) + + def latest(self): + """Return a copy of this object at its current state in the model. + + Returns self if this object is already the latest. + + The returned object is always "connected", i.e. receives + live updates from the model. + + """ + if self._history_index == -1: + return self + + return self.model.state.get_entity(self.entity_type, self.entity_id) + class Model(object): - def __init__(self, connection): + def __init__(self, loop=None): """Instantiate a new connected Model. - :param connection: `juju.client.connection.Connection` instance + :param loop: an asyncio event loop """ - self.connection = connection - self.observers = set() - self.state = dict() - self._watching = False + self.loop = loop or asyncio.get_event_loop() + self.connection = None + self.observers = weakref.WeakValueDictionary() + self.state = ModelState(self) + self._watcher_task = None + self._watch_shutdown = asyncio.Event(loop=loop) + self._watch_received = asyncio.Event(loop=loop) + self._charmstore = CharmStore(self.loop) + + async def connect_current(self): + """Connect to the current Juju model. + + """ + self.connection = await connection.Connection.connect_current() + self._watch() + await self._watch_received.wait() + + async def disconnect(self): + """Shut down the watcher task and close websockets. + + """ + self._stop_watching() + if self.connection and self.connection.is_open: + await self._watch_shutdown.wait() + log.debug('Closing model connection') + await self.connection.close() + self.connection = None + + def all_units_idle(self): + """Return True if all units are idle. + + """ + for unit in self.units.values(): + unit_status = unit.data['agent-status']['current'] + if unit_status != 'idle': + return False + return True + + async def reset(self, force=False): + """Reset the model to a clean state. + + :param bool force: Force-terminate machines. + + This returns only after the model has reached a clean state. "Clean" + means no applications or machines exist in the model. + + """ + log.debug('Resetting model') + for app in self.applications.values(): + await app.destroy() + for machine in self.machines.values(): + await machine.destroy(force=force) + await self.block_until( + lambda: len(self.machines) == 0 + ) + + async def block_until(self, *conditions, timeout=None): + """Return only after all conditions are true. + + """ + async def _block(): + while not all(c() for c in conditions): + await asyncio.sleep(0) + await asyncio.wait_for(_block(), timeout) @property def applications(self): - return self.state.get('application', {}) + """Return a map of application-name:Application for all applications + currently in the model. + + """ + return self.state.applications + + @property + def machines(self): + """Return a map of machine-id:Machine for all machines currently in + the model. + + """ + return self.state.machines @property def units(self): - return self.state.get('unit', {}) + """Return a map of unit-id:Unit for all units currently in + the model. - def stop_watching(self): - self.connection.cancel() - self._watching = False + """ + return self.state.units - def add_observer(self, callable_): + def add_observer( + self, callable_, entity_type=None, action=None, entity_id=None, + predicate=None): """Register an "on-model-change" callback - Once a watch is started (Model.watch() is called), ``callable_`` + Once the model is connected, ``callable_`` will be called each time the model changes. callable_ should - accept the following positional arguments: + be Awaitable and accept the following positional arguments: delta - An instance of :class:`juju.delta.EntityDelta` containing the raw delta data recv'd from the Juju @@ -89,61 +453,65 @@ class Model(object): model - The :class:`Model` itself. - """ - self.observers.add(callable_) + Events for which ``callable_`` is called can be specified by passing + entity_type, action, and/or id_ filter criteria, e.g.: - async def watch(self): - """Start an asynchronous watch against this model. + add_observer( + myfunc, entity_type='application', action='add', id_='ubuntu') - See :meth:`add_observer` to register an onchange callback. + For more complex filtering conditions, pass a predicate function. It + will be called with a delta as its only argument. If the predicate + function returns True, the callable_ will be called. """ - self._watching = True - allwatcher = watcher.AllWatcher() - allwatcher.connect(await self.connection.clone()) - while self._watching: - results = await allwatcher.Next() - for delta in results.deltas: - delta = get_entity_delta(delta) - old_obj, new_obj = self._apply_delta(delta) - self._notify_observers(delta, old_obj, new_obj) - - def _apply_delta(self, delta): - """Apply delta to our model state and return the a copy of the - affected object as it was before and after the update, e.g.: - - old_obj, new_obj = self._apply_delta(delta) + observer = _Observer( + callable_, entity_type, action, entity_id, predicate) + self.observers[observer] = callable_ - old_obj may be None if the delta is for the creation of a new object, - e.g. a new application or unit is deployed. + def _watch(self): + """Start an asynchronous watch against this model. - new_obj may be None if no object was created or updated, or if an - object was deleted as a result of the delta being applied. + See :meth:`add_observer` to register an onchange callback. """ - old_obj, new_obj = None, None - - if (delta.entity in self.state and - delta.get_id() in self.state[delta.entity]): - old_obj = self.state[delta.entity][delta.get_id()] - if delta.type == 'remove': - del self.state[delta.entity][delta.get_id()] - return old_obj, new_obj - - new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = ( - self._create_model_entity(delta)) - - return old_obj, new_obj - - def _create_model_entity(self, delta): - """Return an object instance representing the entity created or - updated by ``delta`` + async def _start_watch(): + self._watch_shutdown.clear() + try: + allwatcher = watcher.AllWatcher() + self._watch_conn = await self.connection.clone() + allwatcher.connect(self._watch_conn) + while True: + results = await allwatcher.Next() + for delta in results.deltas: + delta = get_entity_delta(delta) + old_obj, new_obj = self.state.apply_delta(delta) + # XXX: Might not want to shield at this level + # We are shielding because when the watcher is + # canceled (on disconnect()), we don't want all of + # its children (every observer callback) to be + # canceled with it. So we shield them. But this means + # they can *never* be canceled. + await asyncio.shield( + self._notify_observers(delta, old_obj, new_obj)) + self._watch_received.set() + except CancelledError: + log.debug('Closing watcher connection') + await self._watch_conn.close() + self._watch_shutdown.set() + self._watch_conn = None + + log.debug('Starting watcher task') + self._watcher_task = self.loop.create_task(_start_watch()) + + def _stop_watching(self): + """Stop the asynchronous watch against this model. """ - entity_class = delta.get_entity_class() - return entity_class(delta.data, self) + log.debug('Stopping watcher task') + if self._watcher_task: + self._watcher_task.cancel() - def _notify_observers(self, delta, old_obj, new_obj): + async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state :param delta: The raw change from the watcher @@ -154,8 +522,63 @@ class Model(object): by applying this delta. """ + if new_obj and not old_obj: + delta.type = 'add' + + log.debug( + 'Model changed: %s %s %s', + delta.entity, delta.type, delta.get_id()) + for o in self.observers: - o(delta, old_obj, new_obj, self) + if o.cares_about(delta): + asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + + async def _wait(self, entity_type, entity_id, action, predicate=None): + """ + Block the calling routine until a given action has happened to the + given entity + + :param entity_type: The entity's type. + :param entity_id: The entity's id. + :param action: the type of action (e.g., 'add' or 'change') + :param predicate: optional callable that must take as an + argument a delta, and must return a boolean, indicating + whether the delta contains the specific action we're looking + for. For example, you might check to see whether a 'change' + has a 'completed' status. See the _Observer class for details. + + """ + q = asyncio.Queue(loop=self.loop) + + async def callback(delta, old, new, model): + await q.put(delta.get_id()) + + self.add_observer(callback, entity_type, action, entity_id, predicate) + entity_id = await q.get() + return self.state._live_entity_map(entity_type)[entity_id] + + async def _wait_for_new(self, entity_type, entity_id, predicate=None): + """Wait for a new object to appear in the Model and return it. + + Waits for an object of type ``entity_type`` with id ``entity_id``. + + This coroutine blocks until the new object appears in the model. + + """ + return await self._wait(entity_type, entity_id, 'add', predicate) + + async def wait_for_action(self, action_id): + """Given an action, wait for it to complete.""" + + if action_id.startswith("action-"): + # if we've been passed action.tag, transform it into the + # id that the api deltas will use. + action_id = action_id[7:] + + def predicate(delta): + return delta.data['status'] in ('completed', 'failed') + + return await self._wait('action', action_id, 'change', predicate) def add_machine( self, spec=None, constraints=None, disks=None, series=None, @@ -187,14 +610,37 @@ class Model(object): pass add_machines = add_machine - def add_relation(self, relation1, relation2): - """Add a relation between two services. + async def add_relation(self, relation1, relation2): + """Add a relation between two applications. - :param str relation1: '[:]' - :param str relation2: '[:]' + :param str relation1: '[:]' + :param str relation2: '[:]' """ - pass + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + log.debug( + 'Adding relation %s <-> %s', relation1, relation2) + + try: + result = await app_facade.AddRelation([relation1, relation2]) + except JujuAPIError as e: + if 'relation already exists' not in e.message: + raise + log.debug( + 'Relation %s <-> %s already exists', relation1, relation2) + # TODO: if relation already exists we should return the + # Relation ModelEntity here + return None + + def predicate(delta): + endpoints = {} + for endpoint in delta.data['endpoints']: + endpoints[endpoint['application-name']] = endpoint['relation'] + return endpoints == result.endpoints + + return await self._wait_for_new('relation', None, predicate) def add_space(self, name, *cidrs): """Add a new network space. @@ -336,16 +782,11 @@ class Model(object): TODO:: - - entity_url must have a revision; look up latest automatically if - not provided by caller - service_name is required; fill this in automatically if not provided by caller - series is required; how do we pick a default? """ - if constraints: - constraints = client.Value(**constraints) - if to: placement = [ client.Placement(**p) for p in to @@ -359,30 +800,50 @@ class Model(object): for k, v in storage.items() } + entity_id = await self.charmstore.entityId(entity_url) + app_facade = client.ApplicationFacade() client_facade = client.ClientFacade() app_facade.connect(self.connection) client_facade.connect(self.connection) - log.debug( - 'Deploying %s', entity_url) - - await client_facade.AddCharm(channel, entity_url) - app = client.ApplicationDeploy( - application=service_name, - channel=channel, - charm_url=entity_url, - config=config, - constraints=constraints, - endpoint_bindings=bind, - num_units=num_units, - placement=placement, - resources=resources, - series=series, - storage=storage, - ) - - return await app_facade.Deploy([app]) + if 'bundle/' in entity_id: + handler = BundleHandler(self) + await handler.fetch_plan(entity_id) + await handler.execute_plan() + extant_apps = {app for app in self.applications} + pending_apps = set(handler.applications) - extant_apps + if pending_apps: + # new apps will usually be in the model by now, but if some + # haven't made it yet we'll need to wait on them to be added + await asyncio.gather(*[ + asyncio.ensure_future( + self.model._wait_for_new('application', app_name)) + for app_name in pending_apps + ]) + return [app for name, app in self.applications.items() + if name in handler.applications] + else: + log.debug( + 'Deploying %s', entity_id) + + await client_facade.AddCharm(channel, entity_id) + app = client.ApplicationDeploy( + application=service_name, + channel=channel, + charm_url=entity_id, + config=config, + constraints=constraints, + endpoint_bindings=bind, + num_units=num_units, + placement=placement, + resources=resources, + series=series, + storage=storage, + ) + + await app_facade.Deploy([app]) + return await self._wait_for_new('application', service_name) def destroy(self): """Terminate all machines and resources for this model. @@ -390,6 +851,21 @@ class Model(object): """ pass + async def destroy_unit(self, *unit_names): + """Destroy units by name. + + """ + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + log.debug( + 'Destroying unit%s %s', + 's' if len(unit_names) == 1 else '', + ' '.join(unit_names)) + + return await app_facade.Destroy(self.name) + destroy_units = destroy_unit + def get_backup(self, archive_id): """Download a backup archive file. @@ -697,3 +1173,248 @@ class Model(object): """ pass + + @property + def charmstore(self): + return self._charmstore + + +class BundleHandler(object): + """ + Handle bundles by using the API to translate bundle YAML into a plan of + steps and then dispatching each of those using the API. + """ + def __init__(self, model): + self.model = model + self.charmstore = model.charmstore + self.plan = [] + self.references = {} + self._units_by_app = {} + for unit_name, unit in model.units.items(): + app_units = self._units_by_app.setdefault(unit.application, []) + app_units.append(unit_name) + self.client_facade = client.ClientFacade() + self.client_facade.connect(model.connection) + self.app_facade = client.ApplicationFacade() + self.app_facade.connect(model.connection) + self.ann_facade = client.AnnotationsFacade() + self.ann_facade.connect(model.connection) + + async def fetch_plan(self, entity_id): + bundle_yaml = await self.charmstore.files(entity_id, + filename='bundle.yaml', + read_file=True) + self.bundle = yaml.safe_load(bundle_yaml) + self.plan = await self.client_facade.GetBundleChanges(bundle_yaml) + + async def execute_plan(self): + for step in self.plan.changes: + method = getattr(self, step.method) + result = await method(*step.args) + self.references[step.id_] = result + + @property + def applications(self): + return list(self.bundle['services'].keys()) + + def resolve(self, reference): + if reference and reference.startswith('$'): + reference = self.references[reference[1:]] + return reference + + async def addCharm(self, charm, series): + """ + :param charm string: + Charm holds the URL of the charm to be added. + + :param series string: + Series holds the series of the charm to be added + if the charm default is not sufficient. + """ + entity_id = await self.charmstore.entityId(charm) + log.debug('Adding %s', entity_id) + await self.client_facade.AddCharm(None, entity_id) + return entity_id + + async def addMachines(self, series, constraints, container_type, + parent_id): + """ + :param series string: + Series holds the optional machine OS series. + + :param constraints string: + Constraints holds the optional machine constraints. + + :param Container_type string: + ContainerType optionally holds the type of the container (for + instance ""lxc" or kvm"). It is not specified for top level + machines. + + :param parent_id string: + ParentId optionally holds a placeholder pointing to another machine + change or to a unit change. This value is only specified in the + case this machine is a container, in which case also ContainerType + is set. + """ + params = client.AddMachineParams( + series=series, + constraints=constraints, + container_type=container_type, + parent_id=self.resolve(parent_id), + ) + results = await self.client_facade.AddMachines(params) + log.debug('Added new machine %s', results[0].machine) + return results[0].machine + + async def addRelation(self, endpoint1, endpoint2): + """ + :param endpoint1 string: + :param endpoint2 string: + Endpoint1 and Endpoint2 hold relation endpoints in the + "application:interface" form, where the application is always a + placeholder pointing to an application change, and the interface is + optional. Examples are "$deploy-42:web" or just "$deploy-42". + """ + endpoints = [endpoint1, endpoint2] + # resolve indirect references + for i in range(len(endpoints)): + parts = endpoints[i].split(':') + parts[0] = self.resolve(parts[0]) + endpoints[i] = ':'.join(parts) + + log.info('Relating %s <-> %s', *endpoints) + return await self.model.add_relation(*endpoints) + + async def deploy(self, charm, series, application, options, constraints, + storage, endpoint_bindings, resources): + """ + :param charm string: + Charm holds the URL of the charm to be used to deploy this + application. + + :param series string: + Series holds the series of the application to be deployed + if the charm default is not sufficient. + + :param application string: + Application holds the application name. + + :param options map[string]interface{}: + Options holds application options. + + :param constraints string: + Constraints holds the optional application constraints. + + :param storage map[string]string: + Storage holds the optional storage constraints. + + :param endpoint_bindings map[string]string: + EndpointBindings holds the optional endpoint bindings + + :param resources map[string]int: + Resources identifies the revision to use for each resource + of the application's charm. + """ + # resolve indirect references + charm = self.resolve(charm) + # stringify all config values for API + options = {k: str(v) for k, v in options.items()} + # build param object + app = client.ApplicationDeploy( + charm_url=charm, + series=series, + application=application, + config=options, + constraints=constraints, + storage=storage, + endpoint_bindings=endpoint_bindings, + resources=resources, + ) + # do the do + log.info('Deploying %s', charm) + await self.app_facade.Deploy([app]) + return application + + async def addUnit(self, application, to): + """ + :param application string: + Application holds the application placeholder name for which a unit + is added. + + :param to string: + To holds the optional location where to add the unit, as a + placeholder pointing to another unit change or to a machine change. + """ + application = self.resolve(application) + placement = self.resolve(to) + if self._units_by_app.get(application): + # enough units for this application already exist; + # claim one, and carry on + # NB: this should probably honor placement, but the juju client + # doesn't, so we're not bothering, either + unit_name = self._units_by_app[application].pop() + log.debug('Reusing unit %s for %s', unit_name, application) + return self.model.units[unit_name] + + log.debug('Adding new unit for %s%s', application, + ' to %s' % placement if placement else '') + return await self.model.applications[application].add_unit( + count=1, + to=placement, + ) + + async def expose(self, application): + """ + :param application string: + Application holds the placeholder name of the application that must + be exposed. + """ + application = self.resolve(application) + log.info('Exposing %s', application) + return await self.model.applications[application].expose() + + async def setAnnotations(self, id_, entity_type, annotations): + """ + :param id_ string: + Id is the placeholder for the application or machine change + corresponding to the entity to be annotated. + + :param entity_type EntityType: + EntityType holds the type of the entity, "application" or + "machine". + + :param annotations map[string]string: + Annotations holds the annotations as key/value pairs. + """ + entity_id = self.resolve(id_) + try: + entity = self.model.state.get_entity(entity_type, entity_id) + except KeyError: + entity = await self.model._wait_for_new(entity_type, entity_id) + return await entity.set_annotations(annotations) + + +class CharmStore(object): + """ + Async wrapper around theblues.charmstore.CharmStore + """ + def __init__(self, loop): + self.loop = loop + self._cs = charmstore.CharmStore() + + def __getattr__(self, name): + """ + Wrap method calls in coroutines that use run_in_executor to make them + async. + """ + attr = getattr(self._cs, name) + if not callable(attr): + wrapper = partial(getattr, self._cs, name) + setattr(self, name, wrapper) + else: + async def coro(*args, **kwargs): + method = partial(attr, *args, **kwargs) + return await self.loop.run_in_executor(None, method) + setattr(self, name, coro) + wrapper = coro + return wrapper