X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=4db711b479b68ebbb79bc783303a1e5dad92b0a5;hb=0491d0bde941bbbda28dec771f71ac5cc41f21e6;hp=f162c7e8565592b66c28d7ae4d0ea18b66b59496;hpb=1a3cee44420e79fda92943edf636eaddb393145e;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index f162c7e..4db711b 100644 --- a/juju/model.py +++ b/juju/model.py @@ -18,7 +18,7 @@ import yaml import theblues.charmstore import theblues.errors -from . import tag +from . import tag, utils from .client import client from .client import connection from .constraints import parse as parse_constraints, normalize_key @@ -76,6 +76,9 @@ class _Observer(object): class ModelObserver(object): + """ + Base class for creating observers that react to changes in a model. + """ async def __call__(self, delta, old, new, model): handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) @@ -84,6 +87,8 @@ class ModelObserver(object): async def on_change(self, delta, old, new, model): """Generic model-change handler. + This should be overridden in a subclass. + :param delta: :class:`juju.client.overrides.Delta` :param old: :class:`juju.model.ModelEntity` :param new: :class:`juju.model.ModelEntity` @@ -374,6 +379,9 @@ class ModelEntity(object): class Model(object): + """ + The main API for interacting with a Juju model. + """ def __init__(self, loop=None): """Instantiate a new connected Model. @@ -385,11 +393,21 @@ class Model(object): self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) self.info = None - self._watcher_task = None - self._watch_shutdown = asyncio.Event(loop=self.loop) + self._watch_stopping = asyncio.Event(loop=self.loop) + self._watch_stopped = asyncio.Event(loop=self.loop) self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) + async def __aenter__(self): + await self.connect_current() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.disconnect() + + if exc_type is not None: + return False + async def connect(self, *args, **kw): """Connect to an arbitrary Juju model. @@ -431,9 +449,10 @@ class Model(object): """Shut down the watcher task and close websockets. """ - self._stop_watching() if self.connection and self.connection.is_open: - await self._watch_shutdown.wait() + log.debug('Stopping watcher task') + self._watch_stopping.set() + await self._watch_stopped.wait() log.debug('Closing model connection') await self.connection.close() self.connection = None @@ -619,45 +638,34 @@ class Model(object): """ async def _start_watch(): - self._watch_shutdown.clear() try: - self._watch_conn = await self.connection.clone() allwatcher = client.AllWatcherFacade.from_connection( - self._watch_conn) - while True: - results = await allwatcher.Next() + self.connection) + while not self._watch_stopping.is_set(): + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + if self._watch_stopping.is_set(): + break for delta in results.deltas: delta = get_entity_delta(delta) old_obj, new_obj = self.state.apply_delta(delta) - # XXX: Might not want to shield at this level - # We are shielding because when the watcher is - # canceled (on disconnect()), we don't want all of - # its children (every observer callback) to be - # canceled with it. So we shield them. But this means - # they can *never* be canceled. - await asyncio.shield( - self._notify_observers(delta, old_obj, new_obj), - loop=self.loop) + await self._notify_observers(delta, old_obj, new_obj) self._watch_received.set() except CancelledError: - log.debug('Closing watcher connection') - await self._watch_conn.close() - self._watch_shutdown.set() - self._watch_conn = None - except Exception as e: + pass + except Exception: log.exception('Error in watcher') raise + finally: + self._watch_stopped.set() log.debug('Starting watcher task') - self._watcher_task = self.loop.create_task(_start_watch()) - - def _stop_watching(self): - """Stop the asynchronous watch against this model. - - """ - log.debug('Stopping watcher task') - if self._watcher_task: - self._watcher_task.cancel() + self._watch_received.clear() + self._watch_stopping.clear() + self._watch_stopped.clear() + self.loop.create_task(_start_watch()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -755,14 +763,34 @@ class Model(object): 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS - :param dict constraints: Machine constraints + :param dict constraints: Machine constraints, which can contain the + the following keys:: + + arch : str + container : str + cores : int + cpu_power : int + instance_type : str + mem : int + root_disk : int + spaces : list(str) + tags : list(str) + virt_type : str + Example:: constraints={ 'mem': 256 * MB, + 'tags': ['virtual'], } - :param list disks: List of disk constraint dictionaries + :param list disks: List of disk constraint dictionaries, which can + contain the following keys:: + + count : int + pool : str + size : int + Example:: disks=[{ @@ -974,7 +1002,7 @@ class Model(object): :param dict bind: : pairs :param dict budget: : pairs :param str channel: Charm store channel from which to retrieve - the charm or bundle, e.g. 'development' + the charm or bundle, e.g. 'edge' :param dict config: Charm configuration dictionary :param constraints: Service constraints :type constraints: :class:`juju.Constraints` @@ -1010,9 +1038,9 @@ class Model(object): os.path.isdir(entity_url) ) if is_local: - entity_id = entity_url + entity_id = entity_url.replace('local:', '') else: - entity = await self.charmstore.entity(entity_url) + entity = await self.charmstore.entity(entity_url, channel=channel) entity_id = entity['Id'] client_facade = client.ClientFacade.from_connection(self.connection) @@ -1044,8 +1072,6 @@ class Model(object): application_name = entity['Meta']['charm-metadata']['Name'] if not series: series = self._get_series(entity_url, entity) - if not channel: - channel = 'stable' await client_facade.AddCharm(channel, entity_id) # XXX: we're dropping local resources here, but we don't # actually support them yet anyway @@ -1870,6 +1896,12 @@ class CharmStore(object): class CharmArchiveGenerator(object): + """ + Create a Zip archive of a local charm directory for upload to a controller. + + This is used automatically by + `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_. + """ def __init__(self, path): self.path = os.path.abspath(os.path.expanduser(path))