X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Fmodel.py;fp=modules%2Flibjuju%2Fjuju%2Fmodel.py;h=ac225992fc8fd82209271d07d837e683e93aedf5;hp=fc8d5e98dd83880cb00cc3ef92825db7fcd32ead;hb=c3e6c2ec9a1fddfc8e9bd31509b366e633b6d99e;hpb=1a15d1c84fc826fa7996c1c9d221a324edd33432 diff --git a/modules/libjuju/juju/model.py b/modules/libjuju/juju/model.py index fc8d5e9..ac22599 100644 --- a/modules/libjuju/juju/model.py +++ b/modules/libjuju/juju/model.py @@ -14,26 +14,25 @@ from concurrent.futures import CancelledError from functools import partial from pathlib import Path -import websockets -import yaml import theblues.charmstore import theblues.errors +import websockets +import yaml from . import tag, utils -from .client import client -from .client import connection +from .client import client, connector from .client.client import ConfigValue -from .constraints import parse as parse_constraints, normalize_key -from .delta import get_entity_delta -from .delta import get_entity_class +from .constraints import parse as parse_constraints +from .constraints import normalize_key +from .delta import get_entity_class, get_entity_delta +from .errors import JujuAPIError, JujuError from .exceptions import DeadEntityException -from .errors import JujuError, JujuAPIError from .placement import parse as parse_placement log = logging.getLogger(__name__) -class _Observer(object): +class _Observer: """Wrapper around an observer callable. This wrapper allows filter criteria to be associated with the @@ -77,7 +76,7 @@ class _Observer(object): return True -class ModelObserver(object): +class ModelObserver: """ Base class for creating observers that react to changes in a model. """ @@ -100,7 +99,7 @@ class ModelObserver(object): pass -class ModelState(object): +class ModelState: """Holds the state of the model, including the delta history of all entities in the model. @@ -144,6 +143,14 @@ class ModelState(object): """ return self._live_entity_map('unit') + @property + def relations(self): + """Return a map of relation-id:Relation for all relations currently in + the model. + + """ + return self._live_entity_map('relation') + def entity_history(self, entity_type, entity_id): """Return the history deque for an entity. @@ -209,7 +216,7 @@ class ModelState(object): connected=connected) -class ModelEntity(object): +class ModelEntity: """An object in the Model tree""" def __init__(self, entity_id, model, history_index=-1, connected=True): @@ -228,7 +235,7 @@ class ModelEntity(object): self.model = model self._history_index = history_index self.connected = connected - self.connection = model.connection + self.connection = model.connection() def __repr__(self): return '<{} entity_id="{}">'.format(type(self).__name__, @@ -380,90 +387,148 @@ class ModelEntity(object): return self.model.state.get_entity(self.entity_type, self.entity_id) -class Model(object): +class Model: """ The main API for interacting with a Juju model. """ - def __init__(self, loop=None, - max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE): - """Instantiate a new connected Model. + def __init__( + self, + loop=None, + max_frame_size=None, + bakery_client=None, + jujudata=None, + ): + """Instantiate a new Model. + + The connect method will need to be called before this + object can be used for anything interesting. + + If jujudata is None, jujudata.FileJujuData will be used. :param loop: an asyncio event loop :param max_frame_size: See `juju.client.connection.Connection.MAX_FRAME_SIZE` + :param bakery_client httpbakery.Client: The bakery client to use + for macaroon authorization. + :param jujudata JujuData: The source for current controller information. + """ + self._connector = connector.Connector( + loop=loop, + max_frame_size=max_frame_size, + bakery_client=bakery_client, + jujudata=jujudata, + ) + self._observers = weakref.WeakValueDictionary() + self.state = ModelState(self) + self._info = None + self._watch_stopping = asyncio.Event(loop=self._connector.loop) + self._watch_stopped = asyncio.Event(loop=self._connector.loop) + self._watch_received = asyncio.Event(loop=self._connector.loop) + self._watch_stopped.set() + self._charmstore = CharmStore(self._connector.loop) + + def is_connected(self): + """Reports whether the Model is currently connected.""" + return self._connector.is_connected() + + @property + def loop(self): + return self._connector.loop + def connection(self): + """Return the current Connection object. It raises an exception + if the Model is disconnected""" + return self._connector.connection() + + async def get_controller(self): + """Return a Controller instance for the currently connected model. + :return Controller: """ - self.loop = loop or asyncio.get_event_loop() - self.max_frame_size = max_frame_size - self.connection = None - self.observers = weakref.WeakValueDictionary() - self.state = ModelState(self) - self.info = None - self._watch_stopping = asyncio.Event(loop=self.loop) - self._watch_stopped = asyncio.Event(loop=self.loop) - self._watch_received = asyncio.Event(loop=self.loop) - self._charmstore = CharmStore(self.loop) + from juju.controller import Controller + controller = Controller(jujudata=self._connector.jujudata) + kwargs = self.connection().connect_params() + kwargs.pop('uuid') + await controller._connect_direct(**kwargs) + return controller async def __aenter__(self): - await self.connect_current() + await self.connect() return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect() - if exc_type is not None: - return False + async def connect(self, model_name=None, **kwargs): + """Connect to a juju model. - async def connect(self, *args, **kw): - """Connect to an arbitrary Juju model. + If any arguments are specified other than model_name, then + model_name must be None and an explicit connection will be made + using Connection.connect using those parameters (the 'uuid' + parameter must be specified). - args and kw are passed through to Connection.connect() + Otherwise, if model_name is None, connect to the current model. - """ - if 'loop' not in kw: - kw['loop'] = self.loop - if 'max_frame_size' not in kw: - kw['max_frame_size'] = self.max_frame_size - self.connection = await connection.Connection.connect(*args, **kw) - await self._after_connect() + Otherwise, model_name must specify the name of a known + model. - async def connect_current(self): - """Connect to the current Juju model. + :param model_name: Format [controller:][user/]model """ - self.connection = await connection.Connection.connect_current( - self.loop, max_frame_size=self.max_frame_size) + await self.disconnect() + if not kwargs: + await self._connector.connect_model(model_name) + else: + if kwargs.get('uuid') is None: + raise ValueError('no UUID specified when connecting to model') + await self._connector.connect(**kwargs) await self._after_connect() async def connect_model(self, model_name): - """Connect to a specific Juju model by name. - - :param model_name: Format [controller:][user/]model + """ + .. deprecated:: 0.6.2 + Use connect(model_name=model_name) instead. + """ + return await self.connect(model_name=model_name) + async def connect_current(self): + """ + .. deprecated:: 0.6.2 + Use connect instead. """ - self.connection = await connection.Connection.connect_model( - model_name, self.loop, self.max_frame_size) + return await self.connect() + + async def _connect_direct(self, **kwargs): + await self.disconnect() + await self._connector.connect(**kwargs) await self._after_connect() async def _after_connect(self): - """Run initialization steps after connecting to websocket. - - """ self._watch() + + # Wait for the first packet of data from the AllWatcher, + # which contains all information on the model. + # TODO this means that we can't do anything until + # we've received all the model data, which might be + # a whole load of unneeded data if all the client wants + # to do is make one RPC call. await self._watch_received.wait() + await self.get_info() async def disconnect(self): """Shut down the watcher task and close websockets. """ - if self.connection and self.connection.is_open: + if not self._watch_stopped.is_set(): log.debug('Stopping watcher task') self._watch_stopping.set() await self._watch_stopped.wait() + self._watch_stopping.clear() + + if self.is_connected(): log.debug('Closing model connection') - await self.connection.close() - self.connection = None + await self._connector.disconnect() + self.info = None async def add_local_charm_dir(self, charm_dir, series): """Upload a local charm to the model. @@ -480,7 +545,7 @@ class Model(object): with fh: func = partial( self.add_local_charm, fh, series, os.stat(fh.name).st_size) - charm_url = await self.loop.run_in_executor(None, func) + charm_url = await self._connector.loop.run_in_executor(None, func) log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url) return charm_url @@ -505,7 +570,7 @@ class Model(object): instead. """ - conn, headers, path_prefix = self.connection.https_connection() + conn, headers, path_prefix = self.connection().https_connection() path = "%s/charms?series=%s" % (path_prefix, series) headers['Content-Type'] = 'application/zip' if size: @@ -549,13 +614,20 @@ class Model(object): async def block_until(self, *conditions, timeout=None, wait_period=0.5): """Return only after all conditions are true. + Raises `websockets.ConnectionClosed` if disconnected. """ - async def _block(): - while not all(c() for c in conditions): - if not (self.connection and self.connection.is_open): - raise websockets.ConnectionClosed(1006, 'no reason') - await asyncio.sleep(wait_period, loop=self.loop) - await asyncio.wait_for(_block(), timeout, loop=self.loop) + def _disconnected(): + return not (self.is_connected() and self.connection().is_open) + + def done(): + return _disconnected() or all(c() for c in conditions) + + await utils.block_until(done, + timeout=timeout, + wait_period=wait_period, + loop=self.loop) + if _disconnected(): + raise websockets.ConnectionClosed(1006, 'no reason') @property def applications(self): @@ -581,6 +653,13 @@ class Model(object): """ return self.state.units + @property + def relations(self): + """Return a list of all Relations currently in the model. + + """ + return list(self.state.relations.values()) + async def get_info(self): """Return a client.ModelInfo object for this Model. @@ -594,7 +673,7 @@ class Model(object): explicit call to this method. """ - facade = client.ClientFacade.from_connection(self.connection) + facade = client.ClientFacade.from_connection(self.connection()) self.info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) @@ -639,7 +718,7 @@ class Model(object): """ observer = _Observer( callable_, entity_type, action, entity_id, predicate) - self.observers[observer] = callable_ + self._observers[observer] = callable_ def _watch(self): """Start an asynchronous watch against this model. @@ -650,13 +729,13 @@ class Model(object): async def _all_watcher(): try: allwatcher = client.AllWatcherFacade.from_connection( - self.connection) + self.connection()) while not self._watch_stopping.is_set(): try: results = await utils.run_with_interrupt( allwatcher.Next(), self._watch_stopping, - self.loop) + self._connector.loop) except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise @@ -673,19 +752,27 @@ class Model(object): del allwatcher.Id continue except websockets.ConnectionClosed: - monitor = self.connection.monitor + monitor = self.connection().monitor if monitor.status == monitor.ERROR: # closed unexpectedly, try to reopen log.warning( 'Watcher: connection closed, reopening') - await self.connection.reconnect() + await self.connection().reconnect() + if monitor.status != monitor.CONNECTED: + # reconnect failed; abort and shutdown + log.error('Watcher: automatic reconnect ' + 'failed; stopping watcher') + break del allwatcher.Id continue else: # closed on request, go ahead and shutdown break if self._watch_stopping.is_set(): - await allwatcher.Stop() + try: + await allwatcher.Stop() + except websockets.ConnectionClosed: + pass # can't stop on a closed conn break for delta in results.deltas: delta = get_entity_delta(delta) @@ -704,7 +791,7 @@ class Model(object): self._watch_received.clear() self._watch_stopping.clear() self._watch_stopped.clear() - self.loop.create_task(_all_watcher()) + self._connector.loop.create_task(_all_watcher()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -724,10 +811,10 @@ class Model(object): 'Model changed: %s %s %s', delta.entity, delta.type, delta.get_id()) - for o in self.observers: + for o in self._observers: if o.cares_about(delta): asyncio.ensure_future(o(delta, old_obj, new_obj, self), - loop=self.loop) + loop=self._connector.loop) async def _wait(self, entity_type, entity_id, action, predicate=None): """ @@ -744,7 +831,7 @@ class Model(object): has a 'completed' status. See the _Observer class for details. """ - q = asyncio.Queue(loop=self.loop) + q = asyncio.Queue(loop=self._connector.loop) async def callback(delta, old, new, model): await q.put(delta.get_id()) @@ -755,24 +842,19 @@ class Model(object): # 'remove' action return self.state._live_entity_map(entity_type).get(entity_id) - async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): + async def _wait_for_new(self, entity_type, entity_id): """Wait for a new object to appear in the Model and return it. - Waits for an object of type ``entity_type`` with id ``entity_id``. - If ``entity_id`` is ``None``, it will wait for the first new entity - of the correct type. - - This coroutine blocks until the new object appears in the model. + Waits for an object of type ``entity_type`` with id ``entity_id`` + to appear in the model. This is similar to watching for the + object using ``block_until``, but uses the watcher rather than + polling. """ # if the entity is already in the model, just return it if entity_id in self.state._live_entity_map(entity_type): return self.state._live_entity_map(entity_type)[entity_id] - # if we know the entity_id, we can trigger on any action that puts - # the enitty into the model; otherwise, we have to watch for the - # next "add" action on that entity_type - action = 'add' if entity_id is None else None - return await self._wait(entity_type, entity_id, action, predicate) + return await self._wait(entity_type, entity_id, None) async def wait_for_action(self, action_id): """Given an action, wait for it to complete.""" @@ -785,7 +867,7 @@ class Model(object): def predicate(delta): return delta.data['status'] in ('completed', 'failed') - return await self._wait('action', action_id, 'change', predicate) + return await self._wait('action', action_id, None, predicate) async def add_machine( self, spec=None, constraints=None, disks=None, series=None): @@ -865,7 +947,7 @@ class Model(object): params.series = series # Submit the request. - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) results = await client_facade.AddMachines([params]) error = results.machines[0].error if error: @@ -881,29 +963,33 @@ class Model(object): :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade.from_connection(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection()) log.debug( 'Adding relation %s <-> %s', relation1, relation2) + def _find_relation(*specs): + for rel in self.relations: + if rel.matches(*specs): + return rel + return None + try: result = await app_facade.AddRelation([relation1, relation2]) except JujuAPIError as e: if 'relation already exists' not in e.message: raise - log.debug( - 'Relation %s <-> %s already exists', relation1, relation2) - # TODO: if relation already exists we should return the - # Relation ModelEntity here - return None + rel = _find_relation(relation1, relation2) + if rel: + return rel + raise JujuError('Relation {} {} exists but not in model'.format( + relation1, relation2)) - def predicate(delta): - endpoints = {} - for endpoint in delta.data['endpoints']: - endpoints[endpoint['application-name']] = endpoint['relation'] - return endpoints == result.endpoints + specs = ['{}:{}'.format(app, data['name']) + for app, data in result.endpoints.items()] - return await self._wait_for_new('relation', None, predicate) + await self.block_until(lambda: _find_relation(*specs) is not None) + return _find_relation(*specs) def add_space(self, name, *cidrs): """Add a new network space. @@ -924,7 +1010,7 @@ class Model(object): :param str key: The public ssh key """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) return await key_facade.AddKeys([key], user) add_ssh_keys = add_ssh_key @@ -1072,9 +1158,14 @@ class Model(object): for k, v in storage.items() } + entity_path = Path(entity_url.replace('local:', '')) + bundle_path = entity_path / 'bundle.yaml' + metadata_path = entity_path / 'metadata.yaml' + is_local = ( entity_url.startswith('local:') or - os.path.isdir(entity_url) + entity_path.is_dir() or + entity_path.is_file() ) if is_local: entity_id = entity_url.replace('local:', '') @@ -1082,10 +1173,11 @@ class Model(object): entity = await self.charmstore.entity(entity_url, channel=channel) entity_id = entity['Id'] - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) is_bundle = ((is_local and - (Path(entity_id) / 'bundle.yaml').exists()) or + (entity_id.endswith('.yaml') and entity_path.exists()) or + bundle_path.exists()) or (not is_local and 'bundle/' in entity_id)) if is_bundle: @@ -1100,9 +1192,9 @@ class Model(object): await asyncio.gather(*[ asyncio.ensure_future( self._wait_for_new('application', app_name), - loop=self.loop) + loop=self._connector.loop) for app_name in pending_apps - ], loop=self.loop) + ], loop=self._connector.loop) return [app for name, app in self.applications.items() if name in handler.applications] else: @@ -1118,6 +1210,9 @@ class Model(object): entity_id, entity) else: + if not application_name: + metadata = yaml.load(metadata_path.read_text()) + application_name = metadata['name'] # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath( os.path.expanduser(entity_id)) @@ -1163,7 +1258,7 @@ class Model(object): return None resources_facade = client.ResourcesFacade.from_connection( - self.connection) + self.connection()) response = await resources_facade.AddPendingResources( tag.application(application), entity_url, @@ -1186,7 +1281,7 @@ class Model(object): default_flow_style=False) app_facade = client.ApplicationFacade.from_connection( - self.connection) + self.connection()) app = client.ApplicationDeploy( charm_url=charm_url, @@ -1201,7 +1296,6 @@ class Model(object): storage=storage, placement=placement ) - result = await app_facade.Deploy([app]) errors = [r.error.message for r in result.results if r.error] if errors: @@ -1218,7 +1312,7 @@ class Model(object): """Destroy units by name. """ - app_facade = client.ApplicationFacade.from_connection(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection()) log.debug( 'Destroying unit%s %s', @@ -1263,7 +1357,7 @@ class Model(object): which have `source` and `value` attributes. """ config_facade = client.ModelConfigFacade.from_connection( - self.connection + self.connection() ) result = await config_facade.ModelGet() config = result.config @@ -1277,22 +1371,6 @@ class Model(object): """ raise NotImplementedError() - async def grant(self, username, acl='read'): - """Grant a user access to this model. - - :param str username: Username - :param str acl: Access control ('read' or 'write') - - """ - controller_conn = await self.connection.controller() - model_facade = client.ModelManagerFacade.from_connection( - controller_conn) - user = tag.user(username) - model = tag.model(self.info.uuid) - changes = client.ModifyModelAccess(acl, 'grant', model, user) - await self.revoke(username) - return await model_facade.ModifyModelAccess([changes]) - def import_ssh_key(self, identity): """Add a public SSH key from a trusted indentity source to this model. @@ -1326,7 +1404,7 @@ class Model(object): else it's fingerprint """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) entity = {'tag': tag.model(self.info.uuid)} entities = client.Entities([entity]) return await key_facade.ListKeys(entities, raw_ssh) @@ -1399,7 +1477,7 @@ class Model(object): :param str user: Juju user to which the key is registered """ - key_facade = client.KeyManagerFacade.from_connection(self.connection) + key_facade = client.KeyManagerFacade.from_connection(self.connection()) key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii'))) key = hashlib.md5(key).hexdigest() key = ':'.join(a + b for a, b in zip(key[::2], key[1::2])) @@ -1427,20 +1505,6 @@ class Model(object): """ raise NotImplementedError() - async def revoke(self, username): - """Revoke a user's access to this model. - - :param str username: Username to revoke - - """ - controller_conn = await self.connection.controller() - model_facade = client.ModelManagerFacade.from_connection( - controller_conn) - user = tag.user(username) - model = tag.model(self.info.uuid) - changes = client.ModifyModelAccess('read', 'revoke', model, user) - return await model_facade.ModifyModelAccess([changes]) - def run(self, command, timeout=None): """Run command on all machines in this model. @@ -1457,7 +1521,7 @@ class Model(object): `ConfigValue` instances, as returned by `get_config`. """ config_facade = client.ModelConfigFacade.from_connection( - self.connection + self.connection() ) for key, value in config.items(): if isinstance(value, ConfigValue): @@ -1506,7 +1570,7 @@ class Model(object): :param bool utc: Display time as UTC in RFC3339 format """ - client_facade = client.ClientFacade.from_connection(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection()) return await client_facade.FullStatus(filters) def sync_tools( @@ -1587,7 +1651,7 @@ class Model(object): ', '.join(tags) if tags else "all units") metrics_facade = client.MetricsDebugFacade.from_connection( - self.connection) + self.connection()) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities) @@ -1623,7 +1687,7 @@ def get_charm_series(path): return series[0] if series else None -class BundleHandler(object): +class BundleHandler: """ Handle bundles by using the API to translate bundle YAML into a plan of steps and then dispatching each of those using the API. @@ -1638,11 +1702,11 @@ class BundleHandler(object): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) self.client_facade = client.ClientFacade.from_connection( - model.connection) + model.connection()) self.app_facade = client.ApplicationFacade.from_connection( - model.connection) + model.connection()) self.ann_facade = client.AnnotationsFacade.from_connection( - model.connection) + model.connection()) async def _handle_local_charms(self, bundle): """Search for references to local charms (i.e. filesystem paths) @@ -1694,8 +1758,11 @@ class BundleHandler(object): return bundle async def fetch_plan(self, entity_id): - is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id) - if is_local: + is_local = not entity_id.startswith('cs:') + + if is_local and os.path.isfile(entity_id): + bundle_yaml = Path(entity_id).read_text() + elif is_local and os.path.isdir(entity_id): bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text() else: bundle_yaml = await self.charmstore.files(entity_id, @@ -1920,7 +1987,7 @@ class BundleHandler(object): return await entity.set_annotations(annotations) -class CharmStore(object): +class CharmStore: """ Async wrapper around theblues.charmstore.CharmStore """ @@ -1952,7 +2019,7 @@ class CharmStore(object): return wrapper -class CharmArchiveGenerator(object): +class CharmArchiveGenerator: """ Create a Zip archive of a local charm directory for upload to a controller.