+import asyncio
+import logging
+from concurrent.futures import CancelledError
+
+from .client import client
from .client import watcher
+from .client import connection
from .delta import get_entity_delta
+log = logging.getLogger(__name__)
+
+
+class ModelObserver(object):
+ def __call__(self, delta, old, new, model):
+ if old is None and new is not None:
+ type_ = 'add'
+ else:
+ type_ = delta.type
+ handler_name = 'on_{}_{}'.format(delta.entity, type_)
+ method = getattr(self, handler_name, self.on_change)
+ log.debug(
+ 'Model changed: %s %s %s',
+ delta.entity, delta.type, delta.get_id())
+ method(delta, old, new, model)
+
+ def on_change(self, delta, old, new, model):
+ pass
+
class ModelEntity(object):
"""An object in the Model tree"""
"""
self.data = data
self.model = model
+ self.connection = model.connection
def __getattr__(self, name):
return self.data[name]
class Model(object):
- def __init__(self, connection):
+ def __init__(self, loop=None):
"""Instantiate a new connected Model.
- :param connection: `juju.client.connection.Connection` instance
+ :param loop: an asyncio event loop
"""
- self.connection = connection
+ self.loop = loop or asyncio.get_event_loop()
+ self.connection = None
self.observers = set()
self.state = dict()
+ self._watcher_task = None
+ self._watch_shutdown = asyncio.Event(loop=loop)
+ self._watch_received = asyncio.Event(loop=loop)
+
+ async def connect_current(self):
+ self.connection = await connection.Connection.connect_current()
+ self._watch()
+ await self._watch_received.wait()
+
+ async def disconnect(self):
+ self._stop_watching()
+ if self.connection and self.connection.is_open:
+ await self._watch_shutdown.wait()
+ log.debug('Closing model connection')
+ await asyncio.wait_for(self.connection.close(), None)
+ self.connection = None
+
+ def all_units_idle(self):
+ """Return True if all units are idle.
+
+ """
+ for unit in self.units.values():
+ unit_status = unit.data['agent-status']['current']
+ if unit_status != 'idle':
+ return False
+ return True
+
+ async def reset(self, force=False):
+ for app in self.applications.values():
+ await app.destroy()
+ for machine in self.machines.values():
+ await machine.destroy(force=force)
+
+ async def block_until(self, func):
+ async def _block():
+ while not func():
+ await asyncio.sleep(.1)
+ await asyncio.wait_for(_block(), None)
+
+ @property
+ def applications(self):
+ return self.state.get('application', {})
+
+ @property
+ def machines(self):
+ return self.state.get('machine', {})
+
+ @property
+ def units(self):
+ return self.state.get('unit', {})
def add_observer(self, callable_):
"""Register an "on-model-change" callback
"""
self.observers.add(callable_)
- async def watch(self):
+ def _watch(self):
"""Start an asynchronous watch against this model.
See :meth:`add_observer` to register an onchange callback.
"""
- allwatcher = watcher.AllWatcher()
- allwatcher.connect(self.connection)
- while True:
- results = await allwatcher.Next()
- for delta in results.deltas:
- delta = get_entity_delta(delta)
- old_obj, new_obj = self._apply_delta(delta)
- self._notify_observers(delta, old_obj, new_obj)
+ async def _start_watch():
+ self._watch_shutdown.clear()
+ try:
+ allwatcher = watcher.AllWatcher()
+ self._watch_conn = await self.connection.clone()
+ allwatcher.connect(self._watch_conn)
+ while True:
+ results = await allwatcher.Next()
+ for delta in results.deltas:
+ delta = get_entity_delta(delta)
+ old_obj, new_obj = self._apply_delta(delta)
+ self._notify_observers(delta, old_obj, new_obj)
+ self._watch_received.set()
+ except CancelledError:
+ log.debug('Closing watcher connection')
+ await asyncio.wait_for(self._watch_conn.close(), None)
+ self._watch_shutdown.set()
+ self._watch_conn = None
+
+ log.debug('Starting watcher task')
+ self._watcher_task = self.loop.create_task(_start_watch())
+
+ def _stop_watching(self):
+ """Stop the asynchronous watch against this model.
+
+ """
+ log.debug('Stopping watcher task')
+ if self._watcher_task:
+ self._watcher_task.cancel()
def _apply_delta(self, delta):
"""Apply delta to our model state and return the a copy of the
old_obj may be None if the delta is for the creation of a new object,
e.g. a new application or unit is deployed.
- new_obj may be if no object was created or updated, or if an object
- was deleted as a result of the delta being applied.
+ new_obj may be None if no object was created or updated, or if an
+ object was deleted as a result of the delta being applied.
"""
old_obj, new_obj = None, None
pass
add_machines = add_machine
- def add_relation(self, relation1, relation2):
- """Add a relation between two services.
+ async def add_relation(self, relation1, relation2):
+ """Add a relation between two applications.
- :param str relation1: '<service>[:<relation_name>]'
- :param str relation2: '<service>[:<relation_name>]'
+ :param str relation1: '<application>[:<relation_name>]'
+ :param str relation2: '<application>[:<relation_name>]'
"""
- pass
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ log.debug(
+ 'Adding relation %s <-> %s', relation1, relation2)
+
+ return await app_facade.AddRelation([relation1, relation2])
def add_space(self, name, *cidrs):
"""Add a new network space.
"""
pass
- def deploy(
+ async def deploy(
self, entity_url, service_name=None, bind=None, budget=None,
channel=None, config=None, constraints=None, force=False,
- num_units=1, plan=None, resource=None, series=None, storage=None,
+ num_units=1, plan=None, resources=None, series=None, storage=None,
to=None):
"""Deploy a new service or bundle.
an unsupported series
:param int num_units: Number of units to deploy
:param str plan: Plan under which to deploy charm
- :param dict resource: <resource name>:<file path> pairs
+ :param dict resources: <resource name>:<file path> pairs
:param str series: Series on which to deploy
:param dict storage: Storage constraints TODO how do these look?
:param str to: Placement directive, e.g.::
If None, a new machine is provisioned.
- """
- pass
+
+ TODO::
+
+ - entity_url must have a revision; look up latest automatically if
+ not provided by caller
+ - service_name is required; fill this in automatically if not
+ provided by caller
+ - series is required; how do we pick a default?
+
+ """
+ if constraints:
+ constraints = client.Value(**constraints)
+
+ if to:
+ placement = [
+ client.Placement(**p) for p in to
+ ]
+ else:
+ placement = []
+
+ if storage:
+ storage = {
+ k: client.Constraints(**v)
+ for k, v in storage.items()
+ }
+
+ app_facade = client.ApplicationFacade()
+ client_facade = client.ClientFacade()
+ app_facade.connect(self.connection)
+ client_facade.connect(self.connection)
+
+ log.debug(
+ 'Deploying %s', entity_url)
+
+ await client_facade.AddCharm(channel, entity_url)
+ app = client.ApplicationDeploy(
+ application=service_name,
+ channel=channel,
+ charm_url=entity_url,
+ config=config,
+ constraints=constraints,
+ endpoint_bindings=bind,
+ num_units=num_units,
+ placement=placement,
+ resources=resources,
+ series=series,
+ storage=storage,
+ )
+
+ return await app_facade.Deploy([app])
def destroy(self):
"""Terminate all machines and resources for this model.