import theblues.charmstore
import theblues.errors
-from . import tag
+from . import tag, utils
from .client import client
from .client import connection
from .constraints import parse as parse_constraints, normalize_key
self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self.info = None
- self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_stopping = asyncio.Event(loop=self.loop)
+ self._watch_stopped = asyncio.Event(loop=self.loop)
self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
"""Shut down the watcher task and close websockets.
"""
- self._stop_watching()
if self.connection and self.connection.is_open:
- await self._watch_shutdown.wait()
+ log.debug('Stopping watcher task')
+ self._watch_stopping.set()
+ await self._watch_stopped.wait()
log.debug('Closing model connection')
await self.connection.close()
self.connection = None
"""
async def _start_watch():
- self._watch_shutdown.clear()
try:
allwatcher = client.AllWatcherFacade.from_connection(
self.connection)
- while True:
- results = await allwatcher.Next()
+ while not self._watch_stopping.is_set():
+ results = await utils.run_with_interrupt(
+ allwatcher.Next(),
+ self._watch_stopping,
+ self.loop)
+ if self._watch_stopping.is_set():
+ break
for delta in results.deltas:
delta = get_entity_delta(delta)
old_obj, new_obj = self.state.apply_delta(delta)
- # XXX: Might not want to shield at this level
- # We are shielding because when the watcher is
- # canceled (on disconnect()), we don't want all of
- # its children (every observer callback) to be
- # canceled with it. So we shield them. But this means
- # they can *never* be canceled.
- await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj),
- loop=self.loop)
+ await self._notify_observers(delta, old_obj, new_obj)
self._watch_received.set()
except CancelledError:
- self._watch_shutdown.set()
+ pass
except Exception:
log.exception('Error in watcher')
raise
+ finally:
+ self._watch_stopped.set()
log.debug('Starting watcher task')
- self._watcher_task = self.loop.create_task(_start_watch())
-
- def _stop_watching(self):
- """Stop the asynchronous watch against this model.
-
- """
- log.debug('Stopping watcher task')
- if self._watcher_task:
- self._watcher_task.cancel()
+ self._watch_received.clear()
+ self._watch_stopping.clear()
+ self._watch_stopped.clear()
+ self.loop.create_task(_start_watch())
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state