From: Tim Van Steenburgh Date: Fri, 24 Jun 2016 21:38:02 +0000 (-0400) Subject: Async model updates X-Git-Tag: 0.1.0~91 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;ds=inline;h=fe2d2f1a5ef2453359858481929a2526ea1a3c5c;p=osm%2FN2VC.git Async model updates --- diff --git a/examples/livemodel.py b/examples/livemodel.py new file mode 100644 index 0000000..2330a58 --- /dev/null +++ b/examples/livemodel.py @@ -0,0 +1,22 @@ +import asyncio + +from juju.model import Model +from juju.client.connection import Connection + + +loop = asyncio.get_event_loop() +conn = loop.run_until_complete(Connection.connect_current()) + + +def on_model_change(delta, old, new, model): + print(delta.entity, delta.type, delta.data) + print(old) + print(new) + print(model) + +async def watch_model(): + model = Model(conn) + model.add_observer(on_model_change) + await model.watch() + +loop.run_until_complete(watch_model()) diff --git a/juju/action.py b/juju/action.py new file mode 100644 index 0000000..941fa94 --- /dev/null +++ b/juju/action.py @@ -0,0 +1,5 @@ +from . import model + + +class Action(model.ModelEntity): + pass diff --git a/juju/application.py b/juju/application.py new file mode 100644 index 0000000..a72533d --- /dev/null +++ b/juju/application.py @@ -0,0 +1,181 @@ +from . import model + + +class Application(model.ModelEntity): + def add_relation(self, local_relation, remote_relation): + """Add a relation to another service. + + :param str local_relation: Name of relation on this service + :param str remote_relation: Name of relation on the other service in + the form '[:]' + + """ + pass + + def add_unit(self, count=1, to=None): + """Add one or more units to this service. + + :param int count: Number of units to add + :param str to: Placement directive, e.g.:: + '23' - machine 23 + 'lxc:7' - new lxc container on machine 7 + '24/lxc/3' - lxc container 3 or machine 24 + + If None, a new machine is provisioned. + + """ + pass + add_units = add_unit + + def allocate(self, budget, value): + """Allocate budget to this service. + + :param str budget: Name of budget + :param int value: Budget limit + + """ + pass + + def attach(self, resource_name, file_path): + """Upload a file as a resource for this service. + + :param str resource: Name of the resource + :param str file_path: Path to the file to upload + + """ + pass + + def collect_metrics(self): + """Collect metrics on this service. + + """ + pass + + def destroy_relation(self, local_relation, remote_relation): + """Remove a relation to another service. + + :param str local_relation: Name of relation on this service + :param str remote_relation: Name of relation on the other service in + the form '[:]' + + """ + pass + remove_relation = destroy_relation + + def destroy(self): + """Remove this service from the model. + + """ + pass + remove = destroy + + def expose(self): + """Make this service publicly available over the network. + + """ + pass + + def get_config(self): + """Return the configuration settings for this service. + + """ + pass + + def get_constraints(self): + """Return the machine constraints for this service. + + """ + pass + + def get_actions(self, schema=False): + """Get actions defined for this service. + + :param bool schema: Return the full action schema + + """ + pass + + def get_resources(self, details=False): + """Return resources for this service. + + :param bool details: Include detailed info about resources used by each + unit + + """ + pass + + def run(self, command, timeout=None): + """Run command on all units for this service. + + :param str command: The command to run + :param int timeout: Time to wait before command is considered failed + + """ + pass + + def set_config(self, to_default=False, **config): + """Set configuration options for this service. + + :param bool to_default: Set service options to default values + :param \*\*config: Config key/values + + """ + pass + + def set_constraints(self, constraints): + """Set machine constraints for this service. + + :param :class:`juju.Constraints` constraints: Machine constraints + + """ + pass + + def set_meter_status(self, status, info=None): + """Set the meter status on this status. + + :param str status: Meter status, e.g. 'RED', 'AMBER' + :param str info: Extra info message + + """ + pass + + def set_plan(self, plan_name): + """Set the plan for this service, effective immediately. + + :param str plan_name: Name of plan + + """ + pass + + def unexpose(self): + """Remove public availability over the network for this service. + + """ + pass + + def update_allocation(self, allocation): + """Update existing allocation for this service. + + :param int allocation: The allocation to set + + """ + pass + + def upgrade_charm( + self, channel=None, force_series=False, force_units=False, + path=None, resources=None, revision=-1, switch=None): + """Upgrade the charm for this service. + + :param str channel: Channel to use when getting the charm from the + charm store, e.g. 'development' + :param bool force_series: Upgrade even if series of deployed service + is not supported by the new charm + :param bool force_units: Upgrade all units immediately, even if in + error state + :param str path: Uprade to a charm located at path + :param dict resources: Dictionary of resource name/filepath pairs + :param int revision: Explicit upgrade revision + :param str switch: Crossgrade charm url + + """ + pass diff --git a/juju/client/overrides.py b/juju/client/overrides.py index 1cce4da..6ce2ee1 100644 --- a/juju/client/overrides.py +++ b/juju/client/overrides.py @@ -1,3 +1,5 @@ +from collections import namedtuple + from .facade import Type __all__ = [ @@ -15,6 +17,13 @@ class Delta(Type): ''' self.deltas = deltas + Change = namedtuple('Change', 'entity type data') + change = Change(*self.deltas) + + self.entity = change.entity + self.type = change.type + self.data = change.data + @classmethod def from_json(cls, data): return cls(deltas=data) diff --git a/juju/delta.py b/juju/delta.py new file mode 100644 index 0000000..247c81d --- /dev/null +++ b/juju/delta.py @@ -0,0 +1,50 @@ +from .client import client + + +def get_entity_delta(d): + _delta_types = { + 'application': ApplicationDelta, + 'machine': MachineDelta, + 'unit': UnitDelta, + 'action': ActionDelta, + } + + return _delta_types[d.entity](d.deltas) + + +class EntityDelta(client.Delta): + def get_id(self): + return self.data['Id'] + + def get_entity_class(self): + return None + + +class ApplicationDelta(EntityDelta): + def get_id(self): + return self.data['Name'] + + def get_entity_class(self): + from .application import Application + return Application + + +class MachineDelta(EntityDelta): + def get_entity_class(self): + from .machine import Machine + return Machine + + +class UnitDelta(EntityDelta): + def get_id(self): + return self.data['Name'] + + def get_entity_class(self): + from .unit import Unit + return Unit + + +class ActionDelta(EntityDelta): + def get_entity_class(self): + from .action import Action + return Action diff --git a/juju/machine.py b/juju/machine.py index 4bc8ff6..a7cb6a9 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -1,4 +1,7 @@ -class Machine(object): +from . import model + + +class Machine(model.ModelEntity): def run(self, command, timeout=None): """Run command on this machine. diff --git a/juju/model.py b/juju/model.py index 9d638cb..2ef903b 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,4 +1,122 @@ +from .client import watcher +from .delta import get_entity_delta + + +class ModelEntity(object): + """An object in the Model tree""" + + def __init__(self, data, model): + """Initialize a new entity + + :param data: dict of data from a watcher delta + :param model: The model instance in whose object tree this + entity resides + + """ + self.data = data + self.model = model + + class Model(object): + def __init__(self, connection): + """Instantiate a new connected Model. + + :param connection: `juju.client.connection.Connection` instance + + """ + self.connection = connection + self.observers = set() + self.state = dict() + + def add_observer(self, callable_): + """Register an "on-model-change" callback + + Once a watch is started (Model.watch() is called), ``callable_`` + will be called each time the model changes. callable_ should + accept the following positional arguments: + + delta - An instance of :class:`juju.delta.EntityDelta` + containing the raw delta data recv'd from the Juju + websocket. + + old_obj - If the delta modifies an existing object in the model, + old_obj will be a copy of that object, as it was before the + delta was applied. Will be None if the delta creates a new + entity in the model. + + new_obj - A copy of the new or updated object, after the delta + is applied. Will be None if the delta removes an entity + from the model. + + model - The :class:`Model` itself. + + """ + self.observers.add(callable_) + + async def watch(self): + """Start an asynchronous watch against this model. + + See :meth:`add_observer` to register an onchange callback. + + """ + allwatcher = watcher.AllWatcher() + allwatcher.connect(self.connection) + while True: + results = await allwatcher.Next() + for delta in results.deltas: + delta = get_entity_delta(delta) + old_obj, new_obj = self._apply_delta(delta) + self._notify_observers(delta, old_obj, new_obj) + + def _apply_delta(self, delta): + """Apply delta to our model state and return the a copy of the + affected object as it was before and after the update, e.g.: + + old_obj, new_obj = self._apply_delta(delta) + + old_obj may be None if the delta is for the creation of a new object, + e.g. a new application or unit is deployed. + + new_obj may be if no object was created or updated, or if an object + was deleted as a result of the delta being applied. + + """ + old_obj, new_obj = None, None + + if (delta.entity in self.state and + delta.get_id() in self.state[delta.entity]): + old_obj = self.state[delta.entity][delta.get_id()] + if delta.type == 'remove': + del self.state[delta.entity][delta.get_id()] + return old_obj, new_obj + + new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = ( + self._create_model_entity(delta)) + + return old_obj, new_obj + + def _create_model_entity(self, delta): + """Return an object instance representing the entity created or + updated by ``delta`` + + """ + entity_class = delta.get_entity_class() + return entity_class(delta.data, self) + + def _notify_observers(self, delta, old_obj, new_obj): + """Call observing callbacks, notifying them of a change in model state + + :param delta: The raw change from the watcher + (:class:`juju.client.overrides.Delta`) + :param old_obj: The object in the model that this delta updates. + May be None. + :param new_obj: The object in the model that is created or updated + by applying this delta. + + """ + for o in self.observers: + o(delta, old_obj, new_obj, self) + def add_machine( self, spec=None, constraints=None, disks=None, series=None, count=1): diff --git a/juju/service.py b/juju/service.py deleted file mode 100644 index 39570b8..0000000 --- a/juju/service.py +++ /dev/null @@ -1,178 +0,0 @@ -class Service(object): - def add_relation(self, local_relation, remote_relation): - """Add a relation to another service. - - :param str local_relation: Name of relation on this service - :param str remote_relation: Name of relation on the other service in - the form '[:]' - - """ - pass - - def add_unit(self, count=1, to=None): - """Add one or more units to this service. - - :param int count: Number of units to add - :param str to: Placement directive, e.g.:: - '23' - machine 23 - 'lxc:7' - new lxc container on machine 7 - '24/lxc/3' - lxc container 3 or machine 24 - - If None, a new machine is provisioned. - - """ - pass - add_units = add_unit - - def allocate(self, budget, value): - """Allocate budget to this service. - - :param str budget: Name of budget - :param int value: Budget limit - - """ - pass - - def attach(self, resource_name, file_path): - """Upload a file as a resource for this service. - - :param str resource: Name of the resource - :param str file_path: Path to the file to upload - - """ - pass - - def collect_metrics(self): - """Collect metrics on this service. - - """ - pass - - def destroy_relation(self, local_relation, remote_relation): - """Remove a relation to another service. - - :param str local_relation: Name of relation on this service - :param str remote_relation: Name of relation on the other service in - the form '[:]' - - """ - pass - remove_relation = destroy_relation - - def destroy(self): - """Remove this service from the model. - - """ - pass - remove = destroy - - def expose(self): - """Make this service publicly available over the network. - - """ - pass - - def get_config(self): - """Return the configuration settings for this service. - - """ - pass - - def get_constraints(self): - """Return the machine constraints for this service. - - """ - pass - - def get_actions(self, schema=False): - """Get actions defined for this service. - - :param bool schema: Return the full action schema - - """ - pass - - def get_resources(self, details=False): - """Return resources for this service. - - :param bool details: Include detailed info about resources used by each - unit - - """ - pass - - def run(self, command, timeout=None): - """Run command on all units for this service. - - :param str command: The command to run - :param int timeout: Time to wait before command is considered failed - - """ - pass - - def set_config(self, to_default=False, **config): - """Set configuration options for this service. - - :param bool to_default: Set service options to default values - :param \*\*config: Config key/values - - """ - pass - - def set_constraints(self, constraints): - """Set machine constraints for this service. - - :param :class:`juju.Constraints` constraints: Machine constraints - - """ - pass - - def set_meter_status(self, status, info=None): - """Set the meter status on this status. - - :param str status: Meter status, e.g. 'RED', 'AMBER' - :param str info: Extra info message - - """ - pass - - def set_plan(self, plan_name): - """Set the plan for this service, effective immediately. - - :param str plan_name: Name of plan - - """ - pass - - def unexpose(self): - """Remove public availability over the network for this service. - - """ - pass - - def update_allocation(self, allocation): - """Update existing allocation for this service. - - :param int allocation: The allocation to set - - """ - pass - - def upgrade_charm( - self, channel=None, force_series=False, force_units=False, - path=None, resources=None, revision=-1, switch=None): - """Upgrade the charm for this service. - - :param str channel: Channel to use when getting the charm from the - charm store, e.g. 'development' - :param bool force_series: Upgrade even if series of deployed service - is not supported by the new charm - :param bool force_units: Upgrade all units immediately, even if in - error state - :param str path: Uprade to a charm located at path - :param dict resources: Dictionary of resource name/filepath pairs - :param int revision: Explicit upgrade revision - :param str switch: Crossgrade charm url - - """ - pass diff --git a/juju/unit.py b/juju/unit.py index 900ade9..4d75484 100644 --- a/juju/unit.py +++ b/juju/unit.py @@ -1,4 +1,7 @@ -class Unit(object): +from . import model + + +class Unit(model.ModelEntity): def add_storage(self, name, constraints=None): """Add unit storage dynamically.