import asyncio
import collections
import logging
+import re
+import weakref
from concurrent.futures import CancelledError
+from functools import partial
+
+import yaml
+from theblues import charmstore
from .client import client
from .client import watcher
from .client import connection
+from .constraints import parse as parse_constraints
from .delta import get_entity_delta
from .delta import get_entity_class
from .exceptions import DeadEntityException
+from .errors import JujuAPIError
log = logging.getLogger(__name__)
+class _Observer(object):
+ """Wrapper around an observer callable.
+
+ This wrapper allows filter criteria to be associated with the
+ callable so that it's only called for changes that meet the criteria.
+
+ """
+ def __init__(self, callable_, entity_type, action, entity_id, predicate):
+ self.callable_ = callable_
+ self.entity_type = entity_type
+ self.action = action
+ self.entity_id = entity_id
+ self.predicate = predicate
+ if self.entity_id:
+ self.entity_id = str(self.entity_id)
+ if not self.entity_id.startswith('^'):
+ self.entity_id = '^' + self.entity_id
+ if not self.entity_id.endswith('$'):
+ self.entity_id += '$'
+
+ async def __call__(self, delta, old, new, model):
+ await self.callable_(delta, old, new, model)
+
+ def cares_about(self, delta):
+ """Return True if this observer "cares about" (i.e. wants to be
+ called) for a this delta.
+
+ """
+ if (self.entity_id and delta.get_id() and
+ not re.match(self.entity_id, str(delta.get_id()))):
+ return False
+
+ if self.entity_type and self.entity_type != delta.entity:
+ return False
+
+ if self.action and self.action != delta.type:
+ return False
+
+ if self.predicate and not self.predicate(delta):
+ return False
+
+ return True
+
+
class ModelObserver(object):
async def __call__(self, delta, old, new, model):
- if old is None and new is not None:
- type_ = 'add'
- else:
- type_ = delta.type
- handler_name = 'on_{}_{}'.format(delta.entity, type_)
+ handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
method = getattr(self, handler_name, self.on_change)
await method(delta, old, new, model)
def get_entity(
self, entity_type, entity_id, history_index=-1, connected=True):
- """Return an object instance representing the entity created or
- updated by ``delta``
+ """Return an object instance for the given entity_type and id.
+
+ By default the object state matches the most recent state from
+ Juju. To get an instance of the object in an older state, pass
+ history_index, an index into the history deque for the entity.
"""
+
if history_index < 0 and history_index != -1:
history_index += len(self.entity_history(entity_type, entity_id))
+ if history_index < 0:
+ return None
try:
self.entity_data(entity_type, entity_id, history_index)
self.entity_type, self.entity_id))
return self.data[name]
+ def __bool__(self):
+ return bool(self.data)
+
+ def on_change(self, callable_):
+ """Add a change observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'change', self.entity_id)
+
+ def on_remove(self, callable_):
+ """Add a remove observer to this entity.
+
+ """
+ self.model.add_observer(
+ callable_, self.entity_type, 'remove', self.entity_id)
+
@property
def entity_type(self):
"""A string identifying the entity type of this object, e.g.
entity in the underlying model.
This will be True except when the object represents an entity at a
- prior state in history, e.g. if the object was obtained by calling
+ non-latest state in history, e.g. if the object was obtained by calling
.previous() on another object.
"""
"""
self.loop = loop or asyncio.get_event_loop()
self.connection = None
- self.observers = set()
+ self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
self._watch_received = asyncio.Event(loop=loop)
+ self._charmstore = CharmStore(self.loop)
+
+ async def connect(self, *args, **kw):
+ """Connect to an arbitrary Juju model.
+
+ args and kw are passed through to Connection.connect()
+
+ """
+ self.connection = await connection.Connection.connect(*args, **kw)
+ self._watch()
+ await self._watch_received.wait()
async def connect_current(self):
"""Connect to the current Juju model.
self._watch()
await self._watch_received.wait()
+ async def connect_model(self, arg):
+ """Connect to a specific Juju model.
+ :param arg: <controller>:<user/model>
+
+ """
+ self.connection = await connection.Connection.connect_model(arg)
+ self._watch()
+ await self._watch_received.wait()
+
async def disconnect(self):
"""Shut down the watcher task and close websockets.
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(.1)
+ await asyncio.sleep(0)
await asyncio.wait_for(_block(), timeout)
@property
"""
return self.state.units
- def add_observer(self, callable_):
+ def add_observer(
+ self, callable_, entity_type=None, action=None, entity_id=None,
+ predicate=None):
"""Register an "on-model-change" callback
- Once a watch is started (Model.watch() is called), ``callable_``
+ Once the model is connected, ``callable_``
will be called each time the model changes. callable_ should
be Awaitable and accept the following positional arguments:
model - The :class:`Model` itself.
+ Events for which ``callable_`` is called can be specified by passing
+ entity_type, action, and/or id_ filter criteria, e.g.:
+
+ add_observer(
+ myfunc, entity_type='application', action='add', id_='ubuntu')
+
+ For more complex filtering conditions, pass a predicate function. It
+ will be called with a delta as its only argument. If the predicate
+ function returns True, the callable_ will be called.
+
"""
- self.observers.add(callable_)
+ observer = _Observer(
+ callable_, entity_type, action, entity_id, predicate)
+ self.observers[observer] = callable_
def _watch(self):
"""Start an asynchronous watch against this model.
by applying this delta.
"""
+ if new_obj and not old_obj:
+ delta.type = 'add'
+
log.debug(
'Model changed: %s %s %s',
delta.entity, delta.type, delta.get_id())
+
for o in self.observers:
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ if o.cares_about(delta):
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+ async def _wait(self, entity_type, entity_id, action, predicate=None):
+ """
+ Block the calling routine until a given action has happened to the
+ given entity
+
+ :param entity_type: The entity's type.
+ :param entity_id: The entity's id.
+ :param action: the type of action (e.g., 'add' or 'change')
+ :param predicate: optional callable that must take as an
+ argument a delta, and must return a boolean, indicating
+ whether the delta contains the specific action we're looking
+ for. For example, you might check to see whether a 'change'
+ has a 'completed' status. See the _Observer class for details.
+
+ """
+ q = asyncio.Queue(loop=self.loop)
+
+ async def callback(delta, old, new, model):
+ await q.put(delta.get_id())
+
+ self.add_observer(callback, entity_type, action, entity_id, predicate)
+ entity_id = await q.get()
+ return self.state._live_entity_map(entity_type)[entity_id]
+
+ async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+ """Wait for a new object to appear in the Model and return it.
+
+ Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+ This coroutine blocks until the new object appears in the model.
+
+ """
+ return await self._wait(entity_type, entity_id, 'add', predicate)
+
+ async def wait_for_action(self, action_id):
+ """Given an action, wait for it to complete."""
+
+ if action_id.startswith("action-"):
+ # if we've been passed action.tag, transform it into the
+ # id that the api deltas will use.
+ action_id = action_id[7:]
+
+ def predicate(delta):
+ return delta.data['status'] in ('completed', 'failed')
+
+ return await self._wait('action', action_id, 'change', predicate)
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
- return await app_facade.AddRelation([relation1, relation2])
+ try:
+ result = await app_facade.AddRelation([relation1, relation2])
+ except JujuAPIError as e:
+ if 'relation already exists' not in e.message:
+ raise
+ log.debug(
+ 'Relation %s <-> %s already exists', relation1, relation2)
+ # TODO: if relation already exists we should return the
+ # Relation ModelEntity here
+ return None
+
+ def predicate(delta):
+ endpoints = {}
+ for endpoint in delta.data['endpoints']:
+ endpoints[endpoint['application-name']] = endpoint['relation']
+ return endpoints == result.endpoints
+
+ return await self._wait_for_new('relation', None, predicate)
def add_space(self, name, *cidrs):
"""Add a new network space.
TODO::
- - entity_url must have a revision; look up latest automatically if
- not provided by caller
- service_name is required; fill this in automatically if not
provided by caller
- series is required; how do we pick a default?
"""
- if constraints:
- constraints = client.Value(**constraints)
-
if to:
placement = [
client.Placement(**p) for p in to
for k, v in storage.items()
}
+ entity_id = await self.charmstore.entityId(entity_url)
+
app_facade = client.ApplicationFacade()
client_facade = client.ClientFacade()
app_facade.connect(self.connection)
client_facade.connect(self.connection)
- log.debug(
- 'Deploying %s', entity_url)
-
- await client_facade.AddCharm(channel, entity_url)
- app = client.ApplicationDeploy(
- application=service_name,
- channel=channel,
- charm_url=entity_url,
- config=config,
- constraints=constraints,
- endpoint_bindings=bind,
- num_units=num_units,
- placement=placement,
- resources=resources,
- series=series,
- storage=storage,
- )
-
- return await app_facade.Deploy([app])
+ if 'bundle/' in entity_id:
+ handler = BundleHandler(self)
+ await handler.fetch_plan(entity_id)
+ await handler.execute_plan()
+ extant_apps = {app for app in self.applications}
+ pending_apps = set(handler.applications) - extant_apps
+ if pending_apps:
+ # new apps will usually be in the model by now, but if some
+ # haven't made it yet we'll need to wait on them to be added
+ await asyncio.gather(*[
+ asyncio.ensure_future(
+ self.model._wait_for_new('application', app_name))
+ for app_name in pending_apps
+ ])
+ return [app for name, app in self.applications.items()
+ if name in handler.applications]
+ else:
+ log.debug(
+ 'Deploying %s', entity_id)
+
+ await client_facade.AddCharm(channel, entity_id)
+ app = client.ApplicationDeploy(
+ application=service_name,
+ channel=channel,
+ charm_url=entity_id,
+ config=config,
+ constraints=constraints,
+ endpoint_bindings=bind,
+ num_units=num_units,
+ placement=placement,
+ resources=resources,
+ series=series,
+ storage=storage,
+ )
+
+ await app_facade.Deploy([app])
+ return await self._wait_for_new('application', service_name)
def destroy(self):
"""Terminate all machines and resources for this model.
"""
pass
+ async def destroy_unit(self, *unit_names):
+ """Destroy units by name.
+
+ """
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ log.debug(
+ 'Destroying unit%s %s',
+ 's' if len(unit_names) == 1 else '',
+ ' '.join(unit_names))
+
+ return await app_facade.Destroy(self.name)
+ destroy_units = destroy_unit
+
def get_backup(self, archive_id):
"""Download a backup archive file.
"""
pass
+
+ @property
+ def charmstore(self):
+ return self._charmstore
+
+
+class BundleHandler(object):
+ """
+ Handle bundles by using the API to translate bundle YAML into a plan of
+ steps and then dispatching each of those using the API.
+ """
+ def __init__(self, model):
+ self.model = model
+ self.charmstore = model.charmstore
+ self.plan = []
+ self.references = {}
+ self._units_by_app = {}
+ for unit_name, unit in model.units.items():
+ app_units = self._units_by_app.setdefault(unit.application, [])
+ app_units.append(unit_name)
+ self.client_facade = client.ClientFacade()
+ self.client_facade.connect(model.connection)
+ self.app_facade = client.ApplicationFacade()
+ self.app_facade.connect(model.connection)
+ self.ann_facade = client.AnnotationsFacade()
+ self.ann_facade.connect(model.connection)
+
+ async def fetch_plan(self, entity_id):
+ bundle_yaml = await self.charmstore.files(entity_id,
+ filename='bundle.yaml',
+ read_file=True)
+ self.bundle = yaml.safe_load(bundle_yaml)
+ self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+
+ async def execute_plan(self):
+ for step in self.plan.changes:
+ method = getattr(self, step.method)
+ result = await method(*step.args)
+ self.references[step.id_] = result
+
+ @property
+ def applications(self):
+ return list(self.bundle['services'].keys())
+
+ def resolve(self, reference):
+ if reference and reference.startswith('$'):
+ reference = self.references[reference[1:]]
+ return reference
+
+ async def addCharm(self, charm, series):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be added.
+
+ :param series string:
+ Series holds the series of the charm to be added
+ if the charm default is not sufficient.
+ """
+ entity_id = await self.charmstore.entityId(charm)
+ log.debug('Adding %s', entity_id)
+ await self.client_facade.AddCharm(None, entity_id)
+ return entity_id
+
+ async def addMachines(self, params):
+ """
+ :param series string:
+ Series holds the optional machine OS series.
+
+ :param constraints string:
+ Constraints holds the optional machine constraints.
+
+ :param Container_type string:
+ ContainerType optionally holds the type of the container (for
+ instance ""lxc" or kvm"). It is not specified for top level
+ machines.
+
+ :param parent_id string:
+ ParentId optionally holds a placeholder pointing to another machine
+ change or to a unit change. This value is only specified in the
+ case this machine is a container, in which case also ContainerType
+ is set.
+ """
+ if 'parent_id' in params:
+ params['parent_id'] = self.resolve(params['parent_id'])
+
+ params['constraints'] = parse_constraints(
+ params.get('constraints'))
+ params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+ params = client.AddMachineParams(**params)
+ results = await self.client_facade.AddMachines([params])
+ error = results.machines[0].error
+ if error:
+ raise ValueError("Error adding machine: %s", error.message)
+ machine = results.machines[0].machine
+ log.debug('Added new machine %s', machine)
+ return machine
+
+ async def addRelation(self, endpoint1, endpoint2):
+ """
+ :param endpoint1 string:
+ :param endpoint2 string:
+ Endpoint1 and Endpoint2 hold relation endpoints in the
+ "application:interface" form, where the application is always a
+ placeholder pointing to an application change, and the interface is
+ optional. Examples are "$deploy-42:web" or just "$deploy-42".
+ """
+ endpoints = [endpoint1, endpoint2]
+ # resolve indirect references
+ for i in range(len(endpoints)):
+ parts = endpoints[i].split(':')
+ parts[0] = self.resolve(parts[0])
+ endpoints[i] = ':'.join(parts)
+
+ log.info('Relating %s <-> %s', *endpoints)
+ return await self.model.add_relation(*endpoints)
+
+ async def deploy(self, charm, series, application, options, constraints,
+ storage, endpoint_bindings, resources):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be used to deploy this
+ application.
+
+ :param series string:
+ Series holds the series of the application to be deployed
+ if the charm default is not sufficient.
+
+ :param application string:
+ Application holds the application name.
+
+ :param options map[string]interface{}:
+ Options holds application options.
+
+ :param constraints string:
+ Constraints holds the optional application constraints.
+
+ :param storage map[string]string:
+ Storage holds the optional storage constraints.
+
+ :param endpoint_bindings map[string]string:
+ EndpointBindings holds the optional endpoint bindings
+
+ :param resources map[string]int:
+ Resources identifies the revision to use for each resource
+ of the application's charm.
+ """
+ # resolve indirect references
+ charm = self.resolve(charm)
+ # stringify all config values for API
+ options = {k: str(v) for k, v in options.items()}
+ # build param object
+ app = client.ApplicationDeploy(
+ charm_url=charm,
+ series=series,
+ application=application,
+ config=options,
+ constraints=constraints,
+ storage=storage,
+ endpoint_bindings=endpoint_bindings,
+ resources=resources,
+ )
+ # do the do
+ log.info('Deploying %s', charm)
+ await self.app_facade.Deploy([app])
+ return application
+
+ async def addUnit(self, application, to):
+ """
+ :param application string:
+ Application holds the application placeholder name for which a unit
+ is added.
+
+ :param to string:
+ To holds the optional location where to add the unit, as a
+ placeholder pointing to another unit change or to a machine change.
+ """
+ application = self.resolve(application)
+ placement = self.resolve(to)
+ if self._units_by_app.get(application):
+ # enough units for this application already exist;
+ # claim one, and carry on
+ # NB: this should probably honor placement, but the juju client
+ # doesn't, so we're not bothering, either
+ unit_name = self._units_by_app[application].pop()
+ log.debug('Reusing unit %s for %s', unit_name, application)
+ return self.model.units[unit_name]
+
+ log.debug('Adding new unit for %s%s', application,
+ ' to %s' % placement if placement else '')
+ return await self.model.applications[application].add_unit(
+ count=1,
+ to=placement,
+ )
+
+ async def expose(self, application):
+ """
+ :param application string:
+ Application holds the placeholder name of the application that must
+ be exposed.
+ """
+ application = self.resolve(application)
+ log.info('Exposing %s', application)
+ return await self.model.applications[application].expose()
+
+ async def setAnnotations(self, id_, entity_type, annotations):
+ """
+ :param id_ string:
+ Id is the placeholder for the application or machine change
+ corresponding to the entity to be annotated.
+
+ :param entity_type EntityType:
+ EntityType holds the type of the entity, "application" or
+ "machine".
+
+ :param annotations map[string]string:
+ Annotations holds the annotations as key/value pairs.
+ """
+ entity_id = self.resolve(id_)
+ try:
+ entity = self.model.state.get_entity(entity_type, entity_id)
+ except KeyError:
+ entity = await self.model._wait_for_new(entity_type, entity_id)
+ return await entity.set_annotations(annotations)
+
+
+class CharmStore(object):
+ """
+ Async wrapper around theblues.charmstore.CharmStore
+ """
+ def __init__(self, loop):
+ self.loop = loop
+ self._cs = charmstore.CharmStore()
+
+ def __getattr__(self, name):
+ """
+ Wrap method calls in coroutines that use run_in_executor to make them
+ async.
+ """
+ attr = getattr(self._cs, name)
+ if not callable(attr):
+ wrapper = partial(getattr, self._cs, name)
+ setattr(self, name, wrapper)
+ else:
+ async def coro(*args, **kwargs):
+ method = partial(attr, *args, **kwargs)
+ return await self.loop.run_in_executor(None, method)
+ setattr(self, name, coro)
+ wrapper = coro
+ return wrapper