from .client import client
from .client import watcher
from .client import connection
+from .constraints import parse as parse_constraints
from .delta import get_entity_delta
from .delta import get_entity_class
from .exceptions import DeadEntityException
def get_entity(
self, entity_type, entity_id, history_index=-1, connected=True):
- """Return an object instance representing the entity created or
- updated by ``delta``
+ """Return an object instance for the given entity_type and id.
+
+ By default the object state matches the most recent state from
+ Juju. To get an instance of the object in an older state, pass
+ history_index, an index into the history deque for the entity.
- """
- """
- log.debug(
- 'Getting %s:%s at index %s',
- entity_type, entity_id, history_index)
"""
if history_index < 0 and history_index != -1:
self._watch_received = asyncio.Event(loop=loop)
self._charmstore = CharmStore(self.loop)
+ async def connect(self, *args, **kw):
+ """Connect to an arbitrary Juju model.
+
+ args and kw are passed through to Connection.connect()
+
+ """
+ self.connection = await connection.Connection.connect(*args, **kw)
+ self._watch()
+ await self._watch_received.wait()
+
async def connect_current(self):
"""Connect to the current Juju model.
self._watch()
await self._watch_received.wait()
+ async def connect_model(self, arg):
+ """Connect to a specific Juju model.
+ :param arg: <controller>:<user/model>
+
+ """
+ self.connection = await connection.Connection.connect_model(arg)
+ self._watch()
+ await self._watch_received.wait()
+
async def disconnect(self):
"""Shut down the watcher task and close websockets.
if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
async def _wait_for_new(self, entity_type, entity_id, predicate=None):
"""Wait for a new object to appear in the Model and return it.
This coroutine blocks until the new object appears in the model.
"""
- entity_added = asyncio.Queue(loop=self.loop)
+ return await self._wait(entity_type, entity_id, 'add', predicate)
- async def callback(delta, old, new, model):
- await entity_added.put(delta.get_id())
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
- self.add_observer(callback, entity_type, 'add', entity_id, predicate)
- entity_id = await entity_added.get()
- return self.state._live_entity_map(entity_type)[entity_id]
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'failed')
+
+ return await self._wait('action', action_id, 'change', predicate)
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
- series is required; how do we pick a default?
"""
- if constraints:
- constraints = client.Value(**constraints)
-
if to:
placement = [
client.Placement(**p) for p in to
await self.client_facade.AddCharm(None, entity_id)
return entity_id
- async def addMachines(self, series, constraints, container_type,
- parent_id):
+ async def addMachines(self, params):
"""
:param series string:
Series holds the optional machine OS series.
case this machine is a container, in which case also ContainerType
is set.
"""
- params = client.AddMachineParams(
- series=series,
- constraints=constraints,
- container_type=container_type,
- parent_id=self.resolve(parent_id),
- )
- results = await self.client_facade.AddMachines(params)
- log.debug('Added new machine %s', results[0].machine)
- return results[0].machine
+ if 'parent_id' in params:
+ params['parent_id'] = self.resolve(params['parent_id'])
+
+ params['constraints'] = parse_constraints(
+ params.get('constraints'))
+ params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+ params = client.AddMachineParams(**params)
+ results = await self.client_facade.AddMachines([params])
+ error = results.machines[0].error
+ if error:
+ raise ValueError("Error adding machine: %s", error.message)
+ machine = results.machines[0].machine
+ log.debug('Added new machine %s', machine)
+ return machine
async def addRelation(self, endpoint1, endpoint2):
"""