import theblues.charmstore
import theblues.errors
-from . import tag
+from . import tag, utils
from .client import client
from .client import connection
from .constraints import parse as parse_constraints, normalize_key
class ModelObserver(object):
+ """
+ Base class for creating observers that react to changes in a model.
+ """
async def __call__(self, delta, old, new, model):
handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
method = getattr(self, handler_name, self.on_change)
async def on_change(self, delta, old, new, model):
"""Generic model-change handler.
+ This should be overridden in a subclass.
+
:param delta: :class:`juju.client.overrides.Delta`
:param old: :class:`juju.model.ModelEntity`
:param new: :class:`juju.model.ModelEntity`
class Model(object):
+ """
+ The main API for interacting with a Juju model.
+ """
def __init__(self, loop=None):
"""Instantiate a new connected Model.
self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self.info = None
- self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_stopping = asyncio.Event(loop=self.loop)
+ self._watch_stopped = asyncio.Event(loop=self.loop)
self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
+ async def __aenter__(self):
+ await self.connect_current()
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ await self.disconnect()
+
+ if exc_type is not None:
+ return False
+
async def connect(self, *args, **kw):
"""Connect to an arbitrary Juju model.
"""Shut down the watcher task and close websockets.
"""
- self._stop_watching()
if self.connection and self.connection.is_open:
- await self._watch_shutdown.wait()
+ log.debug('Stopping watcher task')
+ self._watch_stopping.set()
+ await self._watch_stopped.wait()
log.debug('Closing model connection')
await self.connection.close()
self.connection = None
"""
async def _start_watch():
- self._watch_shutdown.clear()
try:
- self._watch_conn = await self.connection.clone()
allwatcher = client.AllWatcherFacade.from_connection(
- self._watch_conn)
- while True:
- results = await allwatcher.Next()
+ self.connection)
+ while not self._watch_stopping.is_set():
+ results = await utils.run_with_interrupt(
+ allwatcher.Next(),
+ self._watch_stopping,
+ self.loop)
+ if self._watch_stopping.is_set():
+ break
for delta in results.deltas:
delta = get_entity_delta(delta)
old_obj, new_obj = self.state.apply_delta(delta)
- # XXX: Might not want to shield at this level
- # We are shielding because when the watcher is
- # canceled (on disconnect()), we don't want all of
- # its children (every observer callback) to be
- # canceled with it. So we shield them. But this means
- # they can *never* be canceled.
- await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj),
- loop=self.loop)
+ await self._notify_observers(delta, old_obj, new_obj)
self._watch_received.set()
except CancelledError:
- log.debug('Closing watcher connection')
- await self._watch_conn.close()
- self._watch_shutdown.set()
- self._watch_conn = None
- except Exception as e:
+ pass
+ except Exception:
log.exception('Error in watcher')
raise
+ finally:
+ self._watch_stopped.set()
log.debug('Starting watcher task')
- self._watcher_task = self.loop.create_task(_start_watch())
-
- def _stop_watching(self):
- """Stop the asynchronous watch against this model.
-
- """
- log.debug('Stopping watcher task')
- if self._watcher_task:
- self._watcher_task.cancel()
+ self._watch_received.clear()
+ self._watch_stopping.clear()
+ self._watch_stopped.clear()
+ self.loop.create_task(_start_watch())
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state
'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
'maas2.name' - acquire machine maas2.name on MAAS
- :param dict constraints: Machine constraints
+ :param dict constraints: Machine constraints, which can contain the
+ the following keys::
+
+ arch : str
+ container : str
+ cores : int
+ cpu_power : int
+ instance_type : str
+ mem : int
+ root_disk : int
+ spaces : list(str)
+ tags : list(str)
+ virt_type : str
+
Example::
constraints={
'mem': 256 * MB,
+ 'tags': ['virtual'],
}
- :param list disks: List of disk constraint dictionaries
+ :param list disks: List of disk constraint dictionaries, which can
+ contain the following keys::
+
+ count : int
+ pool : str
+ size : int
+
Example::
disks=[{
:param dict bind: <charm endpoint>:<network space> pairs
:param dict budget: <budget name>:<limit> pairs
:param str channel: Charm store channel from which to retrieve
- the charm or bundle, e.g. 'development'
+ the charm or bundle, e.g. 'edge'
:param dict config: Charm configuration dictionary
:param constraints: Service constraints
:type constraints: :class:`juju.Constraints`
os.path.isdir(entity_url)
)
if is_local:
- entity_id = entity_url
+ entity_id = entity_url.replace('local:', '')
else:
- entity = await self.charmstore.entity(entity_url)
+ entity = await self.charmstore.entity(entity_url, channel=channel)
entity_id = entity['Id']
client_facade = client.ClientFacade.from_connection(self.connection)
application_name = entity['Meta']['charm-metadata']['Name']
if not series:
series = self._get_series(entity_url, entity)
- if not channel:
- channel = 'stable'
await client_facade.AddCharm(channel, entity_id)
# XXX: we're dropping local resources here, but we don't
# actually support them yet anyway
class CharmArchiveGenerator(object):
+ """
+ Create a Zip archive of a local charm directory for upload to a controller.
+
+ This is used automatically by
+ `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
+ """
def __init__(self, path):
self.path = os.path.abspath(os.path.expanduser(path))