From: Tim Van Steenburgh Date: Sat, 10 Sep 2016 03:43:53 +0000 (-0400) Subject: Add relate example X-Git-Tag: 0.1.0~84 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=a56869a418a68b795817d4f14d4065d11784f09f;p=osm%2FN2VC.git Add relate example --- diff --git a/examples/relate.py b/examples/relate.py new file mode 100644 index 0000000..967a785 --- /dev/null +++ b/examples/relate.py @@ -0,0 +1,53 @@ +""" +Deploy two charms and relate them. + +""" +import asyncio +import logging + +from juju.model import Model, ModelObserver + + +class MyModelObserver(ModelObserver): + def on_change(self, delta, old, new, model): + if model.all_units_idle(): + logging.debug('All units idle, disconnecting') + task = model.loop.create_task(model.disconnect()) + task.add_done_callback(lambda fut: model.loop.stop()) + + +async def run(): + model = Model() + await model.connect_current() + + await model.reset(force=True) + await model.block_until( + lambda: len(model.machines) == 0 + ) + model.add_observer(MyModelObserver()) + + await model.deploy( + 'ubuntu-0', + service_name='ubuntu', + series='trusty', + channel='stable', + ) + await model.deploy( + 'nrpe-11', + service_name='nrpe', + series='trusty', + channel='stable', + num_units=0, + ) + await model.add_relation( + 'ubuntu', + 'nrpe', + ) + + +logging.basicConfig(level=logging.DEBUG) +ws_logger = logging.getLogger('websockets.protocol') +ws_logger.setLevel(logging.INFO) +loop = asyncio.get_event_loop() +loop.create_task(run()) +loop.run_forever() diff --git a/juju/client/connection.py b/juju/client/connection.py index 69ac425..3b5bfc4 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -1,4 +1,3 @@ -import asyncio import io import json import logging @@ -42,6 +41,12 @@ class Connection: self.ws = None self.facades = {} + @property + def is_open(self): + if self.ws: + return self.ws.open + return False + def _get_ssl(self, cert): return ssl.create_default_context( purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert) diff --git a/juju/delta.py b/juju/delta.py index ea15f24..0b142dd 100644 --- a/juju/delta.py +++ b/juju/delta.py @@ -3,11 +3,12 @@ from .client import client def get_entity_delta(d): _delta_types = { + 'action': ActionDelta, 'application': ApplicationDelta, 'annotation': AnnotationDelta, 'machine': MachineDelta, 'unit': UnitDelta, - 'action': ActionDelta, + 'relation': RelationDelta, } return _delta_types[d.entity](d.deltas) @@ -21,6 +22,12 @@ class EntityDelta(client.Delta): return None +class ActionDelta(EntityDelta): + def get_entity_class(self): + from .action import Action + return Action + + class ApplicationDelta(EntityDelta): def get_id(self): return self.data['name'] @@ -54,7 +61,7 @@ class UnitDelta(EntityDelta): return Unit -class ActionDelta(EntityDelta): +class RelationDelta(EntityDelta): def get_entity_class(self): - from .action import Action - return Action + from .relation import Relation + return Relation diff --git a/juju/machine.py b/juju/machine.py index a7cb6a9..181c60c 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -1,7 +1,25 @@ +import logging + from . import model +from .client import client + +log = logging.getLogger(__name__) class Machine(model.ModelEntity): + async def destroy(self, force=False): + """Remove this machine from the model. + + """ + facade = client.ClientFacade() + facade.connect(self.connection) + + log.debug( + 'Destroying machine %s', self.id) + + return await facade.DestroyMachines(force, [self.id]) + remove = destroy + def run(self, command, timeout=None): """Run command on this machine. diff --git a/juju/model.py b/juju/model.py index e56bfb4..04f3437 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,7 +1,10 @@ +import asyncio import logging +from concurrent.futures import CancelledError from .client import client from .client import watcher +from .client import connection from .delta import get_entity_delta log = logging.getLogger(__name__) @@ -44,20 +47,63 @@ class ModelEntity(object): class Model(object): - def __init__(self, connection): + def __init__(self, loop=None): """Instantiate a new connected Model. - :param connection: `juju.client.connection.Connection` instance + :param loop: an asyncio event loop """ - self.connection = connection + self.loop = loop or asyncio.get_event_loop() + self.connection = None self.observers = set() self.state = dict() + self._watcher_task = None + self._watch_shutdown = asyncio.Event(loop=loop) + self._watch_received = asyncio.Event(loop=loop) + + async def connect_current(self): + self.connection = await connection.Connection.connect_current() + self._watch() + await self._watch_received.wait() + + async def disconnect(self): + self._stop_watching() + if self.connection and self.connection.is_open: + await self._watch_shutdown.wait() + log.debug('Closing model connection') + await asyncio.wait_for(self.connection.close(), None) + self.connection = None + + def all_units_idle(self): + """Return True if all units are idle. + + """ + for unit in self.units.values(): + unit_status = unit.data['agent-status']['current'] + if unit_status != 'idle': + return False + return True + + async def reset(self, force=False): + for app in self.applications.values(): + await app.destroy() + for machine in self.machines.values(): + await machine.destroy(force=force) + + async def block_until(self, func): + async def _block(): + while not func(): + await asyncio.sleep(.1) + await asyncio.wait_for(_block(), None) @property def applications(self): return self.state.get('application', {}) + @property + def machines(self): + return self.state.get('machine', {}) + @property def units(self): return self.state.get('unit', {}) @@ -87,21 +133,41 @@ class Model(object): """ self.observers.add(callable_) - async def watch(self): + def _watch(self): """Start an asynchronous watch against this model. See :meth:`add_observer` to register an onchange callback. """ - self._watching = True - allwatcher = watcher.AllWatcher() - allwatcher.connect(await self.connection.clone()) - while True: - results = await allwatcher.Next() - for delta in results.deltas: - delta = get_entity_delta(delta) - old_obj, new_obj = self._apply_delta(delta) - self._notify_observers(delta, old_obj, new_obj) + async def _start_watch(): + self._watch_shutdown.clear() + try: + allwatcher = watcher.AllWatcher() + self._watch_conn = await self.connection.clone() + allwatcher.connect(self._watch_conn) + while True: + results = await allwatcher.Next() + for delta in results.deltas: + delta = get_entity_delta(delta) + old_obj, new_obj = self._apply_delta(delta) + self._notify_observers(delta, old_obj, new_obj) + self._watch_received.set() + except CancelledError: + log.debug('Closing watcher connection') + await asyncio.wait_for(self._watch_conn.close(), None) + self._watch_shutdown.set() + self._watch_conn = None + + log.debug('Starting watcher task') + self._watcher_task = self.loop.create_task(_start_watch()) + + def _stop_watching(self): + """Stop the asynchronous watch against this model. + + """ + log.debug('Stopping watcher task') + if self._watcher_task: + self._watcher_task.cancel() def _apply_delta(self, delta): """Apply delta to our model state and return the a copy of the @@ -182,14 +248,20 @@ class Model(object): pass add_machines = add_machine - def add_relation(self, relation1, relation2): - """Add a relation between two services. + async def add_relation(self, relation1, relation2): + """Add a relation between two applications. - :param str relation1: '[:]' - :param str relation2: '[:]' + :param str relation1: '[:]' + :param str relation2: '[:]' """ - pass + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + log.debug( + 'Adding relation %s <-> %s', relation1, relation2) + + return await app_facade.AddRelation([relation1, relation2]) def add_space(self, name, *cidrs): """Add a new network space. diff --git a/juju/relation.py b/juju/relation.py new file mode 100644 index 0000000..571cd01 --- /dev/null +++ b/juju/relation.py @@ -0,0 +1,9 @@ +import logging + +from . import model + +log = logging.getLogger(__name__) + + +class Relation(model.ModelEntity): + pass