+import asyncio
+import collections
+import logging
+import re
+import weakref
+from concurrent.futures import CancelledError
+from functools import partial
+
+import yaml
+from theblues import charmstore
+
+from .client import client
+from .client import watcher
+from .client import connection
+from .constraints import parse as parse_constraints
+from .delta import get_entity_delta
+from .delta import get_entity_class
+from .exceptions import DeadEntityException
+from .errors import JujuAPIError
+
+log = logging.getLogger(__name__)
+
+
+class _Observer(object):
+ """Wrapper around an observer callable.
+
+ This wrapper allows filter criteria to be associated with the
+ callable so that it's only called for changes that meet the criteria.
+
+ """
+ def __init__(self, callable_, entity_type, action, entity_id, predicate):
+ self.callable_ = callable_
+ self.entity_type = entity_type
+ self.action = action
+ self.entity_id = entity_id
+ self.predicate = predicate
+ if self.entity_id:
+ self.entity_id = str(self.entity_id)
+ if not self.entity_id.startswith('^'):
+ self.entity_id = '^' + self.entity_id
+ if not self.entity_id.endswith('$'):
+ self.entity_id += '$'
+
+ async def __call__(self, delta, old, new, model):
+ await self.callable_(delta, old, new, model)
+
+ def cares_about(self, delta):
+ """Return True if this observer "cares about" (i.e. wants to be
+ called) for a this delta.
+
+ """
+ if (self.entity_id and delta.get_id() and
+ not re.match(self.entity_id, str(delta.get_id()))):
+ return False
+
+ if self.entity_type and self.entity_type != delta.entity:
+ return False
+
+ if self.action and self.action != delta.type:
+ return False
+
+ if self.predicate and not self.predicate(delta):
+ return False
+
+ return True
+
+
+class ModelObserver(object):
+ async def __call__(self, delta, old, new, model):
+ handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
+ method = getattr(self, handler_name, self.on_change)
+ await method(delta, old, new, model)
+
+ async def on_change(self, delta, old, new, model):
+ pass
+
+
+class ModelState(object):
+ """Holds the state of the model, including the delta history of all
+ entities in the model.
+
+ """
+ def __init__(self, model):
+ self.model = model
+ self.state = dict()
+
+ def _live_entity_map(self, entity_type):
+ """Return an id:Entity map of all the living entities of
+ type ``entity_type``.
+
+ """
+ return {
+ entity_id: self.get_entity(entity_type, entity_id)
+ for entity_id, history in self.state.get(entity_type, {}).items()
+ if history[-1] is not None
+ }
+
+ @property
+ def applications(self):
+ """Return a map of application-name:Application for all applications
+ currently in the model.
+
+ """
+ return self._live_entity_map('application')
+
+ @property
+ def machines(self):
+ """Return a map of machine-id:Machine for all machines currently in
+ the model.
+
+ """
+ return self._live_entity_map('machine')
+
+ @property
+ def units(self):
+ """Return a map of unit-id:Unit for all units currently in
+ the model.
+
+ """
+ return self._live_entity_map('unit')
+
+ def entity_history(self, entity_type, entity_id):
+ """Return the history deque for an entity.
+
+ """
+ return self.state[entity_type][entity_id]
+
+ def entity_data(self, entity_type, entity_id, history_index):
+ """Return the data dict for an entity at a specific index of its
+ history.
+
+ """
+ return self.entity_history(entity_type, entity_id)[history_index]
+
+ def apply_delta(self, delta):
+ """Apply delta to our state and return a copy of the
+ affected object as it was before and after the update, e.g.:
+
+ old_obj, new_obj = self.apply_delta(delta)
+
+ old_obj may be None if the delta is for the creation of a new object,
+ e.g. a new application or unit is deployed.
+
+ new_obj will never be None, but may be dead (new_obj.dead == True)
+ if the object was deleted as a result of the delta being applied.
+
+ """
+ history = (
+ self.state
+ .setdefault(delta.entity, {})
+ .setdefault(delta.get_id(), collections.deque())
+ )
+
+ history.append(delta.data)
+ if delta.type == 'remove':
+ history.append(None)
+
+ entity = self.get_entity(delta.entity, delta.get_id())
+ return entity.previous(), entity
+
+ def get_entity(
+ self, entity_type, entity_id, history_index=-1, connected=True):
+ """Return an object instance for the given entity_type and id.
+
+ By default the object state matches the most recent state from
+ Juju. To get an instance of the object in an older state, pass
+ history_index, an index into the history deque for the entity.
+
+ """
+
+ if history_index < 0 and history_index != -1:
+ history_index += len(self.entity_history(entity_type, entity_id))
+ if history_index < 0:
+ return None
+
+ try:
+ self.entity_data(entity_type, entity_id, history_index)
+ except IndexError:
+ return None
+
+ entity_class = get_entity_class(entity_type)
+ return entity_class(
+ entity_id, self.model, history_index=history_index,
+ connected=connected)
+
+
+class ModelEntity(object):
+ """An object in the Model tree"""
+
+ def __init__(self, entity_id, model, history_index=-1, connected=True):
+ """Initialize a new entity
+
+ :param entity_id str: The unique id of the object in the model
+ :param model: The model instance in whose object tree this
+ entity resides
+ :history_index int: The index of this object's state in the model's
+ history deque for this entity
+ :connected bool: Flag indicating whether this object gets live updates
+ from the model.
+
+ """
+ self.entity_id = entity_id
+ self.model = model
+ self._history_index = history_index
+ self.connected = connected
+ self.connection = model.connection
+
+ def __getattr__(self, name):
+ """Fetch object attributes from the underlying data dict held in the
+ model.
+
+ """
+ if self.data is None:
+ raise DeadEntityException(
+ "Entity {}:{} is dead - its attributes can no longer be "
+ "accessed. Use the .previous() method on this object to get "
+ "a copy of the object at its previous state.".format(
+ self.entity_type, self.entity_id))
+ return self.data[name]
+
+ def __bool__(self):
+ return bool(self.data)
+
+ def on_change(self, callable_):
+ """Add a change observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'change', self.entity_id)
+
+ def on_remove(self, callable_):
+ """Add a remove observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'remove', self.entity_id)
+
+ @property
+ def entity_type(self):
+ """A string identifying the entity type of this object, e.g.
+ 'application' or 'unit', etc.
+
+ """
+ return self.__class__.__name__.lower()
+
+ @property
+ def current(self):
+ """Return True if this object represents the current state of the
+ entity in the underlying model.
+
+ This will be True except when the object represents an entity at a
+ non-latest state in history, e.g. if the object was obtained by calling
+ .previous() on another object.
+
+ """
+ return self._history_index == -1
+
+ @property
+ def dead(self):
+ """Returns True if this entity no longer exists in the underlying
+ model.
+
+ """
+ return (
+ self.data is None or
+ self.model.state.entity_data(
+ self.entity_type, self.entity_id, -1) is None
+ )
+
+ @property
+ def alive(self):
+ """Returns True if this entity still exists in the underlying
+ model.
+
+ """
+ return not self.dead
+
+ @property
+ def data(self):
+ """The data dictionary for this entity.
+
+ """
+ return self.model.state.entity_data(
+ self.entity_type, self.entity_id, self._history_index)
+
+ def previous(self):
+ """Return a copy of this object as was at its previous state in
+ history.
+
+ Returns None if this object is new (and therefore has no history).
+
+ The returned object is always "disconnected", i.e. does not receive
+ live updates.
+
+ """
+ return self.model.state.get_entity(
+ self.entity_type, self.entity_id, self._history_index - 1,
+ connected=False)
+
+ def next(self):
+ """Return a copy of this object at its next state in
+ history.
+
+ Returns None if this object is already the latest.
+
+ The returned object is "disconnected", i.e. does not receive
+ live updates, unless it is current (latest).
+
+ """
+ if self._history_index == -1:
+ return None
+
+ new_index = self._history_index + 1
+ connected = (
+ new_index == len(self.model.state.entity_history(
+ self.entity_type, self.entity_id)) - 1
+ )
+ return self.model.state.get_entity(
+ self.entity_type, self.entity_id, self._history_index - 1,
+ connected=connected)
+
+ def latest(self):
+ """Return a copy of this object at its current state in the model.
+
+ Returns self if this object is already the latest.
+
+ The returned object is always "connected", i.e. receives
+ live updates from the model.
+
+ """
+ if self._history_index == -1:
+ return self
+
+ return self.model.state.get_entity(self.entity_type, self.entity_id)
+
+