from concurrent.futures import CancelledError
from functools import partial
+import yaml
from theblues import charmstore
from .client import client
callable so that it's only called for changes that meet the criteria.
"""
- def __init__(self, callable_, entity_type, action, entity_id):
+ def __init__(self, callable_, entity_type, action, entity_id, predicate):
self.callable_ = callable_
self.entity_type = entity_type
self.action = action
self.entity_id = entity_id
+ self.predicate = predicate
if self.entity_id:
+ self.entity_id = str(self.entity_id)
if not self.entity_id.startswith('^'):
self.entity_id = '^' + self.entity_id
if not self.entity_id.endswith('$'):
async def __call__(self, delta, old, new, model):
await self.callable_(delta, old, new, model)
- def cares_about(self, entity_type, action, entity_id):
+ def cares_about(self, delta):
"""Return True if this observer "cares about" (i.e. wants to be
- called) for a change matching the entity_type, action, and
- entity_id parameters.
+ called) for a this delta.
"""
- if (self.entity_id and entity_id and
- not re.match(self.entity_id, str(entity_id))):
+ if (self.entity_id and delta.get_id() and
+ not re.match(self.entity_id, str(delta.get_id()))):
return False
- if self.entity_type and self.entity_type != entity_type:
+ if self.entity_type and self.entity_type != delta.entity:
return False
- if self.action and self.action != action:
+ if self.action and self.action != delta.type:
+ return False
+
+ if self.predicate and not self.predicate(delta):
return False
return True
self.model = model
self.state = dict()
- def clear(self):
- self.state.clear()
-
def _live_entity_map(self, entity_type):
"""Return an id:Entity map of all the living entities of
type ``entity_type``.
await self.block_until(
lambda: len(self.machines) == 0
)
- self.state.clear()
async def block_until(self, *conditions, timeout=None):
"""Return only after all conditions are true.
return self.state.units
def add_observer(
- self, callable_, entity_type=None, action=None, entity_id=None):
+ self, callable_, entity_type=None, action=None, entity_id=None,
+ predicate=None):
"""Register an "on-model-change" callback
Once the model is connected, ``callable_``
add_observer(
myfunc, entity_type='application', action='add', id_='ubuntu')
+ For more complex filtering conditions, pass a predicate function. It
+ will be called with a delta as its only argument. If the predicate
+ function returns True, the callable_ will be called.
+
"""
- observer = _Observer(callable_, entity_type, action, entity_id)
+ observer = _Observer(
+ callable_, entity_type, action, entity_id, predicate)
self.observers[observer] = callable_
def _watch(self):
by applying this delta.
"""
- if not old_obj:
+ if new_obj and not old_obj:
delta.type = 'add'
log.debug(
delta.entity, delta.type, delta.get_id())
for o in self.observers:
- if o.cares_about(delta.entity, delta.type, delta.get_id()):
+ if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self))
- async def _wait_for_new(self, entity_type, entity_id):
+ async def _wait_for_new(self, entity_type, entity_id, predicate=None):
"""Wait for a new object to appear in the Model and return it.
Waits for an object of type ``entity_type`` with id ``entity_id``.
This coroutine blocks until the new object appears in the model.
"""
- entity_added = asyncio.Event(loop=self.loop)
+ entity_added = asyncio.Queue(loop=self.loop)
async def callback(delta, old, new, model):
- entity_added.set()
- self.add_observer(callback, entity_type, 'add', entity_id)
- await entity_added.wait()
+ await entity_added.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, 'add', entity_id, predicate)
+ entity_id = await entity_added.get()
return self.state._live_entity_map(entity_type)[entity_id]
def add_machine(
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
- return await app_facade.AddRelation([relation1, relation2])
+ try:
+ result = await app_facade.AddRelation([relation1, relation2])
+ except JujuAPIError as e:
+ if 'relation already exists' not in e.message:
+ raise
+ log.debug(
+ 'Relation %s <-> %s already exists', relation1, relation2)
+ # TODO: if relation already exists we should return the
+ # Relation ModelEntity here
+ return None
+
+ def predicate(delta):
+ endpoints = {}
+ for endpoint in delta.data['endpoints']:
+ endpoints[endpoint['application-name']] = endpoint['relation']
+ return endpoints == result.endpoints
+
+ return await self._wait_for_new('relation', None, predicate)
def add_space(self, name, *cidrs):
"""Add a new network space.
TODO::
- - entity_url must have a revision; look up latest automatically if
- not provided by caller
- service_name is required; fill this in automatically if not
provided by caller
- series is required; how do we pick a default?
handler = BundleHandler(self)
await handler.fetch_plan(entity_id)
await handler.execute_plan()
+ extant_apps = {app for app in self.applications}
+ pending_apps = set(handler.applications) - extant_apps
+ if pending_apps:
+ # new apps will usually be in the model by now, but if some
+ # haven't made it yet we'll need to wait on them to be added
+ await asyncio.gather(*[
+ asyncio.ensure_future(
+ self.model._wait_for_new('application', app_name))
+ for app_name in pending_apps
+ ])
+ return [app for name, app in self.applications.items()
+ if name in handler.applications]
else:
log.debug(
'Deploying %s', entity_id)
"""
pass
+ async def destroy_unit(self, *unit_names):
+ """Destroy units by name.
+
+ """
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ log.debug(
+ 'Destroying unit%s %s',
+ 's' if len(unit_names) == 1 else '',
+ ' '.join(unit_names))
+
+ return await app_facade.Destroy(self.name)
+ destroy_units = destroy_unit
+
def get_backup(self, archive_id):
"""Download a backup archive file.
self.ann_facade.connect(model.connection)
async def fetch_plan(self, entity_id):
- yaml = await self.charmstore.files(entity_id,
- filename='bundle.yaml',
- read_file=True)
- self.plan = await self.client_facade.GetBundleChanges(yaml)
+ bundle_yaml = await self.charmstore.files(entity_id,
+ filename='bundle.yaml',
+ read_file=True)
+ self.bundle = yaml.safe_load(bundle_yaml)
+ self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
async def execute_plan(self):
for step in self.plan.changes:
result = await method(*step.args)
self.references[step.id_] = result
+ @property
+ def applications(self):
+ return list(self.bundle['services'].keys())
+
def resolve(self, reference):
if reference and reference.startswith('$'):
reference = self.references[reference[1:]]
parts = endpoints[i].split(':')
parts[0] = self.resolve(parts[0])
endpoints[i] = ':'.join(parts)
- try:
- await self.app_facade.AddRelation(endpoints)
- log.debug('Added relation %s <-> %s', *endpoints)
- except JujuAPIError as e:
- if 'relation already exists' not in e.message:
- raise
- log.debug('Relation %s <-> %s already exists', *endpoints)
- return None
+
+ log.info('Relating %s <-> %s', *endpoints)
+ return await self.model.add_relation(*endpoints)
async def deploy(self, charm, series, application, options, constraints,
storage, endpoint_bindings, resources):
resources=resources,
)
# do the do
- log.debug('Deploying %s', charm)
+ log.info('Deploying %s', charm)
await self.app_facade.Deploy([app])
return application
# doesn't, so we're not bothering, either
unit_name = self._units_by_app[application].pop()
log.debug('Reusing unit %s for %s', unit_name, application)
- return unit_name
- log.debug('Adding unit of %s%s',
- application,
- (' to %s' % placement) if placement else '')
- result = await self.app_facade.AddUnits(
- application=application,
- placement=placement,
- num_units=1,
+ return self.model.units[unit_name]
+
+ log.debug('Adding new unit for %s%s', application,
+ ' to %s' % placement if placement else '')
+ return await self.model.applications[application].add_unit(
+ count=1,
+ to=placement,
)
- return result.units[0]
async def expose(self, application):
"""
be exposed.
"""
application = self.resolve(application)
- log.debug('Exposing %s', application)
- await self.app_facade.Expose(application)
- return None
+ log.info('Exposing %s', application)
+ return await self.model.applications[application].expose()
async def setAnnotations(self, id_, entity_type, annotations):
"""
Annotations holds the annotations as key/value pairs.
"""
entity_id = self.resolve(id_)
- log.debug('Updating annotations of %s', entity_id)
- ann = client.EntityAnnotations(
- entity=entity_id,
- annotations=annotations,
- )
- await self.ann_facade.Set([ann])
- return None
+ try:
+ entity = self.model.state.get_entity(entity_type, entity_id)
+ except KeyError:
+ entity = await self.model._wait_for_new(entity_type, entity_id)
+ return await entity.set_annotations(annotations)
class CharmStore(object):