X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=juju%2Fmodel.py;h=e024c65eea45aa258b5e90aff2dfa3f13b8ca9d3;hb=6a363710a84d62a6ad1268045f319611f356f453;hp=3ed8fa798c3c84fe7dedd53bae561991791d6aee;hpb=7c2a530853c95b8a3518f6db0870f94858f87c27;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 3ed8fa7..e024c65 100644 --- a/juju/model.py +++ b/juju/model.py @@ -18,7 +18,7 @@ import yaml import theblues.charmstore import theblues.errors -from . import tag +from . import tag, utils from .client import client from .client import connection from .constraints import parse as parse_constraints, normalize_key @@ -385,8 +385,8 @@ class Model(object): self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) self.info = None - self._watcher_task = None - self._watch_shutdown = asyncio.Event(loop=self.loop) + self._watch_stopping = asyncio.Event(loop=self.loop) + self._watch_stopped = asyncio.Event(loop=self.loop) self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) @@ -431,9 +431,10 @@ class Model(object): """Shut down the watcher task and close websockets. """ - self._stop_watching() if self.connection and self.connection.is_open: - await self._watch_shutdown.wait() + log.debug('Stopping watcher task') + self._watch_stopping.set() + await self._watch_stopped.wait() log.debug('Closing model connection') await self.connection.close() self.connection = None @@ -619,41 +620,34 @@ class Model(object): """ async def _start_watch(): - self._watch_shutdown.clear() try: allwatcher = client.AllWatcherFacade.from_connection( self.connection) - while True: - results = await allwatcher.Next() + while not self._watch_stopping.is_set(): + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + if self._watch_stopping.is_set(): + break for delta in results.deltas: delta = get_entity_delta(delta) old_obj, new_obj = self.state.apply_delta(delta) - # XXX: Might not want to shield at this level - # We are shielding because when the watcher is - # canceled (on disconnect()), we don't want all of - # its children (every observer callback) to be - # canceled with it. So we shield them. But this means - # they can *never* be canceled. - await asyncio.shield( - self._notify_observers(delta, old_obj, new_obj), - loop=self.loop) + await self._notify_observers(delta, old_obj, new_obj) self._watch_received.set() except CancelledError: - self._watch_shutdown.set() + pass except Exception: log.exception('Error in watcher') raise + finally: + self._watch_stopped.set() log.debug('Starting watcher task') - self._watcher_task = self.loop.create_task(_start_watch()) - - def _stop_watching(self): - """Stop the asynchronous watch against this model. - - """ - log.debug('Stopping watcher task') - if self._watcher_task: - self._watcher_task.cancel() + self._watch_received.clear() + self._watch_stopping.clear() + self._watch_stopped.clear() + self.loop.create_task(_start_watch()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state