X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=7fe0514549b91d1f3b0fd6d65e82547066bdf9e6;hb=5acba37aaefa55fab65eb3ca8b5a5fcf24606a2a;hp=5d436fd8209acc14cd89447ae42647d0d098e058;hpb=370083ea917098f0913c52d13294311783e33164;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 5d436fd..7fe0514 100644 --- a/juju/model.py +++ b/juju/model.py @@ -6,6 +6,7 @@ import weakref from concurrent.futures import CancelledError from functools import partial +import yaml from theblues import charmstore from .client import client @@ -26,12 +27,14 @@ class _Observer(object): callable so that it's only called for changes that meet the criteria. """ - def __init__(self, callable_, entity_type, action, entity_id): + def __init__(self, callable_, entity_type, action, entity_id, predicate): self.callable_ = callable_ self.entity_type = entity_type self.action = action self.entity_id = entity_id + self.predicate = predicate if self.entity_id: + self.entity_id = str(self.entity_id) if not self.entity_id.startswith('^'): self.entity_id = '^' + self.entity_id if not self.entity_id.endswith('$'): @@ -40,20 +43,22 @@ class _Observer(object): async def __call__(self, delta, old, new, model): await self.callable_(delta, old, new, model) - def cares_about(self, entity_type, action, entity_id): + def cares_about(self, delta): """Return True if this observer "cares about" (i.e. wants to be - called) for a change matching the entity_type, action, and - entity_id parameters. + called) for a this delta. """ - if (self.entity_id and entity_id and - not re.match(self.entity_id, str(entity_id))): + if (self.entity_id and delta.get_id() and + not re.match(self.entity_id, str(delta.get_id()))): return False - if self.entity_type and self.entity_type != entity_type: + if self.entity_type and self.entity_type != delta.entity: return False - if self.action and self.action != action: + if self.action and self.action != delta.type: + return False + + if self.predicate and not self.predicate(delta): return False return True @@ -78,9 +83,6 @@ class ModelState(object): self.model = model self.state = dict() - def clear(self): - self.state.clear() - def _live_entity_map(self, entity_type): """Return an id:Entity map of all the living entities of type ``entity_type``. @@ -395,7 +397,6 @@ class Model(object): await self.block_until( lambda: len(self.machines) == 0 ) - self.state.clear() async def block_until(self, *conditions, timeout=None): """Return only after all conditions are true. @@ -431,7 +432,8 @@ class Model(object): return self.state.units def add_observer( - self, callable_, entity_type=None, action=None, entity_id=None): + self, callable_, entity_type=None, action=None, entity_id=None, + predicate=None): """Register an "on-model-change" callback Once the model is connected, ``callable_`` @@ -459,8 +461,13 @@ class Model(object): add_observer( myfunc, entity_type='application', action='add', id_='ubuntu') + For more complex filtering conditions, pass a predicate function. It + will be called with a delta as its only argument. If the predicate + function returns True, the callable_ will be called. + """ - observer = _Observer(callable_, entity_type, action, entity_id) + observer = _Observer( + callable_, entity_type, action, entity_id, predicate) self.observers[observer] = callable_ def _watch(self): @@ -517,7 +524,7 @@ class Model(object): by applying this delta. """ - if not old_obj: + if new_obj and not old_obj: delta.type = 'add' log.debug( @@ -525,10 +532,10 @@ class Model(object): delta.entity, delta.type, delta.get_id()) for o in self.observers: - if o.cares_about(delta.entity, delta.type, delta.get_id()): + if o.cares_about(delta): asyncio.ensure_future(o(delta, old_obj, new_obj, self)) - async def _wait_for_new(self, entity_type, entity_id): + async def _wait_for_new(self, entity_type, entity_id, predicate=None): """Wait for a new object to appear in the Model and return it. Waits for an object of type ``entity_type`` with id ``entity_id``. @@ -536,12 +543,13 @@ class Model(object): This coroutine blocks until the new object appears in the model. """ - entity_added = asyncio.Event(loop=self.loop) + entity_added = asyncio.Queue(loop=self.loop) async def callback(delta, old, new, model): - entity_added.set() - self.add_observer(callback, entity_type, 'add', entity_id) - await entity_added.wait() + await entity_added.put(delta.get_id()) + + self.add_observer(callback, entity_type, 'add', entity_id, predicate) + entity_id = await entity_added.get() return self.state._live_entity_map(entity_type)[entity_id] def add_machine( @@ -587,7 +595,24 @@ class Model(object): log.debug( 'Adding relation %s <-> %s', relation1, relation2) - return await app_facade.AddRelation([relation1, relation2]) + try: + result = await app_facade.AddRelation([relation1, relation2]) + except JujuAPIError as e: + if 'relation already exists' not in e.message: + raise + log.debug( + 'Relation %s <-> %s already exists', relation1, relation2) + # TODO: if relation already exists we should return the + # Relation ModelEntity here + return None + + def predicate(delta): + endpoints = {} + for endpoint in delta.data['endpoints']: + endpoints[endpoint['application-name']] = endpoint['relation'] + return endpoints == result.endpoints + + return await self._wait_for_new('relation', None, predicate) def add_space(self, name, *cidrs): """Add a new network space. @@ -729,8 +754,6 @@ class Model(object): TODO:: - - entity_url must have a revision; look up latest automatically if - not provided by caller - service_name is required; fill this in automatically if not provided by caller - series is required; how do we pick a default? @@ -763,6 +786,15 @@ class Model(object): handler = BundleHandler(self) await handler.fetch_plan(entity_id) await handler.execute_plan() + extant_apps = {app for app in self.applications} + pending_apps = set(handler.applications) - extant_apps + if pending_apps: + # new apps will usually be in the model by now, but if some + # haven't made it yet we'll need to wait on them to be added + await asyncio.wait([self._wait_for_new('application', app_name) + for app_name in pending_apps]) + return [app for name, app in self.applications.items() + if name in handler.applications] else: log.debug( 'Deploying %s', entity_id) @@ -783,7 +815,7 @@ class Model(object): ) await app_facade.Deploy([app]) - return await self._wait_for_new('application', service_name) + return [await self._wait_for_new('application', service_name)] def destroy(self): """Terminate all machines and resources for this model. @@ -1126,10 +1158,11 @@ class BundleHandler(object): self.ann_facade.connect(model.connection) async def fetch_plan(self, entity_id): - yaml = await self.charmstore.files(entity_id, - filename='bundle.yaml', - read_file=True) - self.plan = await self.client_facade.GetBundleChanges(yaml) + bundle_yaml = await self.charmstore.files(entity_id, + filename='bundle.yaml', + read_file=True) + self.bundle = yaml.safe_load(bundle_yaml) + self.plan = await self.client_facade.GetBundleChanges(bundle_yaml) async def execute_plan(self): for step in self.plan.changes: @@ -1137,6 +1170,10 @@ class BundleHandler(object): result = await method(*step.args) self.references[step.id_] = result + @property + def applications(self): + return list(self.bundle['services'].keys()) + def resolve(self, reference): if reference and reference.startswith('$'): reference = self.references[reference[1:]] @@ -1201,14 +1238,9 @@ class BundleHandler(object): parts = endpoints[i].split(':') parts[0] = self.resolve(parts[0]) endpoints[i] = ':'.join(parts) - try: - await self.app_facade.AddRelation(endpoints) - log.debug('Added relation %s <-> %s', *endpoints) - except JujuAPIError as e: - if 'relation already exists' not in e.message: - raise - log.debug('Relation %s <-> %s already exists', *endpoints) - return None + + log.info('Relating %s <-> %s', *endpoints) + return await self.model.add_relation(*endpoints) async def deploy(self, charm, series, application, options, constraints, storage, endpoint_bindings, resources): @@ -1256,7 +1288,7 @@ class BundleHandler(object): resources=resources, ) # do the do - log.debug('Deploying %s', charm) + log.info('Deploying %s', charm) await self.app_facade.Deploy([app]) return application @@ -1279,16 +1311,14 @@ class BundleHandler(object): # doesn't, so we're not bothering, either unit_name = self._units_by_app[application].pop() log.debug('Reusing unit %s for %s', unit_name, application) - return unit_name - log.debug('Adding unit of %s%s', - application, - (' to %s' % placement) if placement else '') - result = await self.app_facade.AddUnits( - application=application, - placement=placement, - num_units=1, + return self.model.units[unit_name] + + log.debug('Adding new unit for %s%s', application, + ' to %s' % placement if placement else '') + return await self.model.applications[application].add_unit( + count=1, + to=placement, ) - return result.units[0] async def expose(self, application): """ @@ -1297,9 +1327,8 @@ class BundleHandler(object): be exposed. """ application = self.resolve(application) - log.debug('Exposing %s', application) - await self.app_facade.Expose(application) - return None + log.info('Exposing %s', application) + return await self.model.applications[application].expose() async def setAnnotations(self, id_, entity_type, annotations): """ @@ -1315,13 +1344,11 @@ class BundleHandler(object): Annotations holds the annotations as key/value pairs. """ entity_id = self.resolve(id_) - log.debug('Updating annotations of %s', entity_id) - ann = client.EntityAnnotations( - entity=entity_id, - annotations=annotations, - ) - await self.ann_facade.Set([ann]) - return None + try: + entity = self.model.state.get_entity(entity_type, entity_id) + except KeyError: + entity = await self.model._wait_for_new(entity_type, entity_id) + return await entity.set_annotations(annotations) class CharmStore(object):