from functools import partial
from pathlib import Path
+import websockets
import yaml
import theblues.charmstore
import theblues.errors
from . import tag, utils
from .client import client
from .client import connection
+from .client.client import ConfigValue
from .constraints import parse as parse_constraints, normalize_key
from .delta import get_entity_delta
from .delta import get_entity_class
class ModelObserver(object):
+ """
+ Base class for creating observers that react to changes in a model.
+ """
async def __call__(self, delta, old, new, model):
handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
method = getattr(self, handler_name, self.on_change)
async def on_change(self, delta, old, new, model):
"""Generic model-change handler.
+ This should be overridden in a subclass.
+
:param delta: :class:`juju.client.overrides.Delta`
:param old: :class:`juju.model.ModelEntity`
:param new: :class:`juju.model.ModelEntity`
class Model(object):
- def __init__(self, loop=None):
+ """
+ The main API for interacting with a Juju model.
+ """
+ def __init__(self, loop=None,
+ max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
"""Instantiate a new connected Model.
:param loop: an asyncio event loop
+ :param max_frame_size: See
+ `juju.client.connection.Connection.MAX_FRAME_SIZE`
"""
self.loop = loop or asyncio.get_event_loop()
+ self.max_frame_size = max_frame_size
self.connection = None
self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
+ async def __aenter__(self):
+ await self.connect_current()
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ await self.disconnect()
+
+ if exc_type is not None:
+ return False
+
async def connect(self, *args, **kw):
"""Connect to an arbitrary Juju model.
"""
if 'loop' not in kw:
kw['loop'] = self.loop
+ if 'max_frame_size' not in kw:
+ kw['max_frame_size'] = self.max_frame_size
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""
self.connection = await connection.Connection.connect_current(
- self.loop)
+ self.loop, max_frame_size=self.max_frame_size)
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name,
- self.loop)
+ self.connection = await connection.Connection.connect_model(
+ model_name, self.loop, self.max_frame_size)
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
+ if not (self.connection and self.connection.is_open):
+ raise websockets.ConnectionClosed(1006, 'no reason')
await asyncio.sleep(wait_period, loop=self.loop)
await asyncio.wait_for(_block(), timeout, loop=self.loop)
See :meth:`add_observer` to register an onchange callback.
"""
- async def _start_watch():
+ async def _all_watcher():
try:
allwatcher = client.AllWatcherFacade.from_connection(
self.connection)
while not self._watch_stopping.is_set():
- results = await utils.run_with_interrupt(
- allwatcher.Next(),
- self._watch_stopping,
- self.loop)
+ try:
+ results = await utils.run_with_interrupt(
+ allwatcher.Next(),
+ self._watch_stopping,
+ self.loop)
+ except JujuAPIError as e:
+ if 'watcher was stopped' not in str(e):
+ raise
+ if self._watch_stopping.is_set():
+ # this shouldn't ever actually happen, because
+ # the event should trigger before the controller
+ # has a chance to tell us the watcher is stopped
+ # but handle it gracefully, just in case
+ break
+ # controller stopped our watcher for some reason
+ # but we're not actually stopping, so just restart it
+ log.warning(
+ 'Watcher: watcher stopped, restarting')
+ del allwatcher.Id
+ continue
+ except websockets.ConnectionClosed:
+ monitor = self.connection.monitor
+ if monitor.status == monitor.ERROR:
+ # closed unexpectedly, try to reopen
+ log.warning(
+ 'Watcher: connection closed, reopening')
+ await self.connection.reconnect()
+ del allwatcher.Id
+ continue
+ else:
+ # closed on request, go ahead and shutdown
+ break
if self._watch_stopping.is_set():
+ await allwatcher.Stop()
break
for delta in results.deltas:
delta = get_entity_delta(delta)
self._watch_received.clear()
self._watch_stopping.clear()
self._watch_stopped.clear()
- self.loop.create_task(_start_watch())
+ self.loop.create_task(_all_watcher())
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state
'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
'maas2.name' - acquire machine maas2.name on MAAS
- :param dict constraints: Machine constraints
+ :param dict constraints: Machine constraints, which can contain the
+ the following keys::
+
+ arch : str
+ container : str
+ cores : int
+ cpu_power : int
+ instance_type : str
+ mem : int
+ root_disk : int
+ spaces : list(str)
+ tags : list(str)
+ virt_type : str
+
Example::
constraints={
'mem': 256 * MB,
+ 'tags': ['virtual'],
}
- :param list disks: List of disk constraint dictionaries
+ :param list disks: List of disk constraint dictionaries, which can
+ contain the following keys::
+
+ count : int
+ pool : str
+ size : int
+
Example::
disks=[{
:param dict bind: <charm endpoint>:<network space> pairs
:param dict budget: <budget name>:<limit> pairs
:param str channel: Charm store channel from which to retrieve
- the charm or bundle, e.g. 'development'
+ the charm or bundle, e.g. 'edge'
:param dict config: Charm configuration dictionary
:param constraints: Service constraints
:type constraints: :class:`juju.Constraints`
if is_local:
entity_id = entity_url.replace('local:', '')
else:
- entity = await self.charmstore.entity(entity_url)
+ entity = await self.charmstore.entity(entity_url, channel=channel)
entity_id = entity['Id']
client_facade = client.ClientFacade.from_connection(self.connection)
application_name = entity['Meta']['charm-metadata']['Name']
if not series:
series = self._get_series(entity_url, entity)
- if not channel:
- channel = 'stable'
await client_facade.AddCharm(channel, entity_id)
# XXX: we're dropping local resources here, but we don't
# actually support them yet anyway
"""
raise NotImplementedError()
- def get_config(self):
+ async def get_config(self):
"""Return the configuration settings for this model.
+ :returns: A ``dict`` mapping keys to `ConfigValue` instances,
+ which have `source` and `value` attributes.
"""
- raise NotImplementedError()
+ config_facade = client.ModelConfigFacade.from_connection(
+ self.connection
+ )
+ result = await config_facade.ModelGet()
+ config = result.config
+ for key, value in config.items():
+ config[key] = ConfigValue.from_json(value)
+ return config
def get_constraints(self):
"""Return the machine constraints for this model.
"""
raise NotImplementedError()
- def set_config(self, **config):
+ async def set_config(self, config):
"""Set configuration keys on this model.
- :param \*\*config: Config key/values
-
+ :param dict config: Mapping of config keys to either string values or
+ `ConfigValue` instances, as returned by `get_config`.
"""
- raise NotImplementedError()
+ config_facade = client.ModelConfigFacade.from_connection(
+ self.connection
+ )
+ for key, value in config.items():
+ if isinstance(value, ConfigValue):
+ config[key] = value.value
+ await config_facade.ModelSet(config)
def set_constraints(self, constraints):
"""Set machine constraints on this model.
class CharmArchiveGenerator(object):
+ """
+ Create a Zip archive of a local charm directory for upload to a controller.
+
+ This is used automatically by
+ `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
+ """
def __init__(self, path):
self.path = os.path.abspath(os.path.expanduser(path))