X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=160707b775888d464061bd2f238b7d82ea7ee09d;hb=d6348ef405d6e87a4f72e7842ea4376678456283;hp=6eb4e9fe1a0b6f06294d8046d540ac22f1cc3189;hpb=d2fa820fda0498109ed316c5e1506695f151ce18;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 6eb4e9f..160707b 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,5 +1,7 @@ import asyncio +import base64 import collections +import hashlib import json import logging import os @@ -13,16 +15,18 @@ from functools import partial from pathlib import Path import yaml -from theblues import charmstore +import theblues.charmstore +import theblues.errors +from . import tag, utils from .client import client -from .client import watcher from .client import connection from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta from .delta import get_entity_class from .exceptions import DeadEntityException from .errors import JujuError, JujuAPIError +from .placement import parse as parse_placement log = logging.getLogger(__name__) @@ -72,6 +76,9 @@ class _Observer(object): class ModelObserver(object): + """ + Base class for creating observers that react to changes in a model. + """ async def __call__(self, delta, old, new, model): handler_name = 'on_{}_{}'.format(delta.entity, delta.type) method = getattr(self, handler_name, self.on_change) @@ -80,6 +87,8 @@ class ModelObserver(object): async def on_change(self, delta, old, new, model): """Generic model-change handler. + This should be overridden in a subclass. + :param delta: :class:`juju.client.overrides.Delta` :param old: :class:`juju.model.ModelEntity` :param new: :class:`juju.model.ModelEntity` @@ -228,7 +237,14 @@ class ModelEntity(object): model. """ - return self.safe_data[name] + try: + return self.safe_data[name] + except KeyError: + name = name.replace('_', '-') + if name in self.safe_data: + return self.safe_data[name] + else: + raise def __bool__(self): return bool(self.data) @@ -363,6 +379,9 @@ class ModelEntity(object): class Model(object): + """ + The main API for interacting with a Juju model. + """ def __init__(self, loop=None): """Instantiate a new connected Model. @@ -374,17 +393,29 @@ class Model(object): self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) self.info = None - self._watcher_task = None - self._watch_shutdown = asyncio.Event(loop=loop) - self._watch_received = asyncio.Event(loop=loop) + self._watch_stopping = asyncio.Event(loop=self.loop) + self._watch_stopped = asyncio.Event(loop=self.loop) + self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) + async def __aenter__(self): + await self.connect_current() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.disconnect() + + if exc_type is not None: + return False + async def connect(self, *args, **kw): """Connect to an arbitrary Juju model. args and kw are passed through to Connection.connect() """ + if 'loop' not in kw: + kw['loop'] = self.loop self.connection = await connection.Connection.connect(*args, **kw) await self._after_connect() @@ -392,7 +423,8 @@ class Model(object): """Connect to the current Juju model. """ - self.connection = await connection.Connection.connect_current() + self.connection = await connection.Connection.connect_current( + self.loop) await self._after_connect() async def connect_model(self, model_name): @@ -401,7 +433,8 @@ class Model(object): :param model_name: Format [controller:][user/]model """ - self.connection = await connection.Connection.connect_model(model_name) + self.connection = await connection.Connection.connect_model(model_name, + self.loop) await self._after_connect() async def _after_connect(self): @@ -416,9 +449,10 @@ class Model(object): """Shut down the watcher task and close websockets. """ - self._stop_watching() if self.connection and self.connection.is_open: - await self._watch_shutdown.wait() + log.debug('Stopping watcher task') + self._watch_stopping.set() + await self._watch_stopped.wait() log.debug('Closing model connection') await self.connection.close() self.connection = None @@ -510,8 +544,8 @@ class Model(object): """ async def _block(): while not all(c() for c in conditions): - await asyncio.sleep(wait_period) - await asyncio.wait_for(_block(), timeout) + await asyncio.sleep(wait_period, loop=self.loop) + await asyncio.wait_for(_block(), timeout, loop=self.loop) @property def applications(self): @@ -550,8 +584,7 @@ class Model(object): explicit call to this method. """ - facade = client.ClientFacade() - facade.connect(self.connection) + facade = client.ClientFacade.from_connection(self.connection) self.info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) @@ -605,41 +638,34 @@ class Model(object): """ async def _start_watch(): - self._watch_shutdown.clear() try: - allwatcher = watcher.AllWatcher() - self._watch_conn = await self.connection.clone() - allwatcher.connect(self._watch_conn) - while True: - results = await allwatcher.Next() + allwatcher = client.AllWatcherFacade.from_connection( + self.connection) + while not self._watch_stopping.is_set(): + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + if self._watch_stopping.is_set(): + break for delta in results.deltas: delta = get_entity_delta(delta) old_obj, new_obj = self.state.apply_delta(delta) - # XXX: Might not want to shield at this level - # We are shielding because when the watcher is - # canceled (on disconnect()), we don't want all of - # its children (every observer callback) to be - # canceled with it. So we shield them. But this means - # they can *never* be canceled. - await asyncio.shield( - self._notify_observers(delta, old_obj, new_obj)) + await self._notify_observers(delta, old_obj, new_obj) self._watch_received.set() except CancelledError: - log.debug('Closing watcher connection') - await self._watch_conn.close() - self._watch_shutdown.set() - self._watch_conn = None + pass + except Exception: + log.exception('Error in watcher') + raise + finally: + self._watch_stopped.set() log.debug('Starting watcher task') - self._watcher_task = self.loop.create_task(_start_watch()) - - def _stop_watching(self): - """Stop the asynchronous watch against this model. - - """ - log.debug('Stopping watcher task') - if self._watcher_task: - self._watcher_task.cancel() + self._watch_received.clear() + self._watch_stopping.clear() + self._watch_stopped.clear() + self.loop.create_task(_start_watch()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state @@ -661,7 +687,8 @@ class Model(object): for o in self.observers: if o.cares_about(delta): - asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + asyncio.ensure_future(o(delta, old_obj, new_obj, self), + loop=self.loop) async def _wait(self, entity_type, entity_id, action, predicate=None): """ @@ -670,7 +697,7 @@ class Model(object): :param entity_type: The entity's type. :param entity_id: The entity's id. - :param action: the type of action (e.g., 'add' or 'change') + :param action: the type of action (e.g., 'add', 'change', or 'remove') :param predicate: optional callable that must take as an argument a delta, and must return a boolean, indicating whether the delta contains the specific action we're looking @@ -685,7 +712,9 @@ class Model(object): self.add_observer(callback, entity_type, action, entity_id, predicate) entity_id = await q.get() - return self.state._live_entity_map(entity_type)[entity_id] + # object might not be in the entity_map if we were waiting for a + # 'remove' action + return self.state._live_entity_map(entity_type).get(entity_id) async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): """Wait for a new object to appear in the Model and return it. @@ -719,9 +748,8 @@ class Model(object): return await self._wait('action', action_id, 'change', predicate) - def add_machine( - self, spec=None, constraints=None, disks=None, series=None, - count=1): + async def add_machine( + self, spec=None, constraints=None, disks=None, series=None): """Start a new, empty machine and optionally a container, or add a container to a machine. @@ -729,25 +757,83 @@ class Model(object): Examples:: (None) - starts a new machine - 'lxc' - starts a new machine with on lxc container - 'lxc:4' - starts a new lxc container on machine 4 + 'lxd' - starts a new machine with one lxd container + 'lxd:4' - starts a new lxd container on machine 4 'ssh:user@10.10.0.3' - manually provisions a machine with ssh 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS - :param constraints: Machine constraints - :type constraints: :class:`juju.Constraints` - :param list disks: List of disk :class:`constraints ` - :param str series: Series - :param int count: Number of machines to deploy - Supported container types are: lxc, lxd, kvm + :param dict constraints: Machine constraints, which can contain the + the following keys:: + + arch : str + container : str + cores : int + cpu_power : int + instance_type : str + mem : int + root_disk : int + spaces : list(str) + tags : list(str) + virt_type : str + + Example:: + + constraints={ + 'mem': 256 * MB, + 'tags': ['virtual'], + } + + :param list disks: List of disk constraint dictionaries, which can + contain the following keys:: + + count : int + pool : str + size : int + + Example:: + + disks=[{ + 'pool': 'rootfs', + 'size': 10 * GB, + 'count': 1, + }] + + :param str series: Series, e.g. 'xenial' + + Supported container types are: lxd, kvm When deploying a container to an existing machine, constraints cannot be used. """ - pass - add_machines = add_machine + params = client.AddMachineParams() + params.jobs = ['JobHostUnits'] + + if spec: + placement = parse_placement(spec) + if placement: + params.placement = placement[0] + + if constraints: + params.constraints = client.Value.from_json(constraints) + + if disks: + params.disks = [ + client.Constraints.from_json(o) for o in disks] + + if series: + params.series = series + + # Submit the request. + client_facade = client.ClientFacade.from_connection(self.connection) + results = await client_facade.AddMachines([params]) + error = results.machines[0].error + if error: + raise ValueError("Error adding machine: %s" % error.message) + machine_id = results.machines[0].machine + log.debug('Added new machine %s', machine_id) + return await self._wait_for_new('machine', machine_id) async def add_relation(self, relation1, relation2): """Add a relation between two applications. @@ -756,8 +842,7 @@ class Model(object): :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Adding relation %s <-> %s', relation1, relation2) @@ -791,15 +876,17 @@ class Model(object): :param \*cidrs: Optional list of existing subnet CIDRs """ - pass + raise NotImplementedError() - def add_ssh_key(self, key): + async def add_ssh_key(self, user, key): """Add a public SSH key to this model. + :param str user: The username of the user :param str key: The public ssh key """ - pass + key_facade = client.KeyManagerFacade.from_connection(self.connection) + return await key_facade.AddKeys([key], user) add_ssh_keys = add_ssh_key def add_subnet(self, cidr_or_id, space, *zones): @@ -810,13 +897,13 @@ class Model(object): :param str \*zones: Zone(s) in which the subnet resides """ - pass + raise NotImplementedError() def get_backups(self): """Retrieve metadata for backups in this model. """ - pass + raise NotImplementedError() def block(self, *commands): """Add a new block to this model. @@ -825,13 +912,13 @@ class Model(object): 'all-changes', 'destroy-model', 'remove-object' """ - pass + raise NotImplementedError() def get_blocks(self): """List blocks for this model. """ - pass + raise NotImplementedError() def get_cached_images(self, arch=None, kind=None, series=None): """Return a list of cached OS images. @@ -841,7 +928,7 @@ class Model(object): :param str series: Filter by image series, e.g. 'xenial' """ - pass + raise NotImplementedError() def create_backup(self, note=None, no_download=False): """Create a backup of this model. @@ -851,7 +938,7 @@ class Model(object): :return str: Path to downloaded archive """ - pass + raise NotImplementedError() def create_storage_pool(self, name, provider_type, **pool_config): """Create or define a storage pool. @@ -861,7 +948,7 @@ class Model(object): :param \*\*pool_config: key/value pool configuration pairs """ - pass + raise NotImplementedError() def debug_log( self, no_tail=False, exclude_module=None, include_module=None, @@ -885,7 +972,23 @@ class Model(object): :param list exclude: Do not show log messages for these entities """ - pass + raise NotImplementedError() + + def _get_series(self, entity_url, entity): + # try to get the series from the provided charm URL + if entity_url.startswith('cs:'): + parts = entity_url[3:].split('/') + else: + parts = entity_url.split('/') + if parts[0].startswith('~'): + parts.pop(0) + if len(parts) > 1: + # series was specified in the URL + return parts[0] + # series was not supplied at all, so use the newest + # supported series according to the charm store + ss = entity['Meta']['supported-series'] + return ss['SupportedSeries'][0] async def deploy( self, entity_url, application_name=None, bind=None, budget=None, @@ -910,29 +1013,20 @@ class Model(object): :param dict resources: : pairs :param str series: Series on which to deploy :param dict storage: Storage constraints TODO how do these look? - :param str to: Placement directive, e.g.:: + :param to: Placement directive as a string. For example: - '23' - machine 23 - 'lxc:7' - new lxc container on machine 7 - '24/lxc/3' - lxc container 3 or machine 24 + '23' - place on machine 23 + 'lxd:7' - place in new lxd container on machine 7 + '24/lxd/3' - place in container 3 on machine 24 If None, a new machine is provisioned. TODO:: - - application_name is required; fill this in automatically if not - provided by caller - - series is required; how do we pick a default? + - support local resources """ - if to: - placement = [ - client.Placement(**p) for p in to - ] - else: - placement = [] - if storage: storage = { k: client.Constraints(**v) @@ -943,13 +1037,13 @@ class Model(object): entity_url.startswith('local:') or os.path.isdir(entity_url) ) - entity_id = await self.charmstore.entityId(entity_url) \ - if not is_local else entity_url + if is_local: + entity_id = entity_url.replace('local:', '') + else: + entity = await self.charmstore.entity(entity_url) + entity_id = entity['Id'] - app_facade = client.ApplicationFacade() - client_facade = client.ClientFacade() - app_facade.connect(self.connection) - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) is_bundle = ((is_local and (Path(entity_id) / 'bundle.yaml').exists()) or @@ -966,18 +1060,27 @@ class Model(object): # haven't made it yet we'll need to wait on them to be added await asyncio.gather(*[ asyncio.ensure_future( - self._wait_for_new('application', app_name)) + self._wait_for_new('application', app_name), + loop=self.loop) for app_name in pending_apps - ]) + ], loop=self.loop) return [app for name, app in self.applications.items() if name in handler.applications] else: - log.debug( - 'Deploying %s', entity_id) - if not is_local: + if not application_name: + application_name = entity['Meta']['charm-metadata']['Name'] + if not series: + series = self._get_series(entity_url, entity) + if not channel: + channel = 'stable' await client_facade.AddCharm(channel, entity_id) - elif not entity_id.startswith('local:'): + # XXX: we're dropping local resources here, but we don't + # actually support them yet anyway + resources = await self._add_store_resources(application_name, + entity_id, + entity) + else: # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath( os.path.expanduser(entity_id)) @@ -988,39 +1091,97 @@ class Model(object): "Pass a 'series' kwarg to Model.deploy().".format( charm_dir)) entity_id = await self.add_local_charm_dir(charm_dir, series) - - app = client.ApplicationDeploy( - application=application_name, - channel=channel, + return await self._deploy( charm_url=entity_id, - config=config, - constraints=parse_constraints(constraints), + application=application_name, + series=series, + config=config or {}, + constraints=constraints, endpoint_bindings=bind, - num_units=num_units, - placement=placement, resources=resources, - series=series, storage=storage, + channel=channel, + num_units=num_units, + placement=parse_placement(to) ) - result = await app_facade.Deploy([app]) - errors = [r.error.message for r in result.results if r.error] - if errors: - raise JujuError('\n'.join(errors)) - return await self._wait_for_new('application', application_name) + async def _add_store_resources(self, application, entity_url, entity=None): + if not entity: + # avoid extra charm store call if one was already made + entity = await self.charmstore.entity(entity_url) + resources = [ + { + 'description': resource['Description'], + 'fingerprint': resource['Fingerprint'], + 'name': resource['Name'], + 'path': resource['Path'], + 'revision': resource['Revision'], + 'size': resource['Size'], + 'type_': resource['Type'], + 'origin': 'store', + } for resource in entity['Meta']['resources'] + ] + + if not resources: + return None - def destroy(self): - """Terminate all machines and resources for this model. + resources_facade = client.ResourcesFacade.from_connection( + self.connection) + response = await resources_facade.AddPendingResources( + tag.application(application), + entity_url, + [client.CharmResource(**resource) for resource in resources]) + resource_map = {resource['name']: pid + for resource, pid + in zip(resources, response.pending_ids)} + return resource_map + async def _deploy(self, charm_url, application, series, config, + constraints, endpoint_bindings, resources, storage, + channel=None, num_units=None, placement=None): + """Logic shared between `Model.deploy` and `BundleHandler.deploy`. """ - pass + log.info('Deploying %s', charm_url) + + # stringify all config values for API, and convert to YAML + config = {k: str(v) for k, v in config.items()} + config = yaml.dump({application: config}, + default_flow_style=False) + + app_facade = client.ApplicationFacade.from_connection( + self.connection) + + app = client.ApplicationDeploy( + charm_url=charm_url, + application=application, + series=series, + channel=channel, + config_yaml=config, + constraints=parse_constraints(constraints), + endpoint_bindings=endpoint_bindings, + num_units=num_units, + resources=resources, + storage=storage, + placement=placement + ) + + result = await app_facade.Deploy([app]) + errors = [r.error.message for r in result.results if r.error] + if errors: + raise JujuError('\n'.join(errors)) + return await self._wait_for_new('application', application) + + async def destroy(self): + """Terminate all machines and resources for this model. + Is already implemented in controller.py. + """ + raise NotImplementedError() async def destroy_unit(self, *unit_names): """Destroy units by name. """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Destroying unit%s %s', @@ -1037,7 +1198,7 @@ class Model(object): :return str: Path to the archive file """ - pass + raise NotImplementedError() def enable_ha( self, num_controllers=0, constraints=None, series=None, to=None): @@ -1056,28 +1217,35 @@ class Model(object): If None, a new machine is provisioned. """ - pass + raise NotImplementedError() def get_config(self): """Return the configuration settings for this model. """ - pass + raise NotImplementedError() def get_constraints(self): """Return the machine constraints for this model. """ - pass + raise NotImplementedError() - def grant(self, username, acl='read'): + async def grant(self, username, acl='read'): """Grant a user access to this model. :param str username: Username :param str acl: Access control ('read' or 'write') """ - pass + controller_conn = await self.connection.controller() + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) + user = tag.user(username) + model = tag.model(self.info.uuid) + changes = client.ModifyModelAccess(acl, 'grant', model, user) + await self.revoke(username) + return await model_facade.ModifyModelAccess([changes]) def import_ssh_key(self, identity): """Add a public SSH key from a trusted indentity source to this model. @@ -1085,35 +1253,37 @@ class Model(object): :param str identity: User identity in the form : """ - pass + raise NotImplementedError() import_ssh_keys = import_ssh_key - def get_machines(self, machine, utc=False): + async def get_machines(self): """Return list of machines in this model. - :param str machine: Machine id, e.g. '0' - :param bool utc: Display time as UTC in RFC3339 format - """ - pass + return list(self.state.machines.keys()) def get_shares(self): """Return list of all users with access to this model. """ - pass + raise NotImplementedError() def get_spaces(self): """Return list of all known spaces, including associated subnets. """ - pass + raise NotImplementedError() - def get_ssh_key(self): + async def get_ssh_key(self, raw_ssh=False): """Return known SSH keys for this model. + :param bool raw_ssh: if True, returns the raw ssh key, + else it's fingerprint """ - pass + key_facade = client.KeyManagerFacade.from_connection(self.connection) + entity = {'tag': tag.model(self.info.uuid)} + entities = client.Entities([entity]) + return await key_facade.ListKeys(entities, raw_ssh) get_ssh_keys = get_ssh_key def get_storage(self, filesystem=False, volume=False): @@ -1123,7 +1293,7 @@ class Model(object): :param bool volume: Include volume storage """ - pass + raise NotImplementedError() def get_storage_pools(self, names=None, providers=None): """Return list of storage pools. @@ -1132,7 +1302,7 @@ class Model(object): :param list providers: Only include pools for these providers """ - pass + raise NotImplementedError() def get_subnets(self, space=None, zone=None): """Return list of known subnets. @@ -1141,13 +1311,13 @@ class Model(object): :param str zone: Only include subnets in this zone """ - pass + raise NotImplementedError() def remove_blocks(self): """Remove all blocks from this model. """ - pass + raise NotImplementedError() def remove_backup(self, backup_id): """Delete a backup. @@ -1155,7 +1325,7 @@ class Model(object): :param str backup_id: The id of the backup to remove """ - pass + raise NotImplementedError() def remove_cached_images(self, arch=None, kind=None, series=None): """Remove cached OS images. @@ -1165,7 +1335,7 @@ class Model(object): :param str series: Image series to remove, e.g. 'xenial' """ - pass + raise NotImplementedError() def remove_machine(self, *machine_ids): """Remove a machine from this model. @@ -1173,16 +1343,21 @@ class Model(object): :param str \*machine_ids: Ids of the machines to remove """ - pass + raise NotImplementedError() remove_machines = remove_machine - def remove_ssh_key(self, *keys): + async def remove_ssh_key(self, user, key): """Remove a public SSH key(s) from this model. - :param str \*keys: Keys to remove + :param str key: Full ssh key + :param str user: Juju user to which the key is registered """ - pass + key_facade = client.KeyManagerFacade.from_connection(self.connection) + key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii'))) + key = hashlib.md5(key).hexdigest() + key = ':'.join(a+b for a, b in zip(key[::2], key[1::2])) + await key_facade.DeleteKeys([key], user) remove_ssh_keys = remove_ssh_key def restore_backup( @@ -1198,22 +1373,27 @@ class Model(object): :param bool upload_tools: Upload tools if bootstrapping a new machine """ - pass + raise NotImplementedError() def retry_provisioning(self): """Retry provisioning for failed machines. """ - pass + raise NotImplementedError() - def revoke(self, username, acl='read'): + async def revoke(self, username): """Revoke a user's access to this model. :param str username: Username to revoke - :param str acl: Access control ('read' or 'write') """ - pass + controller_conn = await self.connection.controller() + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) + user = tag.user(username) + model = tag.model(self.info.uuid) + changes = client.ModifyModelAccess('read', 'revoke', model, user) + return await model_facade.ModifyModelAccess([changes]) def run(self, command, timeout=None): """Run command on all machines in this model. @@ -1222,7 +1402,7 @@ class Model(object): :param int timeout: Time to wait before command is considered failed """ - pass + raise NotImplementedError() def set_config(self, **config): """Set configuration keys on this model. @@ -1230,7 +1410,7 @@ class Model(object): :param \*\*config: Config key/values """ - pass + raise NotImplementedError() def set_constraints(self, constraints): """Set machine constraints on this model. @@ -1238,7 +1418,7 @@ class Model(object): :param :class:`juju.Constraints` constraints: Machine constraints """ - pass + raise NotImplementedError() def get_action_output(self, action_uuid, wait=-1): """Get the results of an action by ID. @@ -1247,7 +1427,7 @@ class Model(object): :param int wait: Time in seconds to wait for action to complete """ - pass + raise NotImplementedError() def get_action_status(self, uuid_or_prefix=None, name=None): """Get the status of all actions, filtered by ID, ID prefix, or action name. @@ -1256,7 +1436,7 @@ class Model(object): :param str name: Filter by action name """ - pass + raise NotImplementedError() def get_budget(self, budget_name): """Get budget usage info. @@ -1264,17 +1444,18 @@ class Model(object): :param str budget_name: Name of budget """ - pass + raise NotImplementedError() - def get_status(self, filter_=None, utc=False): + async def get_status(self, filters=None, utc=False): """Return the status of the model. - :param str filter_: Service or unit name or wildcard ('*') + :param str filters: Optional list of applications, units, or machines + to include, which can use wildcards ('*'). :param bool utc: Display time as UTC in RFC3339 format """ - pass - status = get_status + client_facade = client.ClientFacade.from_connection(self.connection) + return await client_facade.FullStatus(filters) def sync_tools( self, all_=False, destination=None, dry_run=False, public=False, @@ -1291,7 +1472,7 @@ class Model(object): :param str version: Copy a specific major.minor version """ - pass + raise NotImplementedError() def unblock(self, *commands): """Unblock an operation that would alter this model. @@ -1300,7 +1481,7 @@ class Model(object): 'all-changes', 'destroy-model', 'remove-object' """ - pass + raise NotImplementedError() def unset_config(self, *keys): """Unset configuration on this model. @@ -1308,13 +1489,13 @@ class Model(object): :param str \*keys: The keys to unset """ - pass + raise NotImplementedError() def upgrade_gui(self): """Upgrade the Juju GUI for this model. """ - pass + raise NotImplementedError() def upgrade_juju( self, dry_run=False, reset_previous_upgrade=False, @@ -1328,7 +1509,7 @@ class Model(object): :param str version: Upgrade to a specific version """ - pass + raise NotImplementedError() def upload_backup(self, archive_path): """Store a backup archive remotely in Juju. @@ -1336,7 +1517,7 @@ class Model(object): :param str archive_path: Path to local archive """ - pass + raise NotImplementedError() @property def charmstore(self): @@ -1353,8 +1534,8 @@ class Model(object): log.debug("Retrieving metrics for %s", ', '.join(tags) if tags else "all units") - metrics_facade = client.MetricsDebugFacade() - metrics_facade.connect(self.connection) + metrics_facade = client.MetricsDebugFacade.from_connection( + self.connection) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities) @@ -1404,12 +1585,12 @@ class BundleHandler(object): for unit_name, unit in model.units.items(): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) - self.client_facade = client.ClientFacade() - self.client_facade.connect(model.connection) - self.app_facade = client.ApplicationFacade() - self.app_facade.connect(model.connection) - self.ann_facade = client.AnnotationsFacade() - self.ann_facade.connect(model.connection) + self.client_facade = client.ClientFacade.from_connection( + model.connection) + self.app_facade = client.ApplicationFacade.from_connection( + model.connection) + self.ann_facade = client.AnnotationsFacade.from_connection( + model.connection) async def _handle_local_charms(self, bundle): """Search for references to local charms (i.e. filesystem paths) @@ -1452,7 +1633,7 @@ class BundleHandler(object): charm_urls = await asyncio.gather(*[ self.model.add_local_charm_dir(*params) for params in args - ]) + ], loop=self.model.loop) # Update the 'charm:' entry for each app with the new 'local:' url. for app_name, charm_url in zip(apps, charm_urls): bundle['services'][app_name]['charm'] = charm_url @@ -1473,9 +1654,6 @@ class BundleHandler(object): self.plan = await self.client_facade.GetBundleChanges( yaml.dump(self.bundle)) - if self.plan.errors: - raise JujuError('\n'.join(self.plan.errors)) - async def execute_plan(self): for step in self.plan.changes: method = getattr(self, step.method) @@ -1555,7 +1733,7 @@ class BundleHandler(object): results = await self.client_facade.AddMachines([params]) error = results.machines[0].error if error: - raise ValueError("Error adding machine: %s", error.message) + raise ValueError("Error adding machine: %s" % error.message) machine = results.machines[0].machine log.debug('Added new machine %s', machine) return machine @@ -1611,24 +1789,21 @@ class BundleHandler(object): """ # resolve indirect references charm = self.resolve(charm) - # stringify all config values for API - options = {k: str(v) for k, v in options.items()} - # build param object - app = client.ApplicationDeploy( + # the bundle plan doesn't actually do anything with resources, even + # though it ostensibly gives us something (None) for that param + if not charm.startswith('local:'): + resources = await self.model._add_store_resources(application, + charm) + await self.model._deploy( charm_url=charm, - series=series, application=application, + series=series, config=options, - constraints=parse_constraints(constraints), - storage=storage, + constraints=constraints, endpoint_bindings=endpoint_bindings, resources=resources, + storage=storage, ) - # do the do - log.info('Deploying %s', charm) - await self.app_facade.Deploy([app]) - # ensure the app is in the model for future operations - await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to): @@ -1696,7 +1871,7 @@ class CharmStore(object): """ def __init__(self, loop): self.loop = loop - self._cs = charmstore.CharmStore() + self._cs = theblues.charmstore.CharmStore(timeout=5) def __getattr__(self, name): """ @@ -1710,13 +1885,25 @@ class CharmStore(object): else: async def coro(*args, **kwargs): method = partial(attr, *args, **kwargs) - return await self.loop.run_in_executor(None, method) + for attempt in range(1, 4): + try: + return await self.loop.run_in_executor(None, method) + except theblues.errors.ServerError: + if attempt == 3: + raise + await asyncio.sleep(1, loop=self.loop) setattr(self, name, coro) wrapper = coro return wrapper class CharmArchiveGenerator(object): + """ + Create a Zip archive of a local charm directory for upload to a controller. + + This is used automatically by + `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_. + """ def __init__(self, path): self.path = os.path.abspath(os.path.expanduser(path))