X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=4a8bc039f470c5e2feffb0cbd5fa55c49d567cb0;hb=0d70179f2088da28e47bcad569366fb3dddd6e6d;hp=fe07f8099a571d4bdb6b4401bec8b6a54dd62fbd;hpb=fd2a74a2eba716e7ea298fe420f070221ff4581f;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index fe07f80..4a8bc03 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,14 +1,20 @@ import asyncio import collections import logging +import os +import re +import weakref from concurrent.futures import CancelledError from functools import partial +from pathlib import Path +import yaml from theblues import charmstore from .client import client from .client import watcher from .client import connection +from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta from .delta import get_entity_class from .exceptions import DeadEntityException @@ -17,13 +23,53 @@ from .errors import JujuAPIError log = logging.getLogger(__name__) +class _Observer(object): + """Wrapper around an observer callable. + + This wrapper allows filter criteria to be associated with the + callable so that it's only called for changes that meet the criteria. + + """ + def __init__(self, callable_, entity_type, action, entity_id, predicate): + self.callable_ = callable_ + self.entity_type = entity_type + self.action = action + self.entity_id = entity_id + self.predicate = predicate + if self.entity_id: + self.entity_id = str(self.entity_id) + if not self.entity_id.startswith('^'): + self.entity_id = '^' + self.entity_id + if not self.entity_id.endswith('$'): + self.entity_id += '$' + + async def __call__(self, delta, old, new, model): + await self.callable_(delta, old, new, model) + + def cares_about(self, delta): + """Return True if this observer "cares about" (i.e. wants to be + called) for a this delta. + + """ + if (self.entity_id and delta.get_id() and + not re.match(self.entity_id, str(delta.get_id()))): + return False + + if self.entity_type and self.entity_type != delta.entity: + return False + + if self.action and self.action != delta.type: + return False + + if self.predicate and not self.predicate(delta): + return False + + return True + + class ModelObserver(object): async def __call__(self, delta, old, new, model): - if old is None and new is not None: - type_ = 'add' - else: - type_ = delta.type - handler_name = 'on_{}_{}'.format(delta.entity, type_) + handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) await method(delta, old, new, model) @@ -116,12 +162,18 @@ class ModelState(object): def get_entity( self, entity_type, entity_id, history_index=-1, connected=True): - """Return an object instance representing the entity created or - updated by ``delta`` + """Return an object instance for the given entity_type and id. + + By default the object state matches the most recent state from + Juju. To get an instance of the object in an older state, pass + history_index, an index into the history deque for the entity. """ + if history_index < 0 and history_index != -1: history_index += len(self.entity_history(entity_type, entity_id)) + if history_index < 0: + return None try: self.entity_data(entity_type, entity_id, history_index) @@ -155,18 +207,33 @@ class ModelEntity(object): self.connected = connected self.connection = model.connection + def __repr__(self): + return '<{} entity_id="{}">'.format(type(self).__name__, + self.entity_id) + def __getattr__(self, name): """Fetch object attributes from the underlying data dict held in the model. """ - if self.data is None: - raise DeadEntityException( - "Entity {}:{} is dead - its attributes can no longer be " - "accessed. Use the .previous() method on this object to get " - "a copy of the object at its previous state.".format( - self.entity_type, self.entity_id)) - return self.data[name] + return self.safe_data[name] + + def __bool__(self): + return bool(self.data) + + def on_change(self, callable_): + """Add a change observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'change', self.entity_id) + + def on_remove(self, callable_): + """Add a remove observer to this entity. + + """ + self.model.add_observer( + callable_, self.entity_type, 'remove', self.entity_id) @property def entity_type(self): @@ -182,7 +249,7 @@ class ModelEntity(object): entity in the underlying model. This will be True except when the object represents an entity at a - prior state in history, e.g. if the object was obtained by calling + non-latest state in history, e.g. if the object was obtained by calling .previous() on another object. """ @@ -216,6 +283,22 @@ class ModelEntity(object): return self.model.state.entity_data( self.entity_type, self.entity_id, self._history_index) + @property + def safe_data(self): + """The data dictionary for this entity. + + If this `ModelEntity` points to the dead state, it will + raise `DeadEntityException`. + + """ + if self.data is None: + raise DeadEntityException( + "Entity {}:{} is dead - its attributes can no longer be " + "accessed. Use the .previous() method on this object to get " + "a copy of the object at its previous state.".format( + self.entity_type, self.entity_id)) + return self.data + def previous(self): """Return a copy of this object as was at its previous state in history. @@ -276,20 +359,46 @@ class Model(object): """ self.loop = loop or asyncio.get_event_loop() self.connection = None - self.observers = set() + self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) + self.info = None self._watcher_task = None self._watch_shutdown = asyncio.Event(loop=loop) self._watch_received = asyncio.Event(loop=loop) self._charmstore = CharmStore(self.loop) + async def connect(self, *args, **kw): + """Connect to an arbitrary Juju model. + + args and kw are passed through to Connection.connect() + + """ + self.connection = await connection.Connection.connect(*args, **kw) + await self._after_connect() + async def connect_current(self): """Connect to the current Juju model. """ self.connection = await connection.Connection.connect_current() + await self._after_connect() + + async def connect_model(self, model_name): + """Connect to a specific Juju model by name. + + :param model_name: Format [controller:][user/]model + + """ + self.connection = await connection.Connection.connect_model(model_name) + await self._after_connect() + + async def _after_connect(self): + """Run initialization steps after connecting to websocket. + + """ self._watch() await self._watch_received.wait() + await self.get_info() async def disconnect(self): """Shut down the watcher task and close websockets. @@ -330,13 +439,13 @@ class Model(object): lambda: len(self.machines) == 0 ) - async def block_until(self, *conditions, timeout=None): + async def block_until(self, *conditions, timeout=None, wait_period=0.5): """Return only after all conditions are true. """ async def _block(): while not all(c() for c in conditions): - await asyncio.sleep(.1) + await asyncio.sleep(wait_period) await asyncio.wait_for(_block(), timeout) @property @@ -363,11 +472,34 @@ class Model(object): """ return self.state.units - def add_observer(self, callable_): + async def get_info(self): + """Return a client.ModelInfo object for this Model. + + Retrieves latest info for this Model from the api server. The + return value is cached on the Model.info attribute so that the + valued may be accessed again without another api call, if + desired. + + This method is called automatically when the Model is connected, + resulting in Model.info being initialized without requiring an + explicit call to this method. + + """ + facade = client.ClientFacade() + facade.connect(self.connection) + + self.info = await facade.ModelInfo() + log.debug('Got ModelInfo: %s', vars(self.info)) + + return self.info + + def add_observer( + self, callable_, entity_type=None, action=None, entity_id=None, + predicate=None): """Register an "on-model-change" callback - Once a watch is started (Model.watch() is called), ``callable_`` - will be called each time the model changes. callable_ should + Once the model is connected, ``callable_`` + will be called each time the model changes. ``callable_`` should be Awaitable and accept the following positional arguments: delta - An instance of :class:`juju.delta.EntityDelta` @@ -385,8 +517,21 @@ class Model(object): model - The :class:`Model` itself. + Events for which ``callable_`` is called can be specified by passing + entity_type, action, and/or entitiy_id filter criteria, e.g.:: + + add_observer( + myfunc, + entity_type='application', action='add', entity_id='ubuntu') + + For more complex filtering conditions, pass a predicate function. It + will be called with a delta as its only argument. If the predicate + function returns True, the ``callable_`` will be called. + """ - self.observers.add(callable_) + observer = _Observer( + callable_, entity_type, action, entity_id, predicate) + self.observers[observer] = callable_ def _watch(self): """Start an asynchronous watch against this model. @@ -442,11 +587,72 @@ class Model(object): by applying this delta. """ + if new_obj and not old_obj: + delta.type = 'add' + log.debug( 'Model changed: %s %s %s', delta.entity, delta.type, delta.get_id()) + for o in self.observers: - asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + if o.cares_about(delta): + asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + + async def _wait(self, entity_type, entity_id, action, predicate=None): + """ + Block the calling routine until a given action has happened to the + given entity + + :param entity_type: The entity's type. + :param entity_id: The entity's id. + :param action: the type of action (e.g., 'add' or 'change') + :param predicate: optional callable that must take as an + argument a delta, and must return a boolean, indicating + whether the delta contains the specific action we're looking + for. For example, you might check to see whether a 'change' + has a 'completed' status. See the _Observer class for details. + + """ + q = asyncio.Queue(loop=self.loop) + + async def callback(delta, old, new, model): + await q.put(delta.get_id()) + + self.add_observer(callback, entity_type, action, entity_id, predicate) + entity_id = await q.get() + return self.state._live_entity_map(entity_type)[entity_id] + + async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): + """Wait for a new object to appear in the Model and return it. + + Waits for an object of type ``entity_type`` with id ``entity_id``. + If ``entity_id`` is ``None``, it will wait for the first new entity + of the correct type. + + This coroutine blocks until the new object appears in the model. + + """ + # if the entity is already in the model, just return it + if entity_id in self.state._live_entity_map(entity_type): + return self.state._live_entity_map(entity_type)[entity_id] + # if we know the entity_id, we can trigger on any action that puts + # the enitty into the model; otherwise, we have to watch for the + # next "add" action on that entity_type + action = 'add' if entity_id is None else None + return await self._wait(entity_type, entity_id, action, predicate) + + async def wait_for_action(self, action_id): + """Given an action, wait for it to complete.""" + + if action_id.startswith("action-"): + # if we've been passed action.tag, transform it into the + # id that the api deltas will use. + action_id = action_id[7:] + + def predicate(delta): + return delta.data['status'] in ('completed', 'failed') + + return await self._wait('action', action_id, 'change', predicate) def add_machine( self, spec=None, constraints=None, disks=None, series=None, @@ -491,7 +697,24 @@ class Model(object): log.debug( 'Adding relation %s <-> %s', relation1, relation2) - return await app_facade.AddRelation([relation1, relation2]) + try: + result = await app_facade.AddRelation([relation1, relation2]) + except JujuAPIError as e: + if 'relation already exists' not in e.message: + raise + log.debug( + 'Relation %s <-> %s already exists', relation1, relation2) + # TODO: if relation already exists we should return the + # Relation ModelEntity here + return None + + def predicate(delta): + endpoints = {} + for endpoint in delta.data['endpoints']: + endpoints[endpoint['application-name']] = endpoint['relation'] + return endpoints == result.endpoints + + return await self._wait_for_new('relation', None, predicate) def add_space(self, name, *cidrs): """Add a new network space. @@ -600,14 +823,14 @@ class Model(object): pass async def deploy( - self, entity_url, service_name=None, bind=None, budget=None, + self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, num_units=1, plan=None, resources=None, series=None, storage=None, to=None): """Deploy a new service or bundle. :param str entity_url: Charm or bundle url - :param str service_name: Name to give the service + :param str application_name: Name to give the service :param dict bind: : pairs :param dict budget: : pairs :param str channel: Charm store channel from which to retrieve @@ -633,16 +856,11 @@ class Model(object): TODO:: - - entity_url must have a revision; look up latest automatically if - not provided by caller - - service_name is required; fill this in automatically if not + - application_name is required; fill this in automatically if not provided by caller - series is required; how do we pick a default? """ - if constraints: - constraints = client.Value(**constraints) - if to: placement = [ client.Placement(**p) for p in to @@ -656,28 +874,47 @@ class Model(object): for k, v in storage.items() } - entity_id = await self.charmstore.entityId(entity_url) + is_local = not entity_url.startswith('cs:') and \ + os.path.isdir(entity_url) + entity_id = await self.charmstore.entityId(entity_url) \ + if not is_local else entity_url app_facade = client.ApplicationFacade() client_facade = client.ClientFacade() app_facade.connect(self.connection) client_facade.connect(self.connection) - if 'bundle/' in entity_id: + is_bundle = ((is_local and + (Path(entity_id) / 'bundle.yaml').exists()) or + (not is_local and 'bundle/' in entity_id)) + + if is_bundle: handler = BundleHandler(self) await handler.fetch_plan(entity_id) await handler.execute_plan() + extant_apps = {app for app in self.applications} + pending_apps = set(handler.applications) - extant_apps + if pending_apps: + # new apps will usually be in the model by now, but if some + # haven't made it yet we'll need to wait on them to be added + await asyncio.gather(*[ + asyncio.ensure_future( + self._wait_for_new('application', app_name)) + for app_name in pending_apps + ]) + return [app for name, app in self.applications.items() + if name in handler.applications] else: log.debug( 'Deploying %s', entity_id) await client_facade.AddCharm(channel, entity_id) app = client.ApplicationDeploy( - application=service_name, + application=application_name, channel=channel, charm_url=entity_id, config=config, - constraints=constraints, + constraints=parse_constraints(constraints), endpoint_bindings=bind, num_units=num_units, placement=placement, @@ -686,7 +923,8 @@ class Model(object): storage=storage, ) - return await app_facade.Deploy([app]) + await app_facade.Deploy([app]) + return await self._wait_for_new('application', application_name) def destroy(self): """Terminate all machines and resources for this model. @@ -694,6 +932,21 @@ class Model(object): """ pass + async def destroy_unit(self, *unit_names): + """Destroy units by name. + + """ + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + log.debug( + 'Destroying unit%s %s', + 's' if len(unit_names) == 1 else '', + ' '.join(unit_names)) + + return await app_facade.Destroy(self.name) + destroy_units = destroy_unit + def get_backup(self, archive_id): """Download a backup archive file. @@ -1006,6 +1259,36 @@ class Model(object): def charmstore(self): return self._charmstore + async def get_metrics(self, *tags): + """Retrieve metrics. + + :param str \*tags: Tags of entities from which to retrieve metrics. + No tags retrieves the metrics of all units in the model. + """ + log.debug("Retrieving metrics for %s", + ', '.join(tags) if tags else "all units") + + metrics_facade = client.MetricsDebugFacade() + metrics_facade.connect(self.connection) + + entities = [client.Entity(tag) for tag in tags] + metrics_result = await metrics_facade.GetMetrics(entities) + + metrics = collections.defaultdict(list) + + for entity_metrics in metrics_result.results: + error = entity_metrics.error + if error: + if "is not a valid tag" in error: + raise ValueError(error.message) + else: + raise Exception(error.message) + + for metric in entity_metrics.metrics: + metrics[metric.unit].append(vars(metric)) + + return metrics + class BundleHandler(object): """ @@ -1029,10 +1312,15 @@ class BundleHandler(object): self.ann_facade.connect(model.connection) async def fetch_plan(self, entity_id): - yaml = await self.charmstore.files(entity_id, - filename='bundle.yaml', - read_file=True) - self.plan = await self.client_facade.GetBundleChanges(yaml) + is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id) + if is_local: + bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text() + else: + bundle_yaml = await self.charmstore.files(entity_id, + filename='bundle.yaml', + read_file=True) + self.bundle = yaml.safe_load(bundle_yaml) + self.plan = await self.client_facade.GetBundleChanges(bundle_yaml) async def execute_plan(self): for step in self.plan.changes: @@ -1040,6 +1328,10 @@ class BundleHandler(object): result = await method(*step.args) self.references[step.id_] = result + @property + def applications(self): + return list(self.bundle['services'].keys()) + def resolve(self, reference): if reference and reference.startswith('$'): reference = self.references[reference[1:]] @@ -1059,35 +1351,55 @@ class BundleHandler(object): await self.client_facade.AddCharm(None, entity_id) return entity_id - async def addMachines(self, series, constraints, container_type, - parent_id): + async def addMachines(self, params=None): """ - :param series string: - Series holds the optional machine OS series. + :param params dict: + Dictionary specifying the machine to add. All keys are optional. + Keys include: - :param constraints string: - Constraints holds the optional machine constraints. + series: string specifying the machine OS series. + + constraints: string holding machine constraints, if any. We'll + parse this into the json friendly dict that the juju api + expects. + + container_type: string holding the type of the container (for + instance ""lxd" or kvm"). It is not specified for top level + machines. - :param Container_type string: - ContainerType optionally holds the type of the container (for - instance ""lxc" or kvm"). It is not specified for top level - machines. + parent_id: string holding a placeholder pointing to another + machine change or to a unit change. This value is only + specified in the case this machine is a container, in + which case also ContainerType is set. - :param parent_id string: - ParentId optionally holds a placeholder pointing to another machine - change or to a unit change. This value is only specified in the - case this machine is a container, in which case also ContainerType - is set. """ - params = client.AddMachineParams( - series=series, - constraints=constraints, - container_type=container_type, - parent_id=self.resolve(parent_id), - ) - results = await self.client_facade.AddMachines(params) - log.debug('Added new machine %s', results[0].machine) - return results[0].machine + params = params or {} + + # Normalize keys + params = {normalize_key(k): params[k] for k in params.keys()} + + # Fix up values, as necessary. + if 'parent_id' in params: + params['parent_id'] = self.resolve(params['parent_id']) + + params['constraints'] = parse_constraints( + params.get('constraints')) + params['jobs'] = params.get('jobs', ['JobHostUnits']) + + if params.get('container_type') == 'lxc': + log.warning('Juju 2.0 does not support lxc containers. ' + 'Converting containers to lxd.') + params['container_type'] = 'lxd' + + # Submit the request. + params = client.AddMachineParams(**params) + results = await self.client_facade.AddMachines([params]) + error = results.machines[0].error + if error: + raise ValueError("Error adding machine: %s", error.message) + machine = results.machines[0].machine + log.debug('Added new machine %s', machine) + return machine async def addRelation(self, endpoint1, endpoint2): """ @@ -1104,14 +1416,9 @@ class BundleHandler(object): parts = endpoints[i].split(':') parts[0] = self.resolve(parts[0]) endpoints[i] = ':'.join(parts) - try: - await self.app_facade.AddRelation(endpoints) - log.debug('Added relation %s <-> %s', *endpoints) - except JujuAPIError as e: - if 'relation already exists' not in e.message: - raise - log.debug('Relation %s <-> %s already exists', *endpoints) - return None + + log.info('Relating %s <-> %s', *endpoints) + return await self.model.add_relation(*endpoints) async def deploy(self, charm, series, application, options, constraints, storage, endpoint_bindings, resources): @@ -1153,14 +1460,16 @@ class BundleHandler(object): series=series, application=application, config=options, - constraints=constraints, + constraints=parse_constraints(constraints), storage=storage, endpoint_bindings=endpoint_bindings, resources=resources, ) # do the do - log.debug('Deploying %s', charm) + log.info('Deploying %s', charm) await self.app_facade.Deploy([app]) + # ensure the app is in the model for future operations + await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to): @@ -1182,16 +1491,14 @@ class BundleHandler(object): # doesn't, so we're not bothering, either unit_name = self._units_by_app[application].pop() log.debug('Reusing unit %s for %s', unit_name, application) - return unit_name - log.debug('Adding unit of %s%s', - application, - (' to %s' % placement) if placement else '') - result = await self.app_facade.AddUnits( - application=application, - placement=placement, - num_units=1, + return self.model.units[unit_name] + + log.debug('Adding new unit for %s%s', application, + ' to %s' % placement if placement else '') + return await self.model.applications[application].add_unit( + count=1, + to=placement, ) - return result.units[0] async def expose(self, application): """ @@ -1200,9 +1507,8 @@ class BundleHandler(object): be exposed. """ application = self.resolve(application) - log.debug('Exposing %s', application) - await self.app_facade.Expose(application) - return None + log.info('Exposing %s', application) + return await self.model.applications[application].expose() async def setAnnotations(self, id_, entity_type, annotations): """ @@ -1218,13 +1524,11 @@ class BundleHandler(object): Annotations holds the annotations as key/value pairs. """ entity_id = self.resolve(id_) - log.debug('Updating annotations of %s', entity_id) - ann = client.EntityAnnotations( - entity=entity_id, - annotations=annotations, - ) - await self.ann_facade.Set([ann]) - return None + try: + entity = self.model.state.get_entity(entity_type, entity_id) + except KeyError: + entity = await self.model._wait_for_new(entity_type, entity_id) + return await entity.set_annotations(annotations) class CharmStore(object):