+ def __init__(self, loop=None):
+ """Instantiate a new connected Model.
+
+ :param loop: an asyncio event loop
+
+ """
+ self.loop = loop or asyncio.get_event_loop()
+ self.connection = None
+ self.observers = weakref.WeakValueDictionary()
+ self.state = ModelState(self)
+ self.info = None
+ self._watcher_task = None
+ self._watch_shutdown = asyncio.Event(loop=loop)
+ self._watch_received = asyncio.Event(loop=loop)
+ self._charmstore = CharmStore(self.loop)
+
+ async def connect(self, *args, **kw):
+ """Connect to an arbitrary Juju model.
+
+ args and kw are passed through to Connection.connect()
+
+ """
+ self.connection = await connection.Connection.connect(*args, **kw)
+ await self._after_connect()
+
+ async def connect_current(self):
+ """Connect to the current Juju model.
+
+ """
+ self.connection = await connection.Connection.connect_current()
+ await self._after_connect()
+
+ async def connect_model(self, model_name):
+ """Connect to a specific Juju model by name.
+
+ :param model_name: Format [controller:][user/]model
+
+ """
+ self.connection = await connection.Connection.connect_model(model_name)
+ await self._after_connect()
+
+ async def _after_connect(self):
+ """Run initialization steps after connecting to websocket.
+
+ """
+ self._watch()
+ await self._watch_received.wait()
+ await self.get_info()
+
+ async def disconnect(self):
+ """Shut down the watcher task and close websockets.
+
+ """
+ self._stop_watching()
+ if self.connection and self.connection.is_open:
+ await self._watch_shutdown.wait()
+ log.debug('Closing model connection')
+ await self.connection.close()
+ self.connection = None
+
+ def all_units_idle(self):
+ """Return True if all units are idle.
+
+ """
+ for unit in self.units.values():
+ unit_status = unit.data['agent-status']['current']
+ if unit_status != 'idle':
+ return False
+ return True
+
+ async def reset(self, force=False):
+ """Reset the model to a clean state.
+
+ :param bool force: Force-terminate machines.
+
+ This returns only after the model has reached a clean state. "Clean"
+ means no applications or machines exist in the model.
+
+ """
+ log.debug('Resetting model')
+ for app in self.applications.values():
+ await app.destroy()
+ for machine in self.machines.values():
+ await machine.destroy(force=force)
+ await self.block_until(
+ lambda: len(self.machines) == 0
+ )
+
+ async def block_until(self, *conditions, timeout=None):
+ """Return only after all conditions are true.
+
+ """
+ async def _block():
+ while not all(c() for c in conditions):
+ await asyncio.sleep(0)
+ await asyncio.wait_for(_block(), timeout)
+
+ @property
+ def applications(self):
+ """Return a map of application-name:Application for all applications
+ currently in the model.
+
+ """
+ return self.state.applications
+
+ @property
+ def machines(self):
+ """Return a map of machine-id:Machine for all machines currently in
+ the model.
+
+ """
+ return self.state.machines
+
+ @property
+ def units(self):
+ """Return a map of unit-id:Unit for all units currently in
+ the model.
+
+ """
+ return self.state.units
+
+ async def get_info(self):
+ """Return a client.ModelInfo object for this Model.
+
+ Retrieves latest info for this Model from the api server. The
+ return value is cached on the Model.info attribute so that the
+ valued may be accessed again without another api call, if
+ desired.
+
+ This method is called automatically when the Model is connected,
+ resulting in Model.info being initialized without requiring an
+ explicit call to this method.
+
+ """
+ facade = client.ClientFacade()
+ facade.connect(self.connection)
+
+ self.info = await facade.ModelInfo()
+ log.debug('Got ModelInfo: %s', vars(self.info))
+
+ return self.info
+
+ def add_observer(
+ self, callable_, entity_type=None, action=None, entity_id=None,
+ predicate=None):
+ """Register an "on-model-change" callback
+
+ Once the model is connected, ``callable_``
+ will be called each time the model changes. callable_ should
+ be Awaitable and accept the following positional arguments:
+
+ delta - An instance of :class:`juju.delta.EntityDelta`
+ containing the raw delta data recv'd from the Juju
+ websocket.
+
+ old_obj - If the delta modifies an existing object in the model,
+ old_obj will be a copy of that object, as it was before the
+ delta was applied. Will be None if the delta creates a new
+ entity in the model.
+
+ new_obj - A copy of the new or updated object, after the delta
+ is applied. Will be None if the delta removes an entity
+ from the model.
+
+ model - The :class:`Model` itself.
+
+ Events for which ``callable_`` is called can be specified by passing
+ entity_type, action, and/or id_ filter criteria, e.g.:
+
+ add_observer(
+ myfunc, entity_type='application', action='add', id_='ubuntu')
+
+ For more complex filtering conditions, pass a predicate function. It
+ will be called with a delta as its only argument. If the predicate
+ function returns True, the callable_ will be called.
+
+ """
+ observer = _Observer(
+ callable_, entity_type, action, entity_id, predicate)
+ self.observers[observer] = callable_
+
+ def _watch(self):
+ """Start an asynchronous watch against this model.
+
+ See :meth:`add_observer` to register an onchange callback.
+
+ """
+ async def _start_watch():
+ self._watch_shutdown.clear()
+ try:
+ allwatcher = watcher.AllWatcher()
+ self._watch_conn = await self.connection.clone()
+ allwatcher.connect(self._watch_conn)
+ while True:
+ results = await allwatcher.Next()
+ for delta in results.deltas:
+ delta = get_entity_delta(delta)
+ old_obj, new_obj = self.state.apply_delta(delta)
+ # XXX: Might not want to shield at this level
+ # We are shielding because when the watcher is
+ # canceled (on disconnect()), we don't want all of
+ # its children (every observer callback) to be
+ # canceled with it. So we shield them. But this means
+ # they can *never* be canceled.
+ await asyncio.shield(
+ self._notify_observers(delta, old_obj, new_obj))
+ self._watch_received.set()
+ except CancelledError:
+ log.debug('Closing watcher connection')
+ await self._watch_conn.close()
+ self._watch_shutdown.set()
+ self._watch_conn = None
+
+ log.debug('Starting watcher task')
+ self._watcher_task = self.loop.create_task(_start_watch())
+
+ def _stop_watching(self):
+ """Stop the asynchronous watch against this model.
+
+ """
+ log.debug('Stopping watcher task')
+ if self._watcher_task:
+ self._watcher_task.cancel()
+
+ async def _notify_observers(self, delta, old_obj, new_obj):
+ """Call observing callbacks, notifying them of a change in model state
+
+ :param delta: The raw change from the watcher
+ (:class:`juju.client.overrides.Delta`)
+ :param old_obj: The object in the model that this delta updates.
+ May be None.
+ :param new_obj: The object in the model that is created or updated
+ by applying this delta.
+
+ """
+ if new_obj and not old_obj:
+ delta.type = 'add'
+
+ log.debug(
+ 'Model changed: %s %s %s',
+ delta.entity, delta.type, delta.get_id())
+
+ for o in self.observers:
+ if o.cares_about(delta):
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
+ async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+ """Wait for a new object to appear in the Model and return it.
+
+ Waits for an object of type ``entity_type`` with id ``entity_id``.
+ If ``entity_id`` is ``None``, it will wait for the first new entity
+ of the correct type.
+
+ This coroutine blocks until the new object appears in the model.
+
+ """
+ # if the entity is already in the model, just return it
+ if entity_id in self.state._live_entity_map(entity_type):
+ return self.state._live_entity_map(entity_type)[entity_id]
+ # if we know the entity_id, we can trigger on any action that puts
+ # the enitty into the model; otherwise, we have to watch for the
+ # next "add" action on that entity_type
+ action = 'add' if entity_id is None else None
+ return await self._wait(entity_type, entity_id, action, predicate)
+
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
+
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'failed')
+
+ return await self._wait('action', action_id, 'change', predicate)
+