+import asyncio
+import logging
+from concurrent.futures import CancelledError
+
+from .client import client
+from .client import watcher
+from .client import connection
+from .delta import get_entity_delta
+
+log = logging.getLogger(__name__)
+
+
+class ModelObserver(object):
+ def __call__(self, delta, old, new, model):
+ if old is None and new is not None:
+ type_ = 'add'
+ else:
+ type_ = delta.type
+ handler_name = 'on_{}_{}'.format(delta.entity, type_)
+ method = getattr(self, handler_name, self.on_change)
+ log.debug(
+ 'Model changed: %s %s %s',
+ delta.entity, delta.type, delta.get_id())
+ method(delta, old, new, model)
+
+ def on_change(self, delta, old, new, model):
+ pass
+
+
+class ModelEntity(object):
+ """An object in the Model tree"""
+
+ def __init__(self, data, model):
+ """Initialize a new entity
+
+ :param data: dict of data from a watcher delta
+ :param model: The model instance in whose object tree this
+ entity resides
+
+ """
+ self.data = data
+ self.model = model
+ self.connection = model.connection
+
+ def __getattr__(self, name):
+ return self.data[name]
+
+
class Model(object):
+ def __init__(self, loop=None):
+ """Instantiate a new connected Model.
+
+ :param loop: an asyncio event loop
+
+ """
+ self.loop = loop or asyncio.get_event_loop()
+ self.connection = None
+ self.observers = set()
+ self.state = dict()
+ self._watcher_task = None
+ self._watch_shutdown = asyncio.Event(loop=loop)
+ self._watch_received = asyncio.Event(loop=loop)
+
+ async def connect_current(self):
+ self.connection = await connection.Connection.connect_current()
+ self._watch()
+ await self._watch_received.wait()
+
+ async def disconnect(self):
+ self._stop_watching()
+ if self.connection and self.connection.is_open:
+ await self._watch_shutdown.wait()
+ log.debug('Closing model connection')
+ await asyncio.wait_for(self.connection.close(), None)
+ self.connection = None
+
+ def all_units_idle(self):
+ """Return True if all units are idle.
+
+ """
+ for unit in self.units.values():
+ unit_status = unit.data['agent-status']['current']
+ if unit_status != 'idle':
+ return False
+ return True
+
+ async def reset(self, force=False):
+ for app in self.applications.values():
+ await app.destroy()
+ for machine in self.machines.values():
+ await machine.destroy(force=force)
+
+ async def block_until(self, func):
+ async def _block():
+ while not func():
+ await asyncio.sleep(.1)
+ await asyncio.wait_for(_block(), None)
+
+ @property
+ def applications(self):
+ return self.state.get('application', {})
+
+ @property
+ def machines(self):
+ return self.state.get('machine', {})
+
+ @property
+ def units(self):
+ return self.state.get('unit', {})
+
+ def add_observer(self, callable_):
+ """Register an "on-model-change" callback
+
+ Once a watch is started (Model.watch() is called), ``callable_``
+ will be called each time the model changes. callable_ should
+ accept the following positional arguments:
+
+ delta - An instance of :class:`juju.delta.EntityDelta`
+ containing the raw delta data recv'd from the Juju
+ websocket.
+
+ old_obj - If the delta modifies an existing object in the model,
+ old_obj will be a copy of that object, as it was before the
+ delta was applied. Will be None if the delta creates a new
+ entity in the model.
+
+ new_obj - A copy of the new or updated object, after the delta
+ is applied. Will be None if the delta removes an entity
+ from the model.
+
+ model - The :class:`Model` itself.
+
+ """
+ self.observers.add(callable_)
+
+ def _watch(self):
+ """Start an asynchronous watch against this model.
+
+ See :meth:`add_observer` to register an onchange callback.
+
+ """
+ async def _start_watch():
+ self._watch_shutdown.clear()
+ try:
+ allwatcher = watcher.AllWatcher()
+ self._watch_conn = await self.connection.clone()
+ allwatcher.connect(self._watch_conn)
+ while True:
+ results = await allwatcher.Next()
+ for delta in results.deltas:
+ delta = get_entity_delta(delta)
+ old_obj, new_obj = self._apply_delta(delta)
+ self._notify_observers(delta, old_obj, new_obj)
+ self._watch_received.set()
+ except CancelledError:
+ log.debug('Closing watcher connection')
+ await asyncio.wait_for(self._watch_conn.close(), None)
+ self._watch_shutdown.set()
+ self._watch_conn = None
+
+ log.debug('Starting watcher task')
+ self._watcher_task = self.loop.create_task(_start_watch())
+
+ def _stop_watching(self):
+ """Stop the asynchronous watch against this model.
+
+ """
+ log.debug('Stopping watcher task')
+ if self._watcher_task:
+ self._watcher_task.cancel()
+
+ def _apply_delta(self, delta):
+ """Apply delta to our model state and return the a copy of the
+ affected object as it was before and after the update, e.g.:
+
+ old_obj, new_obj = self._apply_delta(delta)
+
+ old_obj may be None if the delta is for the creation of a new object,
+ e.g. a new application or unit is deployed.
+
+ new_obj may be None if no object was created or updated, or if an
+ object was deleted as a result of the delta being applied.
+
+ """
+ old_obj, new_obj = None, None
+
+ if (delta.entity in self.state and
+ delta.get_id() in self.state[delta.entity]):
+ old_obj = self.state[delta.entity][delta.get_id()]
+ if delta.type == 'remove':
+ del self.state[delta.entity][delta.get_id()]
+ return old_obj, new_obj
+
+ new_obj = self.state.setdefault(delta.entity, {})[delta.get_id()] = (
+ self._create_model_entity(delta))
+
+ return old_obj, new_obj
+
+ def _create_model_entity(self, delta):
+ """Return an object instance representing the entity created or
+ updated by ``delta``
+
+ """
+ entity_class = delta.get_entity_class()
+ return entity_class(delta.data, self)
+
+ def _notify_observers(self, delta, old_obj, new_obj):
+ """Call observing callbacks, notifying them of a change in model state
+
+ :param delta: The raw change from the watcher
+ (:class:`juju.client.overrides.Delta`)
+ :param old_obj: The object in the model that this delta updates.
+ May be None.
+ :param new_obj: The object in the model that is created or updated
+ by applying this delta.
+
+ """
+ for o in self.observers:
+ o(delta, old_obj, new_obj, self)
+
def add_machine(
self, spec=None, constraints=None, disks=None, series=None,
count=1):
:param str spec: Machine specification
Examples::
+
(None) - starts a new machine
'lxc' - starts a new machine with on lxc container
'lxc:4' - starts a new lxc container on machine 4
'ssh:user@10.10.0.3' - manually provisions a machine with ssh
'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
'maas2.name' - acquire machine maas2.name on MAAS
- :param :class:`juju.Constraints` constraints: Machine constraints
+ :param constraints: Machine constraints
+ :type constraints: :class:`juju.Constraints`
:param list disks: List of disk :class:`constraints <juju.Constraints>`
:param str series: Series
:param int count: Number of machines to deploy
pass
add_machines = add_machine
- def add_relation(self, relation1, relation2):
- """Add a relation between two services.
+ async def add_relation(self, relation1, relation2):
+ """Add a relation between two applications.
- :param str relation1: '<service>[:<relation_name>]'
- :param str relation2: '<service>[:<relation_name>]'
+ :param str relation1: '<application>[:<relation_name>]'
+ :param str relation2: '<application>[:<relation_name>]'
"""
- pass
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ log.debug(
+ 'Adding relation %s <-> %s', relation1, relation2)
+
+ return await app_facade.AddRelation([relation1, relation2])
def add_space(self, name, *cidrs):
"""Add a new network space.
"""
pass
- def deploy(
+ async def deploy(
self, entity_url, service_name=None, bind=None, budget=None,
channel=None, config=None, constraints=None, force=False,
- num_units=1, plan=None, resource=None, series=None, storage=None,
+ num_units=1, plan=None, resources=None, series=None, storage=None,
to=None):
"""Deploy a new service or bundle.
:param str channel: Charm store channel from which to retrieve
the charm or bundle, e.g. 'development'
:param dict config: Charm configuration dictionary
- :param :class:`juju.Constraints` constraints: Service constraints
+ :param constraints: Service constraints
+ :type constraints: :class:`juju.Constraints`
:param bool force: Allow charm to be deployed to a machine running
an unsupported series
:param int num_units: Number of units to deploy
:param str plan: Plan under which to deploy charm
- :param dict resource: <resource name>:<file path> pairs
+ :param dict resources: <resource name>:<file path> pairs
:param str series: Series on which to deploy
:param dict storage: Storage constraints TODO how do these look?
:param str to: Placement directive, e.g.::
+
'23' - machine 23
'lxc:7' - new lxc container on machine 7
'24/lxc/3' - lxc container 3 or machine 24
If None, a new machine is provisioned.
- """
- pass
+
+ TODO::
+
+ - entity_url must have a revision; look up latest automatically if
+ not provided by caller
+ - service_name is required; fill this in automatically if not
+ provided by caller
+ - series is required; how do we pick a default?
+
+ """
+ if constraints:
+ constraints = client.Value(**constraints)
+
+ if to:
+ placement = [
+ client.Placement(**p) for p in to
+ ]
+ else:
+ placement = []
+
+ if storage:
+ storage = {
+ k: client.Constraints(**v)
+ for k, v in storage.items()
+ }
+
+ app_facade = client.ApplicationFacade()
+ client_facade = client.ClientFacade()
+ app_facade.connect(self.connection)
+ client_facade.connect(self.connection)
+
+ log.debug(
+ 'Deploying %s', entity_url)
+
+ await client_facade.AddCharm(channel, entity_url)
+ app = client.ApplicationDeploy(
+ application=service_name,
+ channel=channel,
+ charm_url=entity_url,
+ config=config,
+ constraints=constraints,
+ endpoint_bindings=bind,
+ num_units=num_units,
+ placement=placement,
+ resources=resources,
+ series=series,
+ storage=storage,
+ )
+
+ return await app_facade.Deploy([app])
def destroy(self):
"""Terminate all machines and resources for this model.
"""Ensure sufficient controllers exist to provide redundancy.
:param int num_controllers: Number of controllers to make available
- :param :class:`juju.Constraints` constraints: Constraints to apply
- to the controller machines
+ :param constraints: Constraints to apply to the controller machines
+ :type constraints: :class:`juju.Constraints`
:param str series: Series of the controller machines
:param list to: Placement directives for controller machines, e.g.::
+
'23' - machine 23
'lxc:7' - new lxc container on machine 7
'24/lxc/3' - lxc container 3 or machine 24
pass
import_ssh_keys = import_ssh_key
- def get_machines(self, utc=False):
+ def get_machines(self, machine, utc=False):
"""Return list of machines in this model.
+ :param str machine: Machine id, e.g. '0'
:param bool utc: Display time as UTC in RFC3339 format
"""
pass
remove_ssh_keys = remove_ssh_key
- def restore_backup(self):
+ def restore_backup(
+ self, bootstrap=False, constraints=None, archive=None,
+ backup_id=None, upload_tools=False):
"""Restore a backup archive to a new controller.
+ :param bool bootstrap: Bootstrap a new state machine
+ :param constraints: Model constraints
+ :type constraints: :class:`juju.Constraints`
+ :param str archive: Path to backup archive to restore
+ :param str backup_id: Id of backup to restore
+ :param bool upload_tools: Upload tools if bootstrapping a new machine
+
"""
pass
"""
pass
- def revoke(self):
+ def revoke(self, username, acl='read'):
"""Revoke a user's access to this model.
+ :param str username: Username to revoke
+ :param str acl: Access control ('read' or 'write')
+
"""
pass
- def run(self):
+ def run(self, command, timeout=None):
"""Run command on all machines in this model.
+ :param str command: The command to run
+ :param int timeout: Time to wait before command is considered failed
+
"""
pass
- def set_config(self):
+ def set_config(self, **config):
"""Set configuration keys on this model.
+ :param \*\*config: Config key/values
+
"""
pass
- def set_constraints(self):
+ def set_constraints(self, constraints):
"""Set machine constraints on this model.
+ :param :class:`juju.Constraints` constraints: Machine constraints
+
"""
pass
- def get_action_output(self, action_uuid):
+ def get_action_output(self, action_uuid, wait=-1):
"""Get the results of an action by ID.
+ :param str action_uuid: Id of the action
+ :param int wait: Time in seconds to wait for action to complete
+
"""
pass
- def get_action_status(self, uuid_or_prefix):
- """Get the status of all actions, filtered by ID or prefix.
+ def get_action_status(self, uuid_or_prefix=None, name=None):
+ """Get the status of all actions, filtered by ID, ID prefix, or action name.
+
+ :param str uuid_or_prefix: Filter by action uuid or prefix
+ :param str name: Filter by action name
"""
pass
def get_budget(self, budget_name):
- """Get budget by name.
+ """Get budget usage info.
+
+ :param str budget_name: Name of budget
"""
pass
- def get_status(self):
+ def get_status(self, filter_=None, utc=False):
"""Return the status of the model.
+ :param str filter_: Service or unit name or wildcard ('*')
+ :param bool utc: Display time as UTC in RFC3339 format
+
"""
pass
status = get_status
- def sync_tools(self):
+ def sync_tools(
+ self, all_=False, destination=None, dry_run=False, public=False,
+ source=None, stream=None, version=None):
"""Copy Juju tools into this model.
+ :param bool all_: Copy all versions, not just the latest
+ :param str destination: Path to local destination directory
+ :param bool dry_run: Don't do the actual copy
+ :param bool public: Tools are for a public cloud, so generate mirrors
+ information
+ :param str source: Path to local source directory
+ :param str stream: Simplestreams stream for which to sync metadata
+ :param str version: Copy a specific major.minor version
+
"""
pass
- def unblock(self, operation):
+ def unblock(self, *commands):
"""Unblock an operation that would alter this model.
+ :param str \*commands: The commands to unblock. Valid values are
+ 'all-changes', 'destroy-model', 'remove-object'
+
"""
pass
- def unset_config(self):
+ def unset_config(self, *keys):
"""Unset configuration on this model.
+ :param str \*keys: The keys to unset
+
"""
pass
"""
pass
- def upload_backup(self):
+ def upgrade_juju(
+ self, dry_run=False, reset_previous_upgrade=False,
+ upload_tools=False, version=None):
+ """Upgrade Juju on all machines in a model.
+
+ :param bool dry_run: Don't do the actual upgrade
+ :param bool reset_previous_upgrade: Clear the previous (incomplete)
+ upgrade status
+ :param bool upload_tools: Upload local version of tools
+ :param str version: Upgrade to a specific version
+
+ """
+ pass
+
+ def upload_backup(self, archive_path):
"""Store a backup archive remotely in Juju.
+ :param str archive_path: Path to local archive
+
"""
pass