import asyncio
+import collections
import logging
from concurrent.futures import CancelledError
from .client import watcher
from .client import connection
from .delta import get_entity_delta
+from .delta import get_entity_class
+from .exceptions import DeadEntityException
log = logging.getLogger(__name__)
type_ = delta.type
handler_name = 'on_{}_{}'.format(delta.entity, type_)
method = getattr(self, handler_name, self.on_change)
- log.debug(
- 'Model changed: %s %s %s',
- delta.entity, delta.type, delta.get_id())
await method(delta, old, new, model)
async def on_change(self, delta, old, new, model):
pass
+class ModelState(object):
+ """Holds the state of the model, including the delta history of all
+ entities in the model.
+
+ """
+ def __init__(self, model):
+ self.model = model
+ self.state = dict()
+
+ def _live_entity_map(self, entity_type):
+ """Return an id:Entity map of all the living entities of
+ type ``entity_type``.
+
+ """
+ return {
+ entity_id: self.get_entity(entity_type, entity_id)
+ for entity_id, history in self.state.get(entity_type, {}).items()
+ if history[-1] is not None
+ }
+
+ @property
+ def applications(self):
+ """Return a map of application-name:Application for all applications
+ currently in the model.
+
+ """
+ return self._live_entity_map('application')
+
+ @property
+ def machines(self):
+ """Return a map of machine-id:Machine for all machines currently in
+ the model.
+
+ """
+ return self._live_entity_map('machine')
+
+ @property
+ def units(self):
+ """Return a map of unit-id:Unit for all units currently in
+ the model.
+
+ """
+ return self._live_entity_map('unit')
+
+ def entity_history(self, entity_type, entity_id):
+ """Return the history deque for an entity.
+
+ """
+ return self.state[entity_type][entity_id]
+
+ def entity_data(self, entity_type, entity_id, history_index):
+ """Return the data dict for an entity at a specific index of its
+ history.
+
+ """
+ return self.entity_history(entity_type, entity_id)[history_index]
+
+ def apply_delta(self, delta):
+ """Apply delta to our state and return a copy of the
+ affected object as it was before and after the update, e.g.:
+
+ old_obj, new_obj = self.apply_delta(delta)
+
+ old_obj may be None if the delta is for the creation of a new object,
+ e.g. a new application or unit is deployed.
+
+ new_obj will never be None, but may be dead (new_obj.dead == True)
+ if the object was deleted as a result of the delta being applied.
+
+ """
+ history = (
+ self.state
+ .setdefault(delta.entity, {})
+ .setdefault(delta.get_id(), collections.deque())
+ )
+
+ history.append(delta.data)
+ if delta.type == 'remove':
+ history.append(None)
+
+ entity = self.get_entity(delta.entity, delta.get_id())
+ return entity.previous(), entity
+
+ def get_entity(
+ self, entity_type, entity_id, history_index=-1, connected=True):
+ """Return an object instance representing the entity created or
+ updated by ``delta``
+
+ """
+ if history_index < 0 and history_index != -1:
+ history_index += len(self.entity_history(entity_type, entity_id))
+
+ try:
+ self.entity_data(entity_type, entity_id, history_index)
+ except IndexError:
+ return None
+
+ entity_class = get_entity_class(entity_type)
+ return entity_class(
+ entity_id, self.model, history_index=history_index,
+ connected=connected)
+
+
class ModelEntity(object):
"""An object in the Model tree"""
- def __init__(self, data, model):
+ def __init__(self, entity_id, model, history_index=-1, connected=True):
"""Initialize a new entity
- :param data: dict of data from a watcher delta
+ :param entity_id str: The unique id of the object in the model
:param model: The model instance in whose object tree this
entity resides
+ :history_index int: The index of this object's state in the model's
+ history deque for this entity
+ :connected bool: Flag indicating whether this object gets live updates
+ from the model.
"""
- self.data = data
+ self.entity_id = entity_id
self.model = model
+ self._history_index = history_index
+ self.connected = connected
self.connection = model.connection
def __getattr__(self, name):
+ """Fetch object attributes from the underlying data dict held in the
+ model.
+
+ """
+ if self.data is None:
+ raise DeadEntityException(
+ "Entity {}:{} is dead - its attributes can no longer be "
+ "accessed. Use the .previous() method on this object to get "
+ "a copy of the object at its previous state.".format(
+ self.entity_type, self.entity_id))
return self.data[name]
+ @property
+ def entity_type(self):
+ """A string identifying the entity type of this object, e.g.
+ 'application' or 'unit', etc.
+
+ """
+ return self.__class__.__name__.lower()
+
+ @property
+ def current(self):
+ """Return True if this object represents the current state of the
+ entity in the underlying model.
+
+ This will be True except when the object represents an entity at a
+ prior state in history, e.g. if the object was obtained by calling
+ .previous() on another object.
+
+ """
+ return self._history_index == -1
+
+ @property
+ def dead(self):
+ """Returns True if this entity no longer exists in the underlying
+ model.
+
+ """
+ return (
+ self.data is None or
+ self.model.state.entity_data(
+ self.entity_type, self.entity_id, -1) is None
+ )
+
+ @property
+ def alive(self):
+ """Returns True if this entity still exists in the underlying
+ model.
+
+ """
+ return not self.dead
+
+ @property
+ def data(self):
+ """The data dictionary for this entity.
+
+ """
+ return self.model.state.entity_data(
+ self.entity_type, self.entity_id, self._history_index)
+
+ def previous(self):
+ """Return a copy of this object as was at its previous state in
+ history.
+
+ Returns None if this object is new (and therefore has no history).
+
+ The returned object is always "disconnected", i.e. does not receive
+ live updates.
+
+ """
+ return self.model.state.get_entity(
+ self.entity_type, self.entity_id, self._history_index - 1,
+ connected=False)
+
+ def next(self):
+ """Return a copy of this object at its next state in
+ history.
+
+ Returns None if this object is already the latest.
+
+ The returned object is "disconnected", i.e. does not receive
+ live updates, unless it is current (latest).
+
+ """
+ if self._history_index == -1:
+ return None
+
+ new_index = self._history_index + 1
+ connected = (
+ new_index == len(self.model.state.entity_history(
+ self.entity_type, self.entity_id)) - 1
+ )
+ return self.model.state.get_entity(
+ self.entity_type, self.entity_id, self._history_index - 1,
+ connected=connected)
+
+ def latest(self):
+ """Return a copy of this object at its current state in the model.
+
+ Returns self if this object is already the latest.
+
+ The returned object is always "connected", i.e. receives
+ live updates from the model.
+
+ """
+ if self._history_index == -1:
+ return self
+
+ return self.model.state.get_entity(self.entity_type, self.entity_id)
+
class Model(object):
def __init__(self, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.connection = None
self.observers = set()
- self.state = dict()
+ self.state = ModelState(self)
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
self._watch_received = asyncio.Event(loop=loop)
means no applications or machines exist in the model.
"""
+ log.debug('Resetting model')
for app in self.applications.values():
await app.destroy()
for machine in self.machines.values():
currently in the model.
"""
- return self.state.get('application', {})
+ return self.state.applications
@property
def machines(self):
the model.
"""
- return self.state.get('machine', {})
+ return self.state.machines
@property
def units(self):
the model.
"""
- return self.state.get('unit', {})
+ return self.state.units
def add_observer(self, callable_):
"""Register an "on-model-change" callback
results = await allwatcher.Next()
for delta in results.deltas:
delta = get_entity_delta(delta)
- old_obj, new_obj = self._apply_delta(delta)
+ old_obj, new_obj = self.state.apply_delta(delta)
# XXX: Might not want to shield at this level
# We are shielding because when the watcher is
# canceled (on disconnect()), we don't want all of
if self._watcher_task:
self._watcher_task.cancel()
- def _apply_delta(self, delta):
- """Apply delta to our model state and return the a copy of the
- affected object as it was before and after the update, e.g.:
-
- old_obj, new_obj = self._apply_delta(delta)
-
- old_obj may be None if the delta is for the creation of a new object,
- e.g. a new application or unit is deployed.
-
- new_obj may be None if no object was created or updated, or if an
- object was deleted as a result of the delta being applied.
-
- """
- old_obj, new_obj = None, None
-
- if (delta.entity in self.state and
- delta.get_id() in self.state[delta.entity]):
- old_obj = self.state[delta.entity][delta.get_id()]
- if delta.type == 'remove':
- del self.state[delta.entity][delta.get_id()]
- return old_obj, new_obj
-
- new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = (
- self._create_model_entity(delta))
-
- return old_obj, new_obj
-
- def _create_model_entity(self, delta):
- """Return an object instance representing the entity created or
- updated by ``delta``
-
- """
- entity_class = delta.get_entity_class()
- return entity_class(delta.data, self)
-
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state
by applying this delta.
"""
+ log.debug(
+ 'Model changed: %s %s %s',
+ delta.entity, delta.type, delta.get_id())
for o in self.observers:
asyncio.ensure_future(o(delta, old_obj, new_obj, self))