Refactor connection task management to avoid cancels (#117)
[osm/N2VC.git] / juju / model.py
index 3ed8fa7..e024c65 100644 (file)
@@ -18,7 +18,7 @@ import yaml
 import theblues.charmstore
 import theblues.errors
 
-from . import tag
+from . import tag, utils
 from .client import client
 from .client import connection
 from .constraints import parse as parse_constraints, normalize_key
@@ -385,8 +385,8 @@ class Model(object):
         self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self.info = None
-        self._watcher_task = None
-        self._watch_shutdown = asyncio.Event(loop=self.loop)
+        self._watch_stopping = asyncio.Event(loop=self.loop)
+        self._watch_stopped = asyncio.Event(loop=self.loop)
         self._watch_received = asyncio.Event(loop=self.loop)
         self._charmstore = CharmStore(self.loop)
 
@@ -431,9 +431,10 @@ class Model(object):
         """Shut down the watcher task and close websockets.
 
         """
-        self._stop_watching()
         if self.connection and self.connection.is_open:
-            await self._watch_shutdown.wait()
+            log.debug('Stopping watcher task')
+            self._watch_stopping.set()
+            await self._watch_stopped.wait()
             log.debug('Closing model connection')
             await self.connection.close()
             self.connection = None
@@ -619,41 +620,34 @@ class Model(object):
 
         """
         async def _start_watch():
-            self._watch_shutdown.clear()
             try:
                 allwatcher = client.AllWatcherFacade.from_connection(
                     self.connection)
-                while True:
-                    results = await allwatcher.Next()
+                while not self._watch_stopping.is_set():
+                    results = await utils.run_with_interrupt(
+                        allwatcher.Next(),
+                        self._watch_stopping,
+                        self.loop)
+                    if self._watch_stopping.is_set():
+                        break
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
                         old_obj, new_obj = self.state.apply_delta(delta)
-                        # XXX: Might not want to shield at this level
-                        # We are shielding because when the watcher is
-                        # canceled (on disconnect()), we don't want all of
-                        # its children (every observer callback) to be
-                        # canceled with it. So we shield them. But this means
-                        # they can *never* be canceled.
-                        await asyncio.shield(
-                            self._notify_observers(delta, old_obj, new_obj),
-                            loop=self.loop)
+                        await self._notify_observers(delta, old_obj, new_obj)
                     self._watch_received.set()
             except CancelledError:
-                self._watch_shutdown.set()
+                pass
             except Exception:
                 log.exception('Error in watcher')
                 raise
+            finally:
+                self._watch_stopped.set()
 
         log.debug('Starting watcher task')
-        self._watcher_task = self.loop.create_task(_start_watch())
-
-    def _stop_watching(self):
-        """Stop the asynchronous watch against this model.
-
-        """
-        log.debug('Stopping watcher task')
-        if self._watcher_task:
-            self._watcher_task.cancel()
+        self._watch_received.clear()
+        self._watch_stopping.clear()
+        self._watch_stopped.clear()
+        self.loop.create_task(_start_watch())
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state