def get_entity(
self, entity_type, entity_id, history_index=-1, connected=True):
- """Return an object instance representing the entity created or
- updated by ``delta``
+ """Return an object instance for the given entity_type and id.
+
+ By default the object state matches the most recent state from
+ Juju. To get an instance of the object in an older state, pass
+ history_index, an index into the history deque for the entity.
- """
- """
- log.debug(
- 'Getting %s:%s at index %s',
- entity_type, entity_id, history_index)
"""
if history_index < 0 and history_index != -1:
self._watch_received = asyncio.Event(loop=loop)
self._charmstore = CharmStore(self.loop)
+ async def connect(self, *args, **kw):
+ """Connect to an arbitrary Juju model.
+
+ args and kw are passed through to Connection.connect()
+
+ """
+ self.connection = await connection.Connection.connect(*args, **kw)
+ self._watch()
+ await self._watch_received.wait()
+
async def connect_current(self):
"""Connect to the current Juju model.
self._watch()
await self._watch_received.wait()
+ async def connect_model(self, arg):
+ """Connect to a specific Juju model.
+ :param arg: <controller>:<user/model>
+
+ """
+ self.connection = await connection.Connection.connect_model(arg)
+ self._watch()
+ await self._watch_received.wait()
+
async def disconnect(self):
"""Shut down the watcher task and close websockets.
if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
async def _wait_for_new(self, entity_type, entity_id, predicate=None):
"""Wait for a new object to appear in the Model and return it.
This coroutine blocks until the new object appears in the model.
"""
- entity_added = asyncio.Queue(loop=self.loop)
+ return await self._wait(entity_type, entity_id, 'add', predicate)
- async def callback(delta, old, new, model):
- await entity_added.put(delta.get_id())
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
- self.add_observer(callback, entity_type, 'add', entity_id, predicate)
- entity_id = await entity_added.get()
- return self.state._live_entity_map(entity_type)[entity_id]
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'failed')
+
+ return await self._wait('action', action_id, 'change', predicate)
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
- series is required; how do we pick a default?
"""
- if constraints:
- constraints = client.Value(**constraints)
-
if to:
placement = [
client.Placement(**p) for p in to
)
await app_facade.Deploy([app])
- return [await self._wait_for_new('application', service_name)]
+ return await self._wait_for_new('application', service_name)
def destroy(self):
"""Terminate all machines and resources for this model.