X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=e024c65eea45aa258b5e90aff2dfa3f13b8ca9d3;hb=6a363710a84d62a6ad1268045f319611f356f453;hp=c15d07cda92ac042e7f7ee22eb0da3ac16e38d0e;hpb=ed0017cc2049050002f23d1b9e1ef95ff8448e5c;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index c15d07c..e024c65 100644 --- a/juju/model.py +++ b/juju/model.py @@ -18,9 +18,8 @@ import yaml import theblues.charmstore import theblues.errors -from . import tag +from . import tag, utils from .client import client -from .client import watcher from .client import connection from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta @@ -386,8 +385,8 @@ class Model(object): self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) self.info = None - self._watcher_task = None - self._watch_shutdown = asyncio.Event(loop=self.loop) + self._watch_stopping = asyncio.Event(loop=self.loop) + self._watch_stopped = asyncio.Event(loop=self.loop) self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) @@ -432,9 +431,10 @@ class Model(object): """Shut down the watcher task and close websockets. """ - self._stop_watching() if self.connection and self.connection.is_open: - await self._watch_shutdown.wait() + log.debug('Stopping watcher task') + self._watch_stopping.set() + await self._watch_stopped.wait() log.debug('Closing model connection') await self.connection.close() self.connection = None @@ -566,8 +566,7 @@ class Model(object): explicit call to this method. """ - facade = client.ClientFacade() - facade.connect(self.connection) + facade = client.ClientFacade.from_connection(self.connection) self.info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) @@ -621,42 +620,34 @@ class Model(object): """ async def _start_watch(): - self._watch_shutdown.clear() try: - allwatcher = watcher.AllWatcher() - self._watch_conn = await self.connection.clone() - allwatcher.connect(self._watch_conn) - while True: - results = await allwatcher.Next() + allwatcher = client.AllWatcherFacade.from_connection( + self.connection) + while not self._watch_stopping.is_set(): + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + if self._watch_stopping.is_set(): + break for delta in results.deltas: delta = get_entity_delta(delta) old_obj, new_obj = self.state.apply_delta(delta) - # XXX: Might not want to shield at this level - # We are shielding because when the watcher is - # canceled (on disconnect()), we don't want all of - # its children (every observer callback) to be - # canceled with it. So we shield them. But this means - # they can *never* be canceled. - await asyncio.shield( - self._notify_observers(delta, old_obj, new_obj), - loop=self.loop) + await self._notify_observers(delta, old_obj, new_obj) self._watch_received.set() except CancelledError: - log.debug('Closing watcher connection') - await self._watch_conn.close() - self._watch_shutdown.set() - self._watch_conn = None + pass + except Exception: + log.exception('Error in watcher') + raise + finally: + self._watch_stopped.set() log.debug('Starting watcher task') - self._watcher_task = self.loop.create_task(_start_watch()) - - def _stop_watching(self): - """Stop the asynchronous watch against this model. - - """ - log.debug('Stopping watcher task') - if self._watcher_task: - self._watcher_task.cancel() + self._watch_received.clear() + self._watch_stopping.clear() + self._watch_stopped.clear() + self.loop.create_task(_start_watch()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -797,8 +788,7 @@ class Model(object): params.series = series # Submit the request. - client_facade = client.ClientFacade() - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) results = await client_facade.AddMachines([params]) error = results.machines[0].error if error: @@ -814,8 +804,7 @@ class Model(object): :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Adding relation %s <-> %s', relation1, relation2) @@ -858,8 +847,7 @@ class Model(object): :param str key: The public ssh key """ - key_facade = client.KeyManagerFacade() - key_facade.connect(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection) return await key_facade.AddKeys([key], user) add_ssh_keys = add_ssh_key @@ -1017,8 +1005,7 @@ class Model(object): entity = await self.charmstore.entity(entity_url) entity_id = entity['Id'] - client_facade = client.ClientFacade() - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) is_bundle = ((is_local and (Path(entity_id) / 'bundle.yaml').exists()) or @@ -1100,8 +1087,8 @@ class Model(object): if not resources: return None - resources_facade = client.ResourcesFacade() - resources_facade.connect(self.connection) + resources_facade = client.ResourcesFacade.from_connection( + self.connection) response = await resources_facade.AddPendingResources( tag.application(application), entity_url, @@ -1123,8 +1110,8 @@ class Model(object): config = yaml.dump({application: config}, default_flow_style=False) - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection( + self.connection) app = client.ApplicationDeploy( charm_url=charm_url, @@ -1156,8 +1143,7 @@ class Model(object): """Destroy units by name. """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Destroying unit%s %s', @@ -1214,9 +1200,9 @@ class Model(object): :param str acl: Access control ('read' or 'write') """ - model_facade = client.ModelManagerFacade() controller_conn = await self.connection.controller() - model_facade.connect(controller_conn) + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) user = tag.user(username) model = tag.model(self.info.uuid) changes = client.ModifyModelAccess(acl, 'grant', model, user) @@ -1256,8 +1242,7 @@ class Model(object): else it's fingerprint """ - key_facade = client.KeyManagerFacade() - key_facade.connect(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection) entity = {'tag': tag.model(self.info.uuid)} entities = client.Entities([entity]) return await key_facade.ListKeys(entities, raw_ssh) @@ -1330,8 +1315,7 @@ class Model(object): :param str user: Juju user to which the key is registered """ - key_facade = client.KeyManagerFacade() - key_facade.connect(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection) key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii'))) key = hashlib.md5(key).hexdigest() key = ':'.join(a+b for a, b in zip(key[::2], key[1::2])) @@ -1365,9 +1349,9 @@ class Model(object): :param str username: Username to revoke """ - model_facade = client.ModelManagerFacade() controller_conn = await self.connection.controller() - model_facade.connect(controller_conn) + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) user = tag.user(username) model = tag.model(self.info.uuid) changes = client.ModifyModelAccess('read', 'revoke', model, user) @@ -1432,8 +1416,7 @@ class Model(object): :param bool utc: Display time as UTC in RFC3339 format """ - client_facade = client.ClientFacade() - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) return await client_facade.FullStatus(filters) def sync_tools( @@ -1513,8 +1496,8 @@ class Model(object): log.debug("Retrieving metrics for %s", ', '.join(tags) if tags else "all units") - metrics_facade = client.MetricsDebugFacade() - metrics_facade.connect(self.connection) + metrics_facade = client.MetricsDebugFacade.from_connection( + self.connection) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities) @@ -1564,12 +1547,12 @@ class BundleHandler(object): for unit_name, unit in model.units.items(): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) - self.client_facade = client.ClientFacade() - self.client_facade.connect(model.connection) - self.app_facade = client.ApplicationFacade() - self.app_facade.connect(model.connection) - self.ann_facade = client.AnnotationsFacade() - self.ann_facade.connect(model.connection) + self.client_facade = client.ClientFacade.from_connection( + model.connection) + self.app_facade = client.ApplicationFacade.from_connection( + model.connection) + self.ann_facade = client.AnnotationsFacade.from_connection( + model.connection) async def _handle_local_charms(self, bundle): """Search for references to local charms (i.e. filesystem paths)