Add relate example
authorTim Van Steenburgh <tvansteenburgh@gmail.com>
Sat, 10 Sep 2016 03:43:53 +0000 (23:43 -0400)
committerTim Van Steenburgh <tvansteenburgh@gmail.com>
Sat, 10 Sep 2016 03:43:53 +0000 (23:43 -0400)
examples/relate.py [new file with mode: 0644]
juju/client/connection.py
juju/delta.py
juju/machine.py
juju/model.py
juju/relation.py [new file with mode: 0644]

diff --git a/examples/relate.py b/examples/relate.py
new file mode 100644 (file)
index 0000000..967a785
--- /dev/null
@@ -0,0 +1,53 @@
+"""
+Deploy two charms and relate them.
+
+"""
+import asyncio
+import logging
+
+from juju.model import Model, ModelObserver
+
+
+class MyModelObserver(ModelObserver):
+    def on_change(self, delta, old, new, model):
+        if model.all_units_idle():
+            logging.debug('All units idle, disconnecting')
+            task = model.loop.create_task(model.disconnect())
+            task.add_done_callback(lambda fut: model.loop.stop())
+
+
+async def run():
+    model = Model()
+    await model.connect_current()
+
+    await model.reset(force=True)
+    await model.block_until(
+        lambda: len(model.machines) == 0
+    )
+    model.add_observer(MyModelObserver())
+
+    await model.deploy(
+        'ubuntu-0',
+        service_name='ubuntu',
+        series='trusty',
+        channel='stable',
+    )
+    await model.deploy(
+        'nrpe-11',
+        service_name='nrpe',
+        series='trusty',
+        channel='stable',
+        num_units=0,
+    )
+    await model.add_relation(
+        'ubuntu',
+        'nrpe',
+    )
+
+
+logging.basicConfig(level=logging.DEBUG)
+ws_logger = logging.getLogger('websockets.protocol')
+ws_logger.setLevel(logging.INFO)
+loop = asyncio.get_event_loop()
+loop.create_task(run())
+loop.run_forever()
index 69ac425..3b5bfc4 100644 (file)
@@ -1,4 +1,3 @@
-import asyncio
 import io
 import json
 import logging
@@ -42,6 +41,12 @@ class Connection:
         self.ws = None
         self.facades = {}
 
+    @property
+    def is_open(self):
+        if self.ws:
+            return self.ws.open
+        return False
+
     def _get_ssl(self, cert):
         return ssl.create_default_context(
             purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
index ea15f24..0b142dd 100644 (file)
@@ -3,11 +3,12 @@ from .client import client
 
 def get_entity_delta(d):
     _delta_types = {
+        'action': ActionDelta,
         'application': ApplicationDelta,
         'annotation': AnnotationDelta,
         'machine': MachineDelta,
         'unit': UnitDelta,
-        'action': ActionDelta,
+        'relation': RelationDelta,
     }
 
     return _delta_types[d.entity](d.deltas)
@@ -21,6 +22,12 @@ class EntityDelta(client.Delta):
         return None
 
 
+class ActionDelta(EntityDelta):
+    def get_entity_class(self):
+        from .action import Action
+        return Action
+
+
 class ApplicationDelta(EntityDelta):
     def get_id(self):
         return self.data['name']
@@ -54,7 +61,7 @@ class UnitDelta(EntityDelta):
         return Unit
 
 
-class ActionDelta(EntityDelta):
+class RelationDelta(EntityDelta):
     def get_entity_class(self):
-        from .action import Action
-        return Action
+        from .relation import Relation
+        return Relation
index a7cb6a9..181c60c 100644 (file)
@@ -1,7 +1,25 @@
+import logging
+
 from . import model
+from .client import client
+
+log = logging.getLogger(__name__)
 
 
 class Machine(model.ModelEntity):
+    async def destroy(self, force=False):
+        """Remove this machine from the model.
+
+        """
+        facade = client.ClientFacade()
+        facade.connect(self.connection)
+
+        log.debug(
+            'Destroying machine %s', self.id)
+
+        return await facade.DestroyMachines(force, [self.id])
+    remove = destroy
+
     def run(self, command, timeout=None):
         """Run command on this machine.
 
index e56bfb4..04f3437 100644 (file)
@@ -1,7 +1,10 @@
+import asyncio
 import logging
+from concurrent.futures import CancelledError
 
 from .client import client
 from .client import watcher
+from .client import connection
 from .delta import get_entity_delta
 
 log = logging.getLogger(__name__)
@@ -44,20 +47,63 @@ class ModelEntity(object):
 
 
 class Model(object):
-    def __init__(self, connection):
+    def __init__(self, loop=None):
         """Instantiate a new connected Model.
 
-        :param connection: `juju.client.connection.Connection` instance
+        :param loop: an asyncio event loop
 
         """
-        self.connection = connection
+        self.loop = loop or asyncio.get_event_loop()
+        self.connection = None
         self.observers = set()
         self.state = dict()
+        self._watcher_task = None
+        self._watch_shutdown = asyncio.Event(loop=loop)
+        self._watch_received = asyncio.Event(loop=loop)
+
+    async def connect_current(self):
+        self.connection = await connection.Connection.connect_current()
+        self._watch()
+        await self._watch_received.wait()
+
+    async def disconnect(self):
+        self._stop_watching()
+        if self.connection and self.connection.is_open:
+            await self._watch_shutdown.wait()
+            log.debug('Closing model connection')
+            await asyncio.wait_for(self.connection.close(), None)
+            self.connection = None
+
+    def all_units_idle(self):
+        """Return True if all units are idle.
+
+        """
+        for unit in self.units.values():
+            unit_status = unit.data['agent-status']['current']
+            if unit_status != 'idle':
+                return False
+        return True
+
+    async def reset(self, force=False):
+        for app in self.applications.values():
+            await app.destroy()
+        for machine in self.machines.values():
+            await machine.destroy(force=force)
+
+    async def block_until(self, func):
+        async def _block():
+            while not func():
+                await asyncio.sleep(.1)
+        await asyncio.wait_for(_block(), None)
 
     @property
     def applications(self):
         return self.state.get('application', {})
 
+    @property
+    def machines(self):
+        return self.state.get('machine', {})
+
     @property
     def units(self):
         return self.state.get('unit', {})
@@ -87,21 +133,41 @@ class Model(object):
         """
         self.observers.add(callable_)
 
-    async def watch(self):
+    def _watch(self):
         """Start an asynchronous watch against this model.
 
         See :meth:`add_observer` to register an onchange callback.
 
         """
-        self._watching = True
-        allwatcher = watcher.AllWatcher()
-        allwatcher.connect(await self.connection.clone())
-        while True:
-            results = await allwatcher.Next()
-            for delta in results.deltas:
-                delta = get_entity_delta(delta)
-                old_obj, new_obj = self._apply_delta(delta)
-                self._notify_observers(delta, old_obj, new_obj)
+        async def _start_watch():
+            self._watch_shutdown.clear()
+            try:
+                allwatcher = watcher.AllWatcher()
+                self._watch_conn = await self.connection.clone()
+                allwatcher.connect(self._watch_conn)
+                while True:
+                    results = await allwatcher.Next()
+                    for delta in results.deltas:
+                        delta = get_entity_delta(delta)
+                        old_obj, new_obj = self._apply_delta(delta)
+                        self._notify_observers(delta, old_obj, new_obj)
+                    self._watch_received.set()
+            except CancelledError:
+                log.debug('Closing watcher connection')
+                await asyncio.wait_for(self._watch_conn.close(), None)
+                self._watch_shutdown.set()
+                self._watch_conn = None
+
+        log.debug('Starting watcher task')
+        self._watcher_task = self.loop.create_task(_start_watch())
+
+    def _stop_watching(self):
+        """Stop the asynchronous watch against this model.
+
+        """
+        log.debug('Stopping watcher task')
+        if self._watcher_task:
+            self._watcher_task.cancel()
 
     def _apply_delta(self, delta):
         """Apply delta to our model state and return the a copy of the
@@ -182,14 +248,20 @@ class Model(object):
         pass
     add_machines = add_machine
 
-    def add_relation(self, relation1, relation2):
-        """Add a relation between two services.
+    async def add_relation(self, relation1, relation2):
+        """Add a relation between two applications.
 
-        :param str relation1: '<service>[:<relation_name>]'
-        :param str relation2: '<service>[:<relation_name>]'
+        :param str relation1: '<application>[:<relation_name>]'
+        :param str relation2: '<application>[:<relation_name>]'
 
         """
-        pass
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Adding relation %s <-> %s', relation1, relation2)
+
+        return await app_facade.AddRelation([relation1, relation2])
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
diff --git a/juju/relation.py b/juju/relation.py
new file mode 100644 (file)
index 0000000..571cd01
--- /dev/null
@@ -0,0 +1,9 @@
+import logging
+
+from . import model
+
+log = logging.getLogger(__name__)
+
+
+class Relation(model.ModelEntity):
+    pass