def get_entity(
self, entity_type, entity_id, history_index=-1, connected=True):
- """Return an object instance representing the entity created or
- updated by ``delta``
+ """Return an object instance for the given entity_type and id.
+
+ By default the object state matches the most recent state from
+ Juju. To get an instance of the object in an older state, pass
+ history_index, an index into the history deque for the entity.
- """
- """
- log.debug(
- 'Getting %s:%s at index %s',
- entity_type, entity_id, history_index)
"""
if history_index < 0 and history_index != -1:
self._watch_received = asyncio.Event(loop=loop)
self._charmstore = CharmStore(self.loop)
+ async def connect(self, *args, **kw):
+ """Connect to an arbitrary Juju model.
+
+ args and kw are passed through to Connection.connect()
+
+ """
+ self.connection = await connection.Connection.connect(*args, **kw)
+ self._watch()
+ await self._watch_received.wait()
+
async def connect_current(self):
"""Connect to the current Juju model.
self._watch()
await self._watch_received.wait()
+ async def connect_model(self, arg):
+ """Connect to a specific Juju model.
+ :param arg: <controller>:<user/model>
+
+ """
+ self.connection = await connection.Connection.connect_model(arg)
+ self._watch()
+ await self._watch_received.wait()
+
async def disconnect(self):
"""Shut down the watcher task and close websockets.
if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self))
- async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
+ async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
"""Wait for a new object to appear in the Model and return it.
Waits for an object of type ``entity_type`` with id ``entity_id``.
+ If ``entity_id`` is ``None``, it will wait for the first new entity
+ of the correct type.
This coroutine blocks until the new object appears in the model.
"""
- entity_added = asyncio.Queue(loop=self.loop)
+ # if the entity is already in the model, just return it
+ if entity_id in self.state._live_entity_map(entity_type):
+ return self.state._live_entity_map(entity_type)[entity_id]
+ # if we know the entity_id, we can trigger on any action that puts
+ # the enitty into the model; otherwise, we have to watch for the
+ # next "add" action on that entity_type
+ action = 'add' if entity_id is None else None
+ return await self._wait(entity_type, entity_id, action, predicate)
- async def callback(delta, old, new, model):
- await entity_added.put(delta.get_id())
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
- self.add_observer(callback, entity_type, 'add', entity_id, predicate)
- entity_id = await entity_added.get()
- return self.state._live_entity_map(entity_type)[entity_id]
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'failed')
+
+ return await self._wait('action', action_id, 'change', predicate)
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
- series is required; how do we pick a default?
"""
- if constraints:
- constraints = client.Value(**constraints)
-
if to:
placement = [
client.Placement(**p) for p in to
if pending_apps:
# new apps will usually be in the model by now, but if some
# haven't made it yet we'll need to wait on them to be added
- await asyncio.wait([self._wait_for_new('application', app_name)
- for app_name in pending_apps])
+ await asyncio.gather(*[
+ asyncio.ensure_future(
+ self.model._wait_for_new('application', app_name))
+ for app_name in pending_apps
+ ])
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
)
await app_facade.Deploy([app])
- return [await self._wait_for_new('application', service_name)]
+ return await self._wait_for_new('application', service_name)
def destroy(self):
"""Terminate all machines and resources for this model.
def charmstore(self):
return self._charmstore
+ async def get_metrics(self, *tags):
+ """Retrieve metrics.
+
+ :param str \*tags: Tags of entities from which to retrieve metrics.
+ No tags retrieves the metrics of all units in the model.
+ """
+ log.debug("Retrieving metrics for %s",
+ ', '.join(tags) if tags else "all units")
+
+ metrics_facade = client.MetricsDebugFacade()
+ metrics_facade.connect(self.connection)
+
+ entities = [client.Entity(tag) for tag in tags]
+ metrics_result = await metrics_facade.GetMetrics(entities)
+
+ metrics = collections.defaultdict(list)
+
+ for entity_metrics in metrics_result.results:
+ error = entity_metrics.error
+ if error:
+ if "is not a valid tag" in error:
+ raise ValueError(error.message)
+ else:
+ raise Exception(error.message)
+
+ for metric in entity_metrics.metrics:
+ metrics[metric.unit].append(metric.to_json())
+
+ return metrics
+
class BundleHandler(object):
"""
# do the do
log.info('Deploying %s', charm)
await self.app_facade.Deploy([app])
+ # ensure the app is in the model for future operations
+ await self.model._wait_for_new('application', application)
return application
async def addUnit(self, application, to):