From 46887b86181254efd72f59a8d6be468342afad83 Mon Sep 17 00:00:00 2001 From: Tim Van Steenburgh Date: Fri, 14 Oct 2016 12:15:26 -0400 Subject: [PATCH] Refactor model state management. Maintain entity state history in the model, not in the entities. Add methods for navigating an entity's history. --- examples/relate.py | 14 +++ juju/delta.py | 30 +++-- juju/exceptions.py | 2 + juju/model.py | 278 +++++++++++++++++++++++++++++++++++++-------- 4 files changed, 269 insertions(+), 55 deletions(-) create mode 100644 juju/exceptions.py diff --git a/examples/relate.py b/examples/relate.py index 288d58f..bdfb2d7 100644 --- a/examples/relate.py +++ b/examples/relate.py @@ -13,6 +13,19 @@ import logging from juju.model import Model, ModelObserver +class MyRemoveObserver(ModelObserver): + async def on_change(self, delta, old, new, model): + if delta.type == 'remove': + assert(new.latest() == new) + assert(new.next() is None) + assert(new.dead) + assert(new.current) + assert(new.connected) + assert(new.previous().dead) + assert(not new.previous().current) + assert(not new.previous().connected) + + class MyModelObserver(ModelObserver): async def on_change(self, delta, old, new, model): if model.all_units_idle(): @@ -25,6 +38,7 @@ async def run(): model = Model() await model.connect_current() + model.add_observer(MyRemoveObserver()) await model.reset(force=True) model.add_observer(MyModelObserver()) diff --git a/juju/delta.py b/juju/delta.py index 0b142dd..b1eba23 100644 --- a/juju/delta.py +++ b/juju/delta.py @@ -2,27 +2,24 @@ from .client import client def get_entity_delta(d): - _delta_types = { - 'action': ActionDelta, - 'application': ApplicationDelta, - 'annotation': AnnotationDelta, - 'machine': MachineDelta, - 'unit': UnitDelta, - 'relation': RelationDelta, - } - return _delta_types[d.entity](d.deltas) +def get_entity_class(entity_type): + return _delta_types[entity_type].get_entity_class() + + class EntityDelta(client.Delta): def get_id(self): return self.data['id'] + @classmethod def get_entity_class(self): return None class ActionDelta(EntityDelta): + @classmethod def get_entity_class(self): from .action import Action return Action @@ -32,6 +29,7 @@ class ApplicationDelta(EntityDelta): def get_id(self): return self.data['name'] + @classmethod def get_entity_class(self): from .application import Application return Application @@ -41,12 +39,14 @@ class AnnotationDelta(EntityDelta): def get_id(self): return self.data['tag'] + @classmethod def get_entity_class(self): from .annotation import Annotation return Annotation class MachineDelta(EntityDelta): + @classmethod def get_entity_class(self): from .machine import Machine return Machine @@ -56,12 +56,24 @@ class UnitDelta(EntityDelta): def get_id(self): return self.data['name'] + @classmethod def get_entity_class(self): from .unit import Unit return Unit class RelationDelta(EntityDelta): + @classmethod def get_entity_class(self): from .relation import Relation return Relation + + +_delta_types = { + 'action': ActionDelta, + 'application': ApplicationDelta, + 'annotation': AnnotationDelta, + 'machine': MachineDelta, + 'unit': UnitDelta, + 'relation': RelationDelta, +} diff --git a/juju/exceptions.py b/juju/exceptions.py new file mode 100644 index 0000000..bcf07b6 --- /dev/null +++ b/juju/exceptions.py @@ -0,0 +1,2 @@ +class DeadEntityException(Exception): + pass diff --git a/juju/model.py b/juju/model.py index 677657f..6bdb261 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,4 +1,5 @@ import asyncio +import collections import logging from concurrent.futures import CancelledError @@ -6,6 +7,8 @@ from .client import client from .client import watcher from .client import connection from .delta import get_entity_delta +from .delta import get_entity_class +from .exceptions import DeadEntityException log = logging.getLogger(__name__) @@ -18,33 +21,247 @@ class ModelObserver(object): type_ = delta.type handler_name = 'on_{}_{}'.format(delta.entity, type_) method = getattr(self, handler_name, self.on_change) - log.debug( - 'Model changed: %s %s %s', - delta.entity, delta.type, delta.get_id()) await method(delta, old, new, model) async def on_change(self, delta, old, new, model): pass +class ModelState(object): + """Holds the state of the model, including the delta history of all + entities in the model. + + """ + def __init__(self, model): + self.model = model + self.state = dict() + + def _live_entity_map(self, entity_type): + """Return an id:Entity map of all the living entities of + type ``entity_type``. + + """ + return { + entity_id: self.get_entity(entity_type, entity_id) + for entity_id, history in self.state.get(entity_type, {}).items() + if history[-1] is not None + } + + @property + def applications(self): + """Return a map of application-name:Application for all applications + currently in the model. + + """ + return self._live_entity_map('application') + + @property + def machines(self): + """Return a map of machine-id:Machine for all machines currently in + the model. + + """ + return self._live_entity_map('machine') + + @property + def units(self): + """Return a map of unit-id:Unit for all units currently in + the model. + + """ + return self._live_entity_map('unit') + + def entity_history(self, entity_type, entity_id): + """Return the history deque for an entity. + + """ + return self.state[entity_type][entity_id] + + def entity_data(self, entity_type, entity_id, history_index): + """Return the data dict for an entity at a specific index of its + history. + + """ + return self.entity_history(entity_type, entity_id)[history_index] + + def apply_delta(self, delta): + """Apply delta to our state and return a copy of the + affected object as it was before and after the update, e.g.: + + old_obj, new_obj = self.apply_delta(delta) + + old_obj may be None if the delta is for the creation of a new object, + e.g. a new application or unit is deployed. + + new_obj will never be None, but may be dead (new_obj.dead == True) + if the object was deleted as a result of the delta being applied. + + """ + history = ( + self.state + .setdefault(delta.entity, {}) + .setdefault(delta.get_id(), collections.deque()) + ) + + history.append(delta.data) + if delta.type == 'remove': + history.append(None) + + entity = self.get_entity(delta.entity, delta.get_id()) + return entity.previous(), entity + + def get_entity( + self, entity_type, entity_id, history_index=-1, connected=True): + """Return an object instance representing the entity created or + updated by ``delta`` + + """ + if history_index < 0 and history_index != -1: + history_index += len(self.entity_history(entity_type, entity_id)) + + try: + self.entity_data(entity_type, entity_id, history_index) + except IndexError: + return None + + entity_class = get_entity_class(entity_type) + return entity_class( + entity_id, self.model, history_index=history_index, + connected=connected) + + class ModelEntity(object): """An object in the Model tree""" - def __init__(self, data, model): + def __init__(self, entity_id, model, history_index=-1, connected=True): """Initialize a new entity - :param data: dict of data from a watcher delta + :param entity_id str: The unique id of the object in the model :param model: The model instance in whose object tree this entity resides + :history_index int: The index of this object's state in the model's + history deque for this entity + :connected bool: Flag indicating whether this object gets live updates + from the model. """ - self.data = data + self.entity_id = entity_id self.model = model + self._history_index = history_index + self.connected = connected self.connection = model.connection def __getattr__(self, name): + """Fetch object attributes from the underlying data dict held in the + model. + + """ + if self.data is None: + raise DeadEntityException( + "Entity {}:{} is dead - its attributes can no longer be " + "accessed. Use the .previous() method on this object to get " + "a copy of the object at its previous state.".format( + self.entity_type, self.entity_id)) return self.data[name] + @property + def entity_type(self): + """A string identifying the entity type of this object, e.g. + 'application' or 'unit', etc. + + """ + return self.__class__.__name__.lower() + + @property + def current(self): + """Return True if this object represents the current state of the + entity in the underlying model. + + This will be True except when the object represents an entity at a + prior state in history, e.g. if the object was obtained by calling + .previous() on another object. + + """ + return self._history_index == -1 + + @property + def dead(self): + """Returns True if this entity no longer exists in the underlying + model. + + """ + return ( + self.data is None or + self.model.state.entity_data( + self.entity_type, self.entity_id, -1) is None + ) + + @property + def alive(self): + """Returns True if this entity still exists in the underlying + model. + + """ + return not self.dead + + @property + def data(self): + """The data dictionary for this entity. + + """ + return self.model.state.entity_data( + self.entity_type, self.entity_id, self._history_index) + + def previous(self): + """Return a copy of this object as was at its previous state in + history. + + Returns None if this object is new (and therefore has no history). + + The returned object is always "disconnected", i.e. does not receive + live updates. + + """ + return self.model.state.get_entity( + self.entity_type, self.entity_id, self._history_index - 1, + connected=False) + + def next(self): + """Return a copy of this object at its next state in + history. + + Returns None if this object is already the latest. + + The returned object is "disconnected", i.e. does not receive + live updates, unless it is current (latest). + + """ + if self._history_index == -1: + return None + + new_index = self._history_index + 1 + connected = ( + new_index == len(self.model.state.entity_history( + self.entity_type, self.entity_id)) - 1 + ) + return self.model.state.get_entity( + self.entity_type, self.entity_id, self._history_index - 1, + connected=connected) + + def latest(self): + """Return a copy of this object at its current state in the model. + + Returns self if this object is already the latest. + + The returned object is always "connected", i.e. receives + live updates from the model. + + """ + if self._history_index == -1: + return self + + return self.model.state.get_entity(self.entity_type, self.entity_id) + class Model(object): def __init__(self, loop=None): @@ -56,7 +273,7 @@ class Model(object): self.loop = loop or asyncio.get_event_loop() self.connection = None self.observers = set() - self.state = dict() + self.state = ModelState(self) self._watcher_task = None self._watch_shutdown = asyncio.Event(loop=loop) self._watch_received = asyncio.Event(loop=loop) @@ -99,6 +316,7 @@ class Model(object): means no applications or machines exist in the model. """ + log.debug('Resetting model') for app in self.applications.values(): await app.destroy() for machine in self.machines.values(): @@ -122,7 +340,7 @@ class Model(object): currently in the model. """ - return self.state.get('application', {}) + return self.state.applications @property def machines(self): @@ -130,7 +348,7 @@ class Model(object): the model. """ - return self.state.get('machine', {}) + return self.state.machines @property def units(self): @@ -138,7 +356,7 @@ class Model(object): the model. """ - return self.state.get('unit', {}) + return self.state.units def add_observer(self, callable_): """Register an "on-model-change" callback @@ -181,7 +399,7 @@ class Model(object): results = await allwatcher.Next() for delta in results.deltas: delta = get_entity_delta(delta) - old_obj, new_obj = self._apply_delta(delta) + old_obj, new_obj = self.state.apply_delta(delta) # XXX: Might not want to shield at this level # We are shielding because when the watcher is # canceled (on disconnect()), we don't want all of @@ -208,41 +426,6 @@ class Model(object): if self._watcher_task: self._watcher_task.cancel() - def _apply_delta(self, delta): - """Apply delta to our model state and return the a copy of the - affected object as it was before and after the update, e.g.: - - old_obj, new_obj = self._apply_delta(delta) - - old_obj may be None if the delta is for the creation of a new object, - e.g. a new application or unit is deployed. - - new_obj may be None if no object was created or updated, or if an - object was deleted as a result of the delta being applied. - - """ - old_obj, new_obj = None, None - - if (delta.entity in self.state and - delta.get_id() in self.state[delta.entity]): - old_obj = self.state[delta.entity][delta.get_id()] - if delta.type == 'remove': - del self.state[delta.entity][delta.get_id()] - return old_obj, new_obj - - new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = ( - self._create_model_entity(delta)) - - return old_obj, new_obj - - def _create_model_entity(self, delta): - """Return an object instance representing the entity created or - updated by ``delta`` - - """ - entity_class = delta.get_entity_class() - return entity_class(delta.data, self) - async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -254,6 +437,9 @@ class Model(object): by applying this delta. """ + log.debug( + 'Model changed: %s %s %s', + delta.entity, delta.type, delta.get_id()) for o in self.observers: asyncio.ensure_future(o(delta, old_obj, new_obj, self)) -- 2.25.1