X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=bd8709a3bfa057a68319d0a5d732b8c54a55d8d7;hb=c50c361a8b9a3bbf1a33f5659e492b481f065cd2;hp=e024c65eea45aa258b5e90aff2dfa3f13b8ca9d3;hpb=054805953c7c8ed11ce341adf55bf6e19e589fbe;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index e024c65..bd8709a 100644 --- a/juju/model.py +++ b/juju/model.py @@ -14,6 +14,7 @@ from concurrent.futures import CancelledError from functools import partial from pathlib import Path +import websockets import yaml import theblues.charmstore import theblues.errors @@ -21,6 +22,7 @@ import theblues.errors from . import tag, utils from .client import client from .client import connection +from .client.client import ConfigValue from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta from .delta import get_entity_class @@ -76,6 +78,9 @@ class _Observer(object): class ModelObserver(object): + """ + Base class for creating observers that react to changes in a model. + """ async def __call__(self, delta, old, new, model): handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) @@ -84,6 +89,8 @@ class ModelObserver(object): async def on_change(self, delta, old, new, model): """Generic model-change handler. + This should be overridden in a subclass. + :param delta: :class:`juju.client.overrides.Delta` :param old: :class:`juju.model.ModelEntity` :param new: :class:`juju.model.ModelEntity` @@ -374,13 +381,20 @@ class ModelEntity(object): class Model(object): - def __init__(self, loop=None): + """ + The main API for interacting with a Juju model. + """ + def __init__(self, loop=None, + max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE): """Instantiate a new connected Model. :param loop: an asyncio event loop + :param max_frame_size: See + `juju.client.connection.Connection.MAX_FRAME_SIZE` """ self.loop = loop or asyncio.get_event_loop() + self.max_frame_size = max_frame_size self.connection = None self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) @@ -390,6 +404,16 @@ class Model(object): self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) + async def __aenter__(self): + await self.connect_current() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.disconnect() + + if exc_type is not None: + return False + async def connect(self, *args, **kw): """Connect to an arbitrary Juju model. @@ -398,6 +422,8 @@ class Model(object): """ if 'loop' not in kw: kw['loop'] = self.loop + if 'max_frame_size' not in kw: + kw['max_frame_size'] = self.max_frame_size self.connection = await connection.Connection.connect(*args, **kw) await self._after_connect() @@ -406,7 +432,7 @@ class Model(object): """ self.connection = await connection.Connection.connect_current( - self.loop) + self.loop, max_frame_size=self.max_frame_size) await self._after_connect() async def connect_model(self, model_name): @@ -415,8 +441,8 @@ class Model(object): :param model_name: Format [controller:][user/]model """ - self.connection = await connection.Connection.connect_model(model_name, - self.loop) + self.connection = await connection.Connection.connect_model( + model_name, self.loop, self.max_frame_size) await self._after_connect() async def _after_connect(self): @@ -526,6 +552,8 @@ class Model(object): """ async def _block(): while not all(c() for c in conditions): + if not (self.connection and self.connection.is_open): + raise websockets.ConnectionClosed(1006, 'no reason') await asyncio.sleep(wait_period, loop=self.loop) await asyncio.wait_for(_block(), timeout, loop=self.loop) @@ -619,16 +647,45 @@ class Model(object): See :meth:`add_observer` to register an onchange callback. """ - async def _start_watch(): + async def _all_watcher(): try: allwatcher = client.AllWatcherFacade.from_connection( self.connection) while not self._watch_stopping.is_set(): - results = await utils.run_with_interrupt( - allwatcher.Next(), - self._watch_stopping, - self.loop) + try: + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + except JujuAPIError as e: + if 'watcher was stopped' not in str(e): + raise + if self._watch_stopping.is_set(): + # this shouldn't ever actually happen, because + # the event should trigger before the controller + # has a chance to tell us the watcher is stopped + # but handle it gracefully, just in case + break + # controller stopped our watcher for some reason + # but we're not actually stopping, so just restart it + log.warning( + 'Watcher: watcher stopped, restarting') + del allwatcher.Id + continue + except websockets.ConnectionClosed: + monitor = self.connection.monitor + if monitor.status == monitor.ERROR: + # closed unexpectedly, try to reopen + log.warning( + 'Watcher: connection closed, reopening') + await self.connection.reconnect() + del allwatcher.Id + continue + else: + # closed on request, go ahead and shutdown + break if self._watch_stopping.is_set(): + await allwatcher.Stop() break for delta in results.deltas: delta = get_entity_delta(delta) @@ -647,7 +704,7 @@ class Model(object): self._watch_received.clear() self._watch_stopping.clear() self._watch_stopped.clear() - self.loop.create_task(_start_watch()) + self.loop.create_task(_all_watcher()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -745,14 +802,34 @@ class Model(object): 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS - :param dict constraints: Machine constraints + :param dict constraints: Machine constraints, which can contain the + the following keys:: + + arch : str + container : str + cores : int + cpu_power : int + instance_type : str + mem : int + root_disk : int + spaces : list(str) + tags : list(str) + virt_type : str + Example:: constraints={ 'mem': 256 * MB, + 'tags': ['virtual'], } - :param list disks: List of disk constraint dictionaries + :param list disks: List of disk constraint dictionaries, which can + contain the following keys:: + + count : int + pool : str + size : int + Example:: disks=[{ @@ -964,7 +1041,7 @@ class Model(object): :param dict bind: : pairs :param dict budget: : pairs :param str channel: Charm store channel from which to retrieve - the charm or bundle, e.g. 'development' + the charm or bundle, e.g. 'edge' :param dict config: Charm configuration dictionary :param constraints: Service constraints :type constraints: :class:`juju.Constraints` @@ -1000,9 +1077,9 @@ class Model(object): os.path.isdir(entity_url) ) if is_local: - entity_id = entity_url + entity_id = entity_url.replace('local:', '') else: - entity = await self.charmstore.entity(entity_url) + entity = await self.charmstore.entity(entity_url, channel=channel) entity_id = entity['Id'] client_facade = client.ClientFacade.from_connection(self.connection) @@ -1034,8 +1111,6 @@ class Model(object): application_name = entity['Meta']['charm-metadata']['Name'] if not series: series = self._get_series(entity_url, entity) - if not channel: - channel = 'stable' await client_facade.AddCharm(channel, entity_id) # XXX: we're dropping local resources here, but we don't # actually support them yet anyway @@ -1181,11 +1256,20 @@ class Model(object): """ raise NotImplementedError() - def get_config(self): + async def get_config(self): """Return the configuration settings for this model. + :returns: A ``dict`` mapping keys to `ConfigValue` instances, + which have `source` and `value` attributes. """ - raise NotImplementedError() + config_facade = client.ModelConfigFacade.from_connection( + self.connection + ) + result = await config_facade.ModelGet() + config = result.config + for key, value in config.items(): + config[key] = ConfigValue.from_json(value) + return config def get_constraints(self): """Return the machine constraints for this model. @@ -1366,13 +1450,19 @@ class Model(object): """ raise NotImplementedError() - def set_config(self, **config): + async def set_config(self, config): """Set configuration keys on this model. - :param \*\*config: Config key/values - + :param dict config: Mapping of config keys to either string values or + `ConfigValue` instances, as returned by `get_config`. """ - raise NotImplementedError() + config_facade = client.ModelConfigFacade.from_connection( + self.connection + ) + for key, value in config.items(): + if isinstance(value, ConfigValue): + config[key] = value.value + await config_facade.ModelSet(config) def set_constraints(self, constraints): """Set machine constraints on this model. @@ -1860,6 +1950,12 @@ class CharmStore(object): class CharmArchiveGenerator(object): + """ + Create a Zip archive of a local charm directory for upload to a controller. + + This is used automatically by + `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_. + """ def __init__(self, path): self.path = os.path.abspath(os.path.expanduser(path))