class MyModelObserver(ModelObserver):
+ _shutting_down = False
+
async def on_change(self, delta, old, new, model):
- if model.all_units_idle():
+ if model.all_units_idle() and not self._shutting_down:
+ self._shutting_down = True
logging.debug('All units idle, disconnecting')
+ await model.reset(force=True)
await model.disconnect()
model.loop.stop()
await model.reset(force=True)
model.add_observer(MyModelObserver())
- await model.deploy(
+ ubuntu_app = await model.deploy(
'ubuntu-0',
service_name='ubuntu',
series='trusty',
channel='stable',
)
+ ubuntu_app.on_change(asyncio.coroutine(
+ lambda delta, old_app, new_app, model:
+ print('App changed: {}'.format(new_app.entity_id))
+ ))
+ ubuntu_app.on_remove(asyncio.coroutine(
+ lambda delta, old_app, new_app, model:
+ print('App removed: {}'.format(old_app.entity_id))
+ ))
+ ubuntu_app.on_unit_add(asyncio.coroutine(
+ lambda delta, old_unit, new_unit, model:
+ print('Unit added: {}'.format(new_unit.entity_id))
+ ))
+ ubuntu_app.on_unit_remove(asyncio.coroutine(
+ lambda delta, old_unit, new_unit, model:
+ print('Unit removed: {}'.format(old_unit.entity_id))
+ ))
await model.deploy(
'nrpe-11',
service_name='nrpe',
import asyncio
import collections
import logging
+import re
+import weakref
from concurrent.futures import CancelledError
from functools import partial
log = logging.getLogger(__name__)
+class _Observer(object):
+ """Wrapper around an observer callable.
+
+ This wrapper allows filter criteria to be associated with the
+ callable so that it's only called for changes that meet the criteria.
+
+ """
+ def __init__(self, callable_, entity_type, action, entity_id):
+ self.callable_ = callable_
+ self.entity_type = entity_type
+ self.action = action
+ self.entity_id = entity_id
+ if self.entity_id:
+ if not self.entity_id.startswith('^'):
+ self.entity_id = '^' + self.entity_id
+ if not self.entity_id.endswith('$'):
+ self.entity_id += '$'
+
+ async def __call__(self, delta, old, new, model):
+ await self.callable_(delta, old, new, model)
+
+ def cares_about(self, entity_type, action, entity_id):
+ """Return True if this observer "cares about" (i.e. wants to be
+ called) for a change matching the entity_type, action, and
+ entity_id parameters.
+
+ """
+ if (self.entity_id and entity_id and
+ not re.match(self.entity_id, str(entity_id))):
+ return False
+
+ if self.entity_type and self.entity_type != entity_type:
+ return False
+
+ if self.action and self.action != action:
+ return False
+
+ return True
+
+
class ModelObserver(object):
async def __call__(self, delta, old, new, model):
- if old is None and new is not None:
- type_ = 'add'
- else:
- type_ = delta.type
- handler_name = 'on_{}_{}'.format(delta.entity, type_)
+ handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
method = getattr(self, handler_name, self.on_change)
await method(delta, old, new, model)
self.model = model
self.state = dict()
+ def clear(self):
+ self.state.clear()
+
def _live_entity_map(self, entity_type):
"""Return an id:Entity map of all the living entities of
type ``entity_type``.
updated by ``delta``
"""
+ """
+ log.debug(
+ 'Getting %s:%s at index %s',
+ entity_type, entity_id, history_index)
+ """
+
if history_index < 0 and history_index != -1:
history_index += len(self.entity_history(entity_type, entity_id))
+ if history_index < 0:
+ return None
try:
self.entity_data(entity_type, entity_id, history_index)
self.entity_type, self.entity_id))
return self.data[name]
+ def __bool__(self):
+ return bool(self.data)
+
+ def on_change(self, callable_):
+ """Add a change observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'change', self.entity_id)
+
+ def on_remove(self, callable_):
+ """Add a remove observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'remove', self.entity_id)
+
@property
def entity_type(self):
"""A string identifying the entity type of this object, e.g.
entity in the underlying model.
This will be True except when the object represents an entity at a
- prior state in history, e.g. if the object was obtained by calling
+ non-latest state in history, e.g. if the object was obtained by calling
.previous() on another object.
"""
"""
self.loop = loop or asyncio.get_event_loop()
self.connection = None
- self.observers = set()
+ self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
await self.block_until(
lambda: len(self.machines) == 0
)
+ self.state.clear()
async def block_until(self, *conditions, timeout=None):
"""Return only after all conditions are true.
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(.1)
+ await asyncio.sleep(0)
await asyncio.wait_for(_block(), timeout)
@property
"""
return self.state.units
- def add_observer(self, callable_):
+ def add_observer(
+ self, callable_, entity_type=None, action=None, entity_id=None):
"""Register an "on-model-change" callback
- Once a watch is started (Model.watch() is called), ``callable_``
+ Once the model is connected, ``callable_``
will be called each time the model changes. callable_ should
be Awaitable and accept the following positional arguments:
model - The :class:`Model` itself.
+ Events for which ``callable_`` is called can be specified by passing
+ entity_type, action, and/or id_ filter criteria, e.g.:
+
+ add_observer(
+ myfunc, entity_type='application', action='add', id_='ubuntu')
+
"""
- self.observers.add(callable_)
+ observer = _Observer(callable_, entity_type, action, entity_id)
+ self.observers[observer] = callable_
def _watch(self):
"""Start an asynchronous watch against this model.
by applying this delta.
"""
+ if not old_obj:
+ delta.type = 'add'
+
log.debug(
'Model changed: %s %s %s',
delta.entity, delta.type, delta.get_id())
+
for o in self.observers:
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ if o.cares_about(delta.entity, delta.type, delta.get_id()):
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+ async def _wait_for_new(self, entity_type, entity_id):
+ """Wait for a new object to appear in the Model and return it.
+
+ Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+ This coroutine blocks until the new object appears in the model.
+
+ """
+ entity_added = asyncio.Event(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ entity_added.set()
+ self.add_observer(callback, entity_type, 'add', entity_id)
+ await entity_added.wait()
+ return self.state._live_entity_map(entity_type)[entity_id]
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
storage=storage,
)
- return await app_facade.Deploy([app])
+ await app_facade.Deploy([app])
+ return await self._wait_for_new('application', service_name)
def destroy(self):
"""Terminate all machines and resources for this model.