import asyncio
import collections
import logging
+import re
+import weakref
from concurrent.futures import CancelledError
from functools import partial
+import yaml
from theblues import charmstore
from .client import client
log = logging.getLogger(__name__)
+class _Observer(object):
+ """Wrapper around an observer callable.
+
+ This wrapper allows filter criteria to be associated with the
+ callable so that it's only called for changes that meet the criteria.
+
+ """
+ def __init__(self, callable_, entity_type, action, entity_id, predicate):
+ self.callable_ = callable_
+ self.entity_type = entity_type
+ self.action = action
+ self.entity_id = entity_id
+ self.predicate = predicate
+ if self.entity_id:
+ self.entity_id = str(self.entity_id)
+ if not self.entity_id.startswith('^'):
+ self.entity_id = '^' + self.entity_id
+ if not self.entity_id.endswith('$'):
+ self.entity_id += '$'
+
+ async def __call__(self, delta, old, new, model):
+ await self.callable_(delta, old, new, model)
+
+ def cares_about(self, delta):
+ """Return True if this observer "cares about" (i.e. wants to be
+ called) for a this delta.
+
+ """
+ if (self.entity_id and delta.get_id() and
+ not re.match(self.entity_id, str(delta.get_id()))):
+ return False
+
+ if self.entity_type and self.entity_type != delta.entity:
+ return False
+
+ if self.action and self.action != delta.type:
+ return False
+
+ if self.predicate and not self.predicate(delta):
+ return False
+
+ return True
+
+
class ModelObserver(object):
async def __call__(self, delta, old, new, model):
- if old is None and new is not None:
- type_ = 'add'
- else:
- type_ = delta.type
- handler_name = 'on_{}_{}'.format(delta.entity, type_)
+ handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
method = getattr(self, handler_name, self.on_change)
await method(delta, old, new, model)
updated by ``delta``
"""
+ """
+ log.debug(
+ 'Getting %s:%s at index %s',
+ entity_type, entity_id, history_index)
+ """
+
if history_index < 0 and history_index != -1:
history_index += len(self.entity_history(entity_type, entity_id))
+ if history_index < 0:
+ return None
try:
self.entity_data(entity_type, entity_id, history_index)
self.entity_type, self.entity_id))
return self.data[name]
+ def __bool__(self):
+ return bool(self.data)
+
+ def on_change(self, callable_):
+ """Add a change observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'change', self.entity_id)
+
+ def on_remove(self, callable_):
+ """Add a remove observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'remove', self.entity_id)
+
@property
def entity_type(self):
"""A string identifying the entity type of this object, e.g.
entity in the underlying model.
This will be True except when the object represents an entity at a
- prior state in history, e.g. if the object was obtained by calling
+ non-latest state in history, e.g. if the object was obtained by calling
.previous() on another object.
"""
"""
self.loop = loop or asyncio.get_event_loop()
self.connection = None
- self.observers = set()
+ self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(.1)
+ await asyncio.sleep(0)
await asyncio.wait_for(_block(), timeout)
@property
"""
return self.state.units
- def add_observer(self, callable_):
+ def add_observer(
+ self, callable_, entity_type=None, action=None, entity_id=None,
+ predicate=None):
"""Register an "on-model-change" callback
- Once a watch is started (Model.watch() is called), ``callable_``
+ Once the model is connected, ``callable_``
will be called each time the model changes. callable_ should
be Awaitable and accept the following positional arguments:
model - The :class:`Model` itself.
+ Events for which ``callable_`` is called can be specified by passing
+ entity_type, action, and/or id_ filter criteria, e.g.:
+
+ add_observer(
+ myfunc, entity_type='application', action='add', id_='ubuntu')
+
+ For more complex filtering conditions, pass a predicate function. It
+ will be called with a delta as its only argument. If the predicate
+ function returns True, the callable_ will be called.
+
"""
- self.observers.add(callable_)
+ observer = _Observer(
+ callable_, entity_type, action, entity_id, predicate)
+ self.observers[observer] = callable_
def _watch(self):
"""Start an asynchronous watch against this model.
by applying this delta.
"""
+ if new_obj and not old_obj:
+ delta.type = 'add'
+
log.debug(
'Model changed: %s %s %s',
delta.entity, delta.type, delta.get_id())
+
for o in self.observers:
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ if o.cares_about(delta):
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+ async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+ """Wait for a new object to appear in the Model and return it.
+
+ Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+ This coroutine blocks until the new object appears in the model.
+
+ """
+ entity_added = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await entity_added.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, 'add', entity_id, predicate)
+ entity_id = await entity_added.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
- return await app_facade.AddRelation([relation1, relation2])
+ try:
+ result = await app_facade.AddRelation([relation1, relation2])
+ except JujuAPIError as e:
+ if 'relation already exists' not in e.message:
+ raise
+ log.debug(
+ 'Relation %s <-> %s already exists', relation1, relation2)
+ # TODO: if relation already exists we should return the
+ # Relation ModelEntity here
+ return None
+
+ def predicate(delta):
+ endpoints = {}
+ for endpoint in delta.data['endpoints']:
+ endpoints[endpoint['application-name']] = endpoint['relation']
+ return endpoints == result.endpoints
+
+ return await self._wait_for_new('relation', None, predicate)
def add_space(self, name, *cidrs):
"""Add a new network space.
TODO::
- - entity_url must have a revision; look up latest automatically if
- not provided by caller
- service_name is required; fill this in automatically if not
provided by caller
- series is required; how do we pick a default?
handler = BundleHandler(self)
await handler.fetch_plan(entity_id)
await handler.execute_plan()
+ extant_apps = {app for app in self.applications}
+ pending_apps = set(handler.applications) - extant_apps
+ if pending_apps:
+ # new apps will usually be in the model by now, but if some
+ # haven't made it yet we'll need to wait on them to be added
+ await asyncio.wait([self._wait_for_new('application', app_name)
+ for app_name in pending_apps])
+ return [app for name, app in self.applications.items()
+ if name in handler.applications]
else:
log.debug(
'Deploying %s', entity_id)
storage=storage,
)
- return await app_facade.Deploy([app])
+ await app_facade.Deploy([app])
+ return [await self._wait_for_new('application', service_name)]
def destroy(self):
"""Terminate all machines and resources for this model.
"""
pass
+ async def destroy_unit(self, *unit_names):
+ """Destroy units by name.
+
+ """
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ log.debug(
+ 'Destroying unit%s %s',
+ 's' if len(unit_names) == 1 else '',
+ ' '.join(unit_names))
+
+ return await app_facade.Destroy(self.name)
+ destroy_units = destroy_unit
+
def get_backup(self, archive_id):
"""Download a backup archive file.
self.ann_facade.connect(model.connection)
async def fetch_plan(self, entity_id):
- yaml = await self.charmstore.files(entity_id,
- filename='bundle.yaml',
- read_file=True)
- self.plan = await self.client_facade.GetBundleChanges(yaml)
+ bundle_yaml = await self.charmstore.files(entity_id,
+ filename='bundle.yaml',
+ read_file=True)
+ self.bundle = yaml.safe_load(bundle_yaml)
+ self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
async def execute_plan(self):
for step in self.plan.changes:
result = await method(*step.args)
self.references[step.id_] = result
+ @property
+ def applications(self):
+ return list(self.bundle['services'].keys())
+
def resolve(self, reference):
if reference and reference.startswith('$'):
reference = self.references[reference[1:]]
parts = endpoints[i].split(':')
parts[0] = self.resolve(parts[0])
endpoints[i] = ':'.join(parts)
- try:
- await self.app_facade.AddRelation(endpoints)
- log.debug('Added relation %s <-> %s', *endpoints)
- except JujuAPIError as e:
- if 'relation already exists' not in e.message:
- raise
- log.debug('Relation %s <-> %s already exists', *endpoints)
- return None
+
+ log.info('Relating %s <-> %s', *endpoints)
+ return await self.model.add_relation(*endpoints)
async def deploy(self, charm, series, application, options, constraints,
storage, endpoint_bindings, resources):
resources=resources,
)
# do the do
- log.debug('Deploying %s', charm)
+ log.info('Deploying %s', charm)
await self.app_facade.Deploy([app])
return application
# doesn't, so we're not bothering, either
unit_name = self._units_by_app[application].pop()
log.debug('Reusing unit %s for %s', unit_name, application)
- return unit_name
- log.debug('Adding unit of %s%s',
- application,
- (' to %s' % placement) if placement else '')
- result = await self.app_facade.AddUnits(
- application=application,
- placement=placement,
- num_units=1,
+ return self.model.units[unit_name]
+
+ log.debug('Adding new unit for %s%s', application,
+ ' to %s' % placement if placement else '')
+ return await self.model.applications[application].add_unit(
+ count=1,
+ to=placement,
)
- return result.units[0]
async def expose(self, application):
"""
be exposed.
"""
application = self.resolve(application)
- log.debug('Exposing %s', application)
- await self.app_facade.Expose(application)
- return None
+ log.info('Exposing %s', application)
+ return await self.model.applications[application].expose()
async def setAnnotations(self, id_, entity_type, annotations):
"""
Annotations holds the annotations as key/value pairs.
"""
entity_id = self.resolve(id_)
- log.debug('Updating annotations of %s', entity_id)
- ann = client.EntityAnnotations(
- entity=entity_id,
- annotations=annotations,
- )
- await self.ann_facade.Set([ann])
- return None
+ try:
+ entity = self.model.state.get_entity(entity_type, entity_id)
+ except KeyError:
+ entity = await self.model._wait_for_new(entity_type, entity_id)
+ return await entity.set_annotations(annotations)
class CharmStore(object):