- allwatcher = watcher.AllWatcher()
- self._watch_conn = await self.connection.clone()
- allwatcher.connect(self._watch_conn)
- while True:
- results = await allwatcher.Next()
+ allwatcher = client.AllWatcherFacade.from_connection(
+ self.connection)
+ while not self._watch_stopping.is_set():
+ try:
+ results = await utils.run_with_interrupt(
+ allwatcher.Next(),
+ self._watch_stopping,
+ self.loop)
+ except JujuAPIError as e:
+ if 'watcher was stopped' not in str(e):
+ raise
+ if self._watch_stopping.is_set():
+ # this shouldn't ever actually happen, because
+ # the event should trigger before the controller
+ # has a chance to tell us the watcher is stopped
+ # but handle it gracefully, just in case
+ break
+ # controller stopped our watcher for some reason
+ # but we're not actually stopping, so just restart it
+ log.warning(
+ 'Watcher: watcher stopped, restarting')
+ del allwatcher.Id
+ continue
+ except websockets.ConnectionClosed:
+ monitor = self.connection.monitor
+ if monitor.status == monitor.ERROR:
+ # closed unexpectedly, try to reopen
+ log.warning(
+ 'Watcher: connection closed, reopening')
+ await self.connection.reconnect()
+ del allwatcher.Id
+ continue
+ else:
+ # closed on request, go ahead and shutdown
+ break
+ if self._watch_stopping.is_set():
+ await allwatcher.Stop()
+ break