X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=56924cffb9bd0257422dde5d7cba2a92496b40cb;hb=0b9e0ad499afba7c8a3b86a82e3cc5d42da4887d;hp=7fe0514549b91d1f3b0fd6d65e82547066bdf9e6;hpb=5acba37aaefa55fab65eb3ca8b5a5fcf24606a2a;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 7fe0514..56924cf 100644 --- a/juju/model.py +++ b/juju/model.py @@ -159,14 +159,12 @@ class ModelState(object): def get_entity( self, entity_type, entity_id, history_index=-1, connected=True): - """Return an object instance representing the entity created or - updated by ``delta`` + """Return an object instance for the given entity_type and id. + + By default the object state matches the most recent state from + Juju. To get an instance of the object in an older state, pass + history_index, an index into the history deque for the entity. - """ - """ - log.debug( - 'Getting %s:%s at index %s', - entity_type, entity_id, history_index) """ if history_index < 0 and history_index != -1: @@ -351,6 +349,16 @@ class Model(object): self._watch_received = asyncio.Event(loop=loop) self._charmstore = CharmStore(self.loop) + async def connect(self, *args, **kw): + """Connect to an arbitrary Juju model. + + args and kw are passed through to Connection.connect() + + """ + self.connection = await connection.Connection.connect(*args, **kw) + self._watch() + await self._watch_received.wait() + async def connect_current(self): """Connect to the current Juju model. @@ -359,6 +367,15 @@ class Model(object): self._watch() await self._watch_received.wait() + async def connect_model(self, arg): + """Connect to a specific Juju model. + :param arg: : + + """ + self.connection = await connection.Connection.connect_model(arg) + self._watch() + await self._watch_received.wait() + async def disconnect(self): """Shut down the watcher task and close websockets. @@ -535,22 +552,61 @@ class Model(object): if o.cares_about(delta): asyncio.ensure_future(o(delta, old_obj, new_obj, self)) - async def _wait_for_new(self, entity_type, entity_id, predicate=None): + async def _wait(self, entity_type, entity_id, action, predicate=None): + """ + Block the calling routine until a given action has happened to the + given entity + + :param entity_type: The entity's type. + :param entity_id: The entity's id. + :param action: the type of action (e.g., 'add' or 'change') + :param predicate: optional callable that must take as an + argument a delta, and must return a boolean, indicating + whether the delta contains the specific action we're looking + for. For example, you might check to see whether a 'change' + has a 'completed' status. See the _Observer class for details. + + """ + q = asyncio.Queue(loop=self.loop) + + async def callback(delta, old, new, model): + await q.put(delta.get_id()) + + self.add_observer(callback, entity_type, action, entity_id, predicate) + entity_id = await q.get() + return self.state._live_entity_map(entity_type)[entity_id] + + async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): """Wait for a new object to appear in the Model and return it. Waits for an object of type ``entity_type`` with id ``entity_id``. + If ``entity_id`` is ``None``, it will wait for the first new entity + of the correct type. This coroutine blocks until the new object appears in the model. """ - entity_added = asyncio.Queue(loop=self.loop) + # if the entity is already in the model, just return it + if entity_id in self.state._live_entity_map(entity_type): + return self.state._live_entity_map(entity_type)[entity_id] + # if we know the entity_id, we can trigger on any action that puts + # the enitty into the model; otherwise, we have to watch for the + # next "add" action on that entity_type + action = 'add' if entity_id is None else None + return await self._wait(entity_type, entity_id, action, predicate) - async def callback(delta, old, new, model): - await entity_added.put(delta.get_id()) + async def wait_for_action(self, action_id): + """Given an action, wait for it to complete.""" - self.add_observer(callback, entity_type, 'add', entity_id, predicate) - entity_id = await entity_added.get() - return self.state._live_entity_map(entity_type)[entity_id] + if action_id.startswith("action-"): + # if we've been passed action.tag, transform it into the + # id that the api deltas will use. + action_id = action_id[7:] + + def predicate(delta): + return delta.data['status'] in ('completed', 'failed') + + return await self._wait('action', action_id, 'change', predicate) def add_machine( self, spec=None, constraints=None, disks=None, series=None, @@ -759,9 +815,6 @@ class Model(object): - series is required; how do we pick a default? """ - if constraints: - constraints = client.Value(**constraints) - if to: placement = [ client.Placement(**p) for p in to @@ -791,8 +844,11 @@ class Model(object): if pending_apps: # new apps will usually be in the model by now, but if some # haven't made it yet we'll need to wait on them to be added - await asyncio.wait([self._wait_for_new('application', app_name) - for app_name in pending_apps]) + await asyncio.gather(*[ + asyncio.ensure_future( + self.model._wait_for_new('application', app_name)) + for app_name in pending_apps + ]) return [app for name, app in self.applications.items() if name in handler.applications] else: @@ -815,7 +871,7 @@ class Model(object): ) await app_facade.Deploy([app]) - return [await self._wait_for_new('application', service_name)] + return await self._wait_for_new('application', service_name) def destroy(self): """Terminate all machines and resources for this model. @@ -823,6 +879,21 @@ class Model(object): """ pass + async def destroy_unit(self, *unit_names): + """Destroy units by name. + + """ + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + log.debug( + 'Destroying unit%s %s', + 's' if len(unit_names) == 1 else '', + ' '.join(unit_names)) + + return await app_facade.Destroy(self.name) + destroy_units = destroy_unit + def get_backup(self, archive_id): """Download a backup archive file. @@ -1135,6 +1206,36 @@ class Model(object): def charmstore(self): return self._charmstore + async def get_metrics(self, *tags): + """Retrieve metrics. + + :param str \*tags: Tags of entities from which to retrieve metrics. + No tags retrieves the metrics of all units in the model. + """ + log.debug("Retrieving metrics for %s", + ', '.join(tags) if tags else "all units") + + metrics_facade = client.MetricsDebugFacade() + metrics_facade.connect(self.connection) + + entities = [client.Entity(tag) for tag in tags] + metrics_result = await metrics_facade.GetMetrics(entities) + + metrics = collections.defaultdict(list) + + for entity_metrics in metrics_result.results: + error = entity_metrics.error + if error: + if "is not a valid tag" in error: + raise ValueError(error.message) + else: + raise Exception(error.message) + + for metric in entity_metrics.metrics: + metrics[metric.unit].append(metric.to_json()) + + return metrics + class BundleHandler(object): """ @@ -1290,6 +1391,8 @@ class BundleHandler(object): # do the do log.info('Deploying %s', charm) await self.app_facade.Deploy([app]) + # ensure the app is in the model for future operations + await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to):