Add ModelEntity callbacks and async return values.
authorTim Van Steenburgh <tvansteenburgh@gmail.com>
Mon, 17 Oct 2016 17:41:52 +0000 (13:41 -0400)
committerTim Van Steenburgh <tvansteenburgh@gmail.com>
Mon, 17 Oct 2016 17:53:21 +0000 (13:53 -0400)
- Change callbacks can now be registered directly on ModelEntity
  objects.
- Coroutines that create new objects in the model can (and should)
  now return the new ModelEntity by using Model._wait_for_new(). Only
  implemented for app = Model.deploy() so far but should be easy to
  add for others.

TODO
examples/relate.py
juju/application.py
juju/model.py

diff --git a/TODO b/TODO
index 394b228..94fc342 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,28 +1,5 @@
 TODO
 ====
 
-Model as state history of immutable objects
--------------------------------------------
-
-1. Model gets delta from AllWatcher
-2. Entity type+id in the delta uniquely identifies an entity in the model
-3. The model keeps a history deque for each of these entities
-4. The new delta is appended to the deque for this entity
-5. When a new python object is created for this entitiy, it:
-   a. Registers an observer with the model so it'll get change callbacks
-   b. Gets its state from a pointer back to the last item in the model's
-      history deque for this entity
-   c. Has a previous() method that returns a copy of the object at its previous
-      frame of history (or None if no previous history exists). This object
-      would be disconnected (would not receive live updates from the model).
-
-
-Make model-changing methods (like deploy()) return the appropriate object
--------------------------------------------------------------------------
-For objects being added (newly created), this will require that the method
-doesn't return until the AllWatcher returns a delta containing an id for
-the newly created thing.
-
-
-Add a LogWatcher coroutine that yields from debug-log api
----------------------------------------------------------
+- Add a LogWatcher coroutine that yields from debug-log api
+- Add a way to exit the event loop when a Model matches a bundle yaml
index bdfb2d7..fa32900 100644 (file)
@@ -27,9 +27,13 @@ class MyRemoveObserver(ModelObserver):
 
 
 class MyModelObserver(ModelObserver):
+    _shutting_down = False
+
     async def on_change(self, delta, old, new, model):
-        if model.all_units_idle():
+        if model.all_units_idle() and not self._shutting_down:
+            self._shutting_down = True
             logging.debug('All units idle, disconnecting')
+            await model.reset(force=True)
             await model.disconnect()
             model.loop.stop()
 
@@ -42,12 +46,28 @@ async def run():
     await model.reset(force=True)
     model.add_observer(MyModelObserver())
 
-    await model.deploy(
+    ubuntu_app = await model.deploy(
         'ubuntu-0',
         service_name='ubuntu',
         series='trusty',
         channel='stable',
     )
+    ubuntu_app.on_change(asyncio.coroutine(
+        lambda delta, old_app, new_app, model:
+            print('App changed: {}'.format(new_app.entity_id))
+    ))
+    ubuntu_app.on_remove(asyncio.coroutine(
+        lambda delta, old_app, new_app, model:
+            print('App removed: {}'.format(old_app.entity_id))
+    ))
+    ubuntu_app.on_unit_add(asyncio.coroutine(
+        lambda delta, old_unit, new_unit, model:
+            print('Unit added: {}'.format(new_unit.entity_id))
+    ))
+    ubuntu_app.on_unit_remove(asyncio.coroutine(
+        lambda delta, old_unit, new_unit, model:
+            print('Unit removed: {}'.format(old_unit.entity_id))
+    ))
     await model.deploy(
         'nrpe-11',
         service_name='nrpe',
index 978500f..1e87ced 100644 (file)
@@ -7,6 +7,26 @@ log = logging.getLogger(__name__)
 
 
 class Application(model.ModelEntity):
+    @property
+    def _unit_match_pattern(self):
+        return r'^{}.*$'.format(self.entity_id)
+
+    def on_unit_add(self, callable_):
+        """Add a "unit added" observer to this entity, which will be called
+        whenever a unit is added to this application.
+
+        """
+        self.model.add_observer(
+            callable_, 'unit', 'add', self._unit_match_pattern)
+
+    def on_unit_remove(self, callable_):
+        """Add a "unit removed" observer to this entity, which will be called
+        whenever a unit is removed from this application.
+
+        """
+        self.model.add_observer(
+            callable_, 'unit', 'remove', self._unit_match_pattern)
+
     @property
     def units(self):
         return [
index fe07f80..5d436fd 100644 (file)
@@ -1,6 +1,8 @@
 import asyncio
 import collections
 import logging
+import re
+import weakref
 from concurrent.futures import CancelledError
 from functools import partial
 
@@ -17,13 +19,49 @@ from .errors import JujuAPIError
 log = logging.getLogger(__name__)
 
 
+class _Observer(object):
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        if self.entity_id:
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, entity_type, action, entity_id):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a change matching the entity_type, action, and
+        entity_id parameters.
+
+        """
+        if (self.entity_id and entity_id and
+                not re.match(self.entity_id, str(entity_id))):
+            return False
+
+        if self.entity_type and self.entity_type != entity_type:
+            return False
+
+        if self.action and self.action != action:
+            return False
+
+        return True
+
+
 class ModelObserver(object):
     async def __call__(self, delta, old, new, model):
-        if old is None and new is not None:
-            type_ = 'add'
-        else:
-            type_ = delta.type
-        handler_name = 'on_{}_{}'.format(delta.entity, type_)
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
         await method(delta, old, new, model)
 
@@ -40,6 +78,9 @@ class ModelState(object):
         self.model = model
         self.state = dict()
 
+    def clear(self):
+        self.state.clear()
+
     def _live_entity_map(self, entity_type):
         """Return an id:Entity map of all the living entities of
         type ``entity_type``.
@@ -120,8 +161,16 @@ class ModelState(object):
         updated by ``delta``
 
         """
+        """
+        log.debug(
+            'Getting %s:%s at index %s',
+            entity_type, entity_id, history_index)
+        """
+
         if history_index < 0 and history_index != -1:
             history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
 
         try:
             self.entity_data(entity_type, entity_id, history_index)
@@ -168,6 +217,23 @@ class ModelEntity(object):
                     self.entity_type, self.entity_id))
         return self.data[name]
 
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
+
     @property
     def entity_type(self):
         """A string identifying the entity type of this object, e.g.
@@ -182,7 +248,7 @@ class ModelEntity(object):
         entity in the underlying model.
 
         This will be True except when the object represents an entity at a
-        prior state in history, e.g. if the object was obtained by calling
+        non-latest state in history, e.g. if the object was obtained by calling
         .previous() on another object.
 
         """
@@ -276,7 +342,7 @@ class Model(object):
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
-        self.observers = set()
+        self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
@@ -329,6 +395,7 @@ class Model(object):
         await self.block_until(
             lambda: len(self.machines) == 0
         )
+        self.state.clear()
 
     async def block_until(self, *conditions, timeout=None):
         """Return only after all conditions are true.
@@ -336,7 +403,7 @@ class Model(object):
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(.1)
+                await asyncio.sleep(0)
         await asyncio.wait_for(_block(), timeout)
 
     @property
@@ -363,10 +430,11 @@ class Model(object):
         """
         return self.state.units
 
-    def add_observer(self, callable_):
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None):
         """Register an "on-model-change" callback
 
-        Once a watch is started (Model.watch() is called), ``callable_``
+        Once the model is connected, ``callable_``
         will be called each time the model changes. callable_ should
         be Awaitable and accept the following positional arguments:
 
@@ -385,8 +453,15 @@ class Model(object):
 
             model - The :class:`Model` itself.
 
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or id_ filter criteria, e.g.:
+
+            add_observer(
+                myfunc, entity_type='application', action='add', id_='ubuntu')
+
         """
-        self.observers.add(callable_)
+        observer = _Observer(callable_, entity_type, action, entity_id)
+        self.observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -442,11 +517,32 @@ class Model(object):
             by applying this delta.
 
         """
+        if not old_obj:
+            delta.type = 'add'
+
         log.debug(
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
+
         for o in self.observers:
-            asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+            if o.cares_about(delta.entity, delta.type, delta.get_id()):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+    async def _wait_for_new(self, entity_type, entity_id):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+        This coroutine blocks until the new object appears in the model.
+
+        """
+        entity_added = asyncio.Event(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            entity_added.set()
+        self.add_observer(callback, entity_type, 'add', entity_id)
+        await entity_added.wait()
+        return self.state._live_entity_map(entity_type)[entity_id]
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -686,7 +782,8 @@ class Model(object):
                 storage=storage,
             )
 
-            return await app_facade.Deploy([app])
+            await app_facade.Deploy([app])
+            return await self._wait_for_new('application', service_name)
 
     def destroy(self):
         """Terminate all machines and resources for this model.