Async model updates
authorTim Van Steenburgh <tvansteenburgh@gmail.com>
Fri, 24 Jun 2016 21:38:02 +0000 (17:38 -0400)
committerTim Van Steenburgh <tvansteenburgh@gmail.com>
Fri, 24 Jun 2016 21:38:02 +0000 (17:38 -0400)
examples/livemodel.py [new file with mode: 0644]
juju/action.py [new file with mode: 0644]
juju/application.py [new file with mode: 0644]
juju/client/overrides.py
juju/delta.py [new file with mode: 0644]
juju/machine.py
juju/model.py
juju/service.py [deleted file]
juju/unit.py

diff --git a/examples/livemodel.py b/examples/livemodel.py
new file mode 100644 (file)
index 0000000..2330a58
--- /dev/null
@@ -0,0 +1,22 @@
+import asyncio
+
+from juju.model import Model
+from juju.client.connection import Connection
+
+
+loop = asyncio.get_event_loop()
+conn = loop.run_until_complete(Connection.connect_current())
+
+
+def on_model_change(delta, old, new, model):
+    print(delta.entity, delta.type, delta.data)
+    print(old)
+    print(new)
+    print(model)
+
+async def watch_model():
+    model = Model(conn)
+    model.add_observer(on_model_change)
+    await model.watch()
+
+loop.run_until_complete(watch_model())
diff --git a/juju/action.py b/juju/action.py
new file mode 100644 (file)
index 0000000..941fa94
--- /dev/null
@@ -0,0 +1,5 @@
+from . import model
+
+
+class Action(model.ModelEntity):
+    pass
diff --git a/juju/application.py b/juju/application.py
new file mode 100644 (file)
index 0000000..a72533d
--- /dev/null
@@ -0,0 +1,181 @@
+from . import model
+
+
+class Application(model.ModelEntity):
+    def add_relation(self, local_relation, remote_relation):
+        """Add a relation to another service.
+
+        :param str local_relation: Name of relation on this service
+        :param str remote_relation: Name of relation on the other service in
+            the form '<service>[:<relation_name>]'
+
+        """
+        pass
+
+    def add_unit(self, count=1, to=None):
+        """Add one or more units to this service.
+
+        :param int count: Number of units to add
+        :param str to: Placement directive, e.g.::
+            '23' - machine 23
+            'lxc:7' - new lxc container on machine 7
+            '24/lxc/3' - lxc container 3 or machine 24
+
+            If None, a new machine is provisioned.
+
+        """
+        pass
+    add_units = add_unit
+
+    def allocate(self, budget, value):
+        """Allocate budget to this service.
+
+        :param str budget: Name of budget
+        :param int value: Budget limit
+
+        """
+        pass
+
+    def attach(self, resource_name, file_path):
+        """Upload a file as a resource for this service.
+
+        :param str resource: Name of the resource
+        :param str file_path: Path to the file to upload
+
+        """
+        pass
+
+    def collect_metrics(self):
+        """Collect metrics on this service.
+
+        """
+        pass
+
+    def destroy_relation(self, local_relation, remote_relation):
+        """Remove a relation to another service.
+
+        :param str local_relation: Name of relation on this service
+        :param str remote_relation: Name of relation on the other service in
+            the form '<service>[:<relation_name>]'
+
+        """
+        pass
+    remove_relation = destroy_relation
+
+    def destroy(self):
+        """Remove this service from the model.
+
+        """
+        pass
+    remove = destroy
+
+    def expose(self):
+        """Make this service publicly available over the network.
+
+        """
+        pass
+
+    def get_config(self):
+        """Return the configuration settings for this service.
+
+        """
+        pass
+
+    def get_constraints(self):
+        """Return the machine constraints for this service.
+
+        """
+        pass
+
+    def get_actions(self, schema=False):
+        """Get actions defined for this service.
+
+        :param bool schema: Return the full action schema
+
+        """
+        pass
+
+    def get_resources(self, details=False):
+        """Return resources for this service.
+
+        :param bool details: Include detailed info about resources used by each
+            unit
+
+        """
+        pass
+
+    def run(self, command, timeout=None):
+        """Run command on all units for this service.
+
+        :param str command: The command to run
+        :param int timeout: Time to wait before command is considered failed
+
+        """
+        pass
+
+    def set_config(self, to_default=False, **config):
+        """Set configuration options for this service.
+
+        :param bool to_default: Set service options to default values
+        :param \*\*config: Config key/values
+
+        """
+        pass
+
+    def set_constraints(self, constraints):
+        """Set machine constraints for this service.
+
+        :param :class:`juju.Constraints` constraints: Machine constraints
+
+        """
+        pass
+
+    def set_meter_status(self, status, info=None):
+        """Set the meter status on this status.
+
+        :param str status: Meter status, e.g. 'RED', 'AMBER'
+        :param str info: Extra info message
+
+        """
+        pass
+
+    def set_plan(self, plan_name):
+        """Set the plan for this service, effective immediately.
+
+        :param str plan_name: Name of plan
+
+        """
+        pass
+
+    def unexpose(self):
+        """Remove public availability over the network for this service.
+
+        """
+        pass
+
+    def update_allocation(self, allocation):
+        """Update existing allocation for this service.
+
+        :param int allocation: The allocation to set
+
+        """
+        pass
+
+    def upgrade_charm(
+            self, channel=None, force_series=False, force_units=False,
+            path=None, resources=None, revision=-1, switch=None):
+        """Upgrade the charm for this service.
+
+        :param str channel: Channel to use when getting the charm from the
+            charm store, e.g. 'development'
+        :param bool force_series: Upgrade even if series of deployed service
+            is not supported by the new charm
+        :param bool force_units: Upgrade all units immediately, even if in
+            error state
+        :param str path: Uprade to a charm located at path
+        :param dict resources: Dictionary of resource name/filepath pairs
+        :param int revision: Explicit upgrade revision
+        :param str switch: Crossgrade charm url
+
+        """
+        pass
index 1cce4da..6ce2ee1 100644 (file)
@@ -1,3 +1,5 @@
+from collections import namedtuple
+
 from .facade import Type
 
 __all__ = [
@@ -15,6 +17,13 @@ class Delta(Type):
         '''
         self.deltas = deltas
 
+        Change = namedtuple('Change', 'entity type data')
+        change = Change(*self.deltas)
+
+        self.entity = change.entity
+        self.type = change.type
+        self.data = change.data
+
     @classmethod
     def from_json(cls, data):
         return cls(deltas=data)
diff --git a/juju/delta.py b/juju/delta.py
new file mode 100644 (file)
index 0000000..247c81d
--- /dev/null
@@ -0,0 +1,50 @@
+from .client import client
+
+
+def get_entity_delta(d):
+    _delta_types = {
+        'application': ApplicationDelta,
+        'machine': MachineDelta,
+        'unit': UnitDelta,
+        'action': ActionDelta,
+    }
+
+    return _delta_types[d.entity](d.deltas)
+
+
+class EntityDelta(client.Delta):
+    def get_id(self):
+        return self.data['Id']
+
+    def get_entity_class(self):
+        return None
+
+
+class ApplicationDelta(EntityDelta):
+    def get_id(self):
+        return self.data['Name']
+
+    def get_entity_class(self):
+        from .application import Application
+        return Application
+
+
+class MachineDelta(EntityDelta):
+    def get_entity_class(self):
+        from .machine import Machine
+        return Machine
+
+
+class UnitDelta(EntityDelta):
+    def get_id(self):
+        return self.data['Name']
+
+    def get_entity_class(self):
+        from .unit import Unit
+        return Unit
+
+
+class ActionDelta(EntityDelta):
+    def get_entity_class(self):
+        from .action import Action
+        return Action
index 4bc8ff6..a7cb6a9 100644 (file)
@@ -1,4 +1,7 @@
-class Machine(object):
+from . import model
+
+
+class Machine(model.ModelEntity):
     def run(self, command, timeout=None):
         """Run command on this machine.
 
index 9d638cb..2ef903b 100644 (file)
@@ -1,4 +1,122 @@
+from .client import watcher
+from .delta import get_entity_delta
+
+
+class ModelEntity(object):
+    """An object in the Model tree"""
+
+    def __init__(self, data, model):
+        """Initialize a new entity
+
+        :param data: dict of data from a watcher delta
+        :param model: The model instance in whose object tree this
+            entity resides
+
+        """
+        self.data = data
+        self.model = model
+
+
 class Model(object):
+    def __init__(self, connection):
+        """Instantiate a new connected Model.
+
+        :param connection: `juju.client.connection.Connection` instance
+
+        """
+        self.connection = connection
+        self.observers = set()
+        self.state = dict()
+
+    def add_observer(self, callable_):
+        """Register an "on-model-change" callback
+
+        Once a watch is started (Model.watch() is called), ``callable_``
+        will be called each time the model changes. callable_ should
+        accept the following positional arguments:
+
+            delta - An instance of :class:`juju.delta.EntityDelta`
+                containing the raw delta data recv'd from the Juju
+                websocket.
+
+            old_obj - If the delta modifies an existing object in the model,
+                old_obj will be a copy of that object, as it was before the
+                delta was applied. Will be None if the delta creates a new
+                entity in the model.
+
+            new_obj - A copy of the new or updated object, after the delta
+                is applied. Will be None if the delta removes an entity
+                from the model.
+
+            model - The :class:`Model` itself.
+
+        """
+        self.observers.add(callable_)
+
+    async def watch(self):
+        """Start an asynchronous watch against this model.
+
+        See :meth:`add_observer` to register an onchange callback.
+
+        """
+        allwatcher = watcher.AllWatcher()
+        allwatcher.connect(self.connection)
+        while True:
+            results = await allwatcher.Next()
+            for delta in results.deltas:
+                delta = get_entity_delta(delta)
+                old_obj, new_obj = self._apply_delta(delta)
+                self._notify_observers(delta, old_obj, new_obj)
+
+    def _apply_delta(self, delta):
+        """Apply delta to our model state and return the a copy of the
+        affected object as it was before and after the update, e.g.:
+
+            old_obj, new_obj = self._apply_delta(delta)
+
+        old_obj may be None if the delta is for the creation of a new object,
+        e.g. a new application or unit is deployed.
+
+        new_obj may be if no object was created or updated, or if an object
+        was deleted as a result of the delta being applied.
+
+        """
+        old_obj, new_obj = None, None
+
+        if (delta.entity in self.state and
+                delta.get_id() in self.state[delta.entity]):
+            old_obj = self.state[delta.entity][delta.get_id()]
+            if delta.type == 'remove':
+                del self.state[delta.entity][delta.get_id()]
+                return old_obj, new_obj
+
+        new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = (
+            self._create_model_entity(delta))
+
+        return old_obj, new_obj
+
+    def _create_model_entity(self, delta):
+        """Return an object instance representing the entity created or
+        updated by ``delta``
+
+        """
+        entity_class = delta.get_entity_class()
+        return entity_class(delta.data, self)
+
+    def _notify_observers(self, delta, old_obj, new_obj):
+        """Call observing callbacks, notifying them of a change in model state
+
+        :param delta: The raw change from the watcher
+            (:class:`juju.client.overrides.Delta`)
+        :param old_obj: The object in the model that this delta updates.
+            May be None.
+        :param new_obj: The object in the model that is created or updated
+            by applying this delta.
+
+        """
+        for o in self.observers:
+            o(delta, old_obj, new_obj, self)
+
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
             count=1):
diff --git a/juju/service.py b/juju/service.py
deleted file mode 100644 (file)
index 39570b8..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-class Service(object):
-    def add_relation(self, local_relation, remote_relation):
-        """Add a relation to another service.
-
-        :param str local_relation: Name of relation on this service
-        :param str remote_relation: Name of relation on the other service in
-            the form '<service>[:<relation_name>]'
-
-        """
-        pass
-
-    def add_unit(self, count=1, to=None):
-        """Add one or more units to this service.
-
-        :param int count: Number of units to add
-        :param str to: Placement directive, e.g.::
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
-
-            If None, a new machine is provisioned.
-
-        """
-        pass
-    add_units = add_unit
-
-    def allocate(self, budget, value):
-        """Allocate budget to this service.
-
-        :param str budget: Name of budget
-        :param int value: Budget limit
-
-        """
-        pass
-
-    def attach(self, resource_name, file_path):
-        """Upload a file as a resource for this service.
-
-        :param str resource: Name of the resource
-        :param str file_path: Path to the file to upload
-
-        """
-        pass
-
-    def collect_metrics(self):
-        """Collect metrics on this service.
-
-        """
-        pass
-
-    def destroy_relation(self, local_relation, remote_relation):
-        """Remove a relation to another service.
-
-        :param str local_relation: Name of relation on this service
-        :param str remote_relation: Name of relation on the other service in
-            the form '<service>[:<relation_name>]'
-
-        """
-        pass
-    remove_relation = destroy_relation
-
-    def destroy(self):
-        """Remove this service from the model.
-
-        """
-        pass
-    remove = destroy
-
-    def expose(self):
-        """Make this service publicly available over the network.
-
-        """
-        pass
-
-    def get_config(self):
-        """Return the configuration settings for this service.
-
-        """
-        pass
-
-    def get_constraints(self):
-        """Return the machine constraints for this service.
-
-        """
-        pass
-
-    def get_actions(self, schema=False):
-        """Get actions defined for this service.
-
-        :param bool schema: Return the full action schema
-
-        """
-        pass
-
-    def get_resources(self, details=False):
-        """Return resources for this service.
-
-        :param bool details: Include detailed info about resources used by each
-            unit
-
-        """
-        pass
-
-    def run(self, command, timeout=None):
-        """Run command on all units for this service.
-
-        :param str command: The command to run
-        :param int timeout: Time to wait before command is considered failed
-
-        """
-        pass
-
-    def set_config(self, to_default=False, **config):
-        """Set configuration options for this service.
-
-        :param bool to_default: Set service options to default values
-        :param \*\*config: Config key/values
-
-        """
-        pass
-
-    def set_constraints(self, constraints):
-        """Set machine constraints for this service.
-
-        :param :class:`juju.Constraints` constraints: Machine constraints
-
-        """
-        pass
-
-    def set_meter_status(self, status, info=None):
-        """Set the meter status on this status.
-
-        :param str status: Meter status, e.g. 'RED', 'AMBER'
-        :param str info: Extra info message
-
-        """
-        pass
-
-    def set_plan(self, plan_name):
-        """Set the plan for this service, effective immediately.
-
-        :param str plan_name: Name of plan
-
-        """
-        pass
-
-    def unexpose(self):
-        """Remove public availability over the network for this service.
-
-        """
-        pass
-
-    def update_allocation(self, allocation):
-        """Update existing allocation for this service.
-
-        :param int allocation: The allocation to set
-
-        """
-        pass
-
-    def upgrade_charm(
-            self, channel=None, force_series=False, force_units=False,
-            path=None, resources=None, revision=-1, switch=None):
-        """Upgrade the charm for this service.
-
-        :param str channel: Channel to use when getting the charm from the
-            charm store, e.g. 'development'
-        :param bool force_series: Upgrade even if series of deployed service
-            is not supported by the new charm
-        :param bool force_units: Upgrade all units immediately, even if in
-            error state
-        :param str path: Uprade to a charm located at path
-        :param dict resources: Dictionary of resource name/filepath pairs
-        :param int revision: Explicit upgrade revision
-        :param str switch: Crossgrade charm url
-
-        """
-        pass
index 900ade9..4d75484 100644 (file)
@@ -1,4 +1,7 @@
-class Unit(object):
+from . import model
+
+
+class Unit(model.ModelEntity):
     def add_storage(self, name, constraints=None):
         """Add unit storage dynamically.