self.state = ModelState(self)
self.info = None
self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=loop)
- self._watch_received = asyncio.Event(loop=loop)
+ self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
args and kw are passed through to Connection.connect()
"""
+ if 'loop' not in kw:
+ kw['loop'] = self.loop
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""Connect to the current Juju model.
"""
- self.connection = await connection.Connection.connect_current()
+ self.connection = await connection.Connection.connect_current(
+ self.loop)
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name)
+ self.connection = await connection.Connection.connect_model(model_name,
+ self.loop)
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(wait_period)
- await asyncio.wait_for(_block(), timeout)
+ await asyncio.sleep(wait_period, loop=self.loop)
+ await asyncio.wait_for(_block(), timeout, loop=self.loop)
@property
def applications(self):
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj))
+ self._notify_observers(delta, old_obj, new_obj),
+ loop=self.loop)
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
for o in self.observers:
if o.cares_about(delta):
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+ loop=self.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
:param \*cidrs: Optional list of existing subnet CIDRs
"""
- pass
+ raise NotImplementedError()
def add_ssh_key(self, key):
"""Add a public SSH key to this model.
:param str key: The public ssh key
"""
- pass
+ raise NotImplementedError()
add_ssh_keys = add_ssh_key
def add_subnet(self, cidr_or_id, space, *zones):
:param str \*zones: Zone(s) in which the subnet resides
"""
- pass
+ raise NotImplementedError()
def get_backups(self):
"""Retrieve metadata for backups in this model.
"""
- pass
+ raise NotImplementedError()
def block(self, *commands):
"""Add a new block to this model.
'all-changes', 'destroy-model', 'remove-object'
"""
- pass
+ raise NotImplementedError()
def get_blocks(self):
"""List blocks for this model.
"""
- pass
+ raise NotImplementedError()
def get_cached_images(self, arch=None, kind=None, series=None):
"""Return a list of cached OS images.
:param str series: Filter by image series, e.g. 'xenial'
"""
- pass
+ raise NotImplementedError()
def create_backup(self, note=None, no_download=False):
"""Create a backup of this model.
:return str: Path to downloaded archive
"""
- pass
+ raise NotImplementedError()
def create_storage_pool(self, name, provider_type, **pool_config):
"""Create or define a storage pool.
:param \*\*pool_config: key/value pool configuration pairs
"""
- pass
+ raise NotImplementedError()
def debug_log(
self, no_tail=False, exclude_module=None, include_module=None,
:param list exclude: Do not show log messages for these entities
"""
- pass
+ raise NotImplementedError()
async def deploy(
self, entity_url, application_name=None, bind=None, budget=None,
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
- self._wait_for_new('application', app_name))
+ self._wait_for_new('application', app_name),
+ loop=self.loop)
for app_name in pending_apps
- ])
+ ], loop=self.loop)
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
'Deploying %s', entity_id)
if not is_local:
+ parts = entity_id[3:].split('/')
+ if parts[0].startswith('~'):
+ parts.pop(0)
+ if not application_name:
+ application_name = parts[-1].split('-')[0]
+ if not series:
+ if len(parts) > 1:
+ series = parts[0]
+ else:
+ entity = await self.charmstore.entity(entity_id)
+ ss = entity['Meta']['supported-series']
+ series = ss['SupportedSeries'][0]
await client_facade.AddCharm(channel, entity_id)
elif not entity_id.startswith('local:'):
# We have a local charm dir that needs to be uploaded
"""Terminate all machines and resources for this model.
"""
- pass
+ raise NotImplementedError()
async def destroy_unit(self, *unit_names):
"""Destroy units by name.
:return str: Path to the archive file
"""
- pass
+ raise NotImplementedError()
def enable_ha(
self, num_controllers=0, constraints=None, series=None, to=None):
If None, a new machine is provisioned.
"""
- pass
+ raise NotImplementedError()
def get_config(self):
"""Return the configuration settings for this model.
"""
- pass
+ raise NotImplementedError()
def get_constraints(self):
"""Return the machine constraints for this model.
"""
- pass
+ raise NotImplementedError()
def grant(self, username, acl='read'):
"""Grant a user access to this model.
:param str acl: Access control ('read' or 'write')
"""
- pass
+ raise NotImplementedError()
def import_ssh_key(self, identity):
"""Add a public SSH key from a trusted indentity source to this model.
:param str identity: User identity in the form <lp|gh>:<username>
"""
- pass
+ raise NotImplementedError()
import_ssh_keys = import_ssh_key
def get_machines(self, machine, utc=False):
:param bool utc: Display time as UTC in RFC3339 format
"""
- pass
+ raise NotImplementedError()
def get_shares(self):
"""Return list of all users with access to this model.
"""
- pass
+ raise NotImplementedError()
def get_spaces(self):
"""Return list of all known spaces, including associated subnets.
"""
- pass
+ raise NotImplementedError()
def get_ssh_key(self):
"""Return known SSH keys for this model.
"""
- pass
+ raise NotImplementedError()
get_ssh_keys = get_ssh_key
def get_storage(self, filesystem=False, volume=False):
:param bool volume: Include volume storage
"""
- pass
+ raise NotImplementedError()
def get_storage_pools(self, names=None, providers=None):
"""Return list of storage pools.
:param list providers: Only include pools for these providers
"""
- pass
+ raise NotImplementedError()
def get_subnets(self, space=None, zone=None):
"""Return list of known subnets.
:param str zone: Only include subnets in this zone
"""
- pass
+ raise NotImplementedError()
def remove_blocks(self):
"""Remove all blocks from this model.
"""
- pass
+ raise NotImplementedError()
def remove_backup(self, backup_id):
"""Delete a backup.
:param str backup_id: The id of the backup to remove
"""
- pass
+ raise NotImplementedError()
def remove_cached_images(self, arch=None, kind=None, series=None):
"""Remove cached OS images.
:param str series: Image series to remove, e.g. 'xenial'
"""
- pass
+ raise NotImplementedError()
def remove_machine(self, *machine_ids):
"""Remove a machine from this model.
:param str \*machine_ids: Ids of the machines to remove
"""
- pass
+ raise NotImplementedError()
remove_machines = remove_machine
def remove_ssh_key(self, *keys):
:param str \*keys: Keys to remove
"""
- pass
+ raise NotImplementedError()
remove_ssh_keys = remove_ssh_key
def restore_backup(
:param bool upload_tools: Upload tools if bootstrapping a new machine
"""
- pass
+ raise NotImplementedError()
def retry_provisioning(self):
"""Retry provisioning for failed machines.
"""
- pass
+ raise NotImplementedError()
def revoke(self, username, acl='read'):
"""Revoke a user's access to this model.
:param str acl: Access control ('read' or 'write')
"""
- pass
+ raise NotImplementedError()
def run(self, command, timeout=None):
"""Run command on all machines in this model.
:param int timeout: Time to wait before command is considered failed
"""
- pass
+ raise NotImplementedError()
def set_config(self, **config):
"""Set configuration keys on this model.
:param \*\*config: Config key/values
"""
- pass
+ raise NotImplementedError()
def set_constraints(self, constraints):
"""Set machine constraints on this model.
:param :class:`juju.Constraints` constraints: Machine constraints
"""
- pass
+ raise NotImplementedError()
def get_action_output(self, action_uuid, wait=-1):
"""Get the results of an action by ID.
:param int wait: Time in seconds to wait for action to complete
"""
- pass
+ raise NotImplementedError()
def get_action_status(self, uuid_or_prefix=None, name=None):
"""Get the status of all actions, filtered by ID, ID prefix, or action name.
:param str name: Filter by action name
"""
- pass
+ raise NotImplementedError()
def get_budget(self, budget_name):
"""Get budget usage info.
:param str budget_name: Name of budget
"""
- pass
+ raise NotImplementedError()
- def get_status(self, filter_=None, utc=False):
+ async def get_status(self, filters=None, utc=False):
"""Return the status of the model.
- :param str filter_: Service or unit name or wildcard ('*')
+ :param str filters: Optional list of applications, units, or machines
+ to include, which can use wildcards ('*').
:param bool utc: Display time as UTC in RFC3339 format
"""
- pass
- status = get_status
+ client_facade = client.ClientFacade()
+ client_facade.connect(self.connection)
+ return await client_facade.FullStatus(filters)
def sync_tools(
self, all_=False, destination=None, dry_run=False, public=False,
:param str version: Copy a specific major.minor version
"""
- pass
+ raise NotImplementedError()
def unblock(self, *commands):
"""Unblock an operation that would alter this model.
'all-changes', 'destroy-model', 'remove-object'
"""
- pass
+ raise NotImplementedError()
def unset_config(self, *keys):
"""Unset configuration on this model.
:param str \*keys: The keys to unset
"""
- pass
+ raise NotImplementedError()
def upgrade_gui(self):
"""Upgrade the Juju GUI for this model.
"""
- pass
+ raise NotImplementedError()
def upgrade_juju(
self, dry_run=False, reset_previous_upgrade=False,
:param str version: Upgrade to a specific version
"""
- pass
+ raise NotImplementedError()
def upload_backup(self, archive_path):
"""Store a backup archive remotely in Juju.
:param str archive_path: Path to local archive
"""
- pass
+ raise NotImplementedError()
@property
def charmstore(self):
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
- ])
+ ], loop=self.model.loop)
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url
"""
# resolve indirect references
charm = self.resolve(charm)
- # stringify all config values for API
+ # stringify all config values for API, and convert to YAML
options = {k: str(v) for k, v in options.items()}
+ options = yaml.dump({application: options}, default_flow_style=False)
# build param object
app = client.ApplicationDeploy(
charm_url=charm,
series=series,
application=application,
- config=options,
+ # Pass options to config-yaml rather than config, as
+ # config-yaml invokes a newer codepath that better handles
+ # empty strings in the options values.
+ config_yaml=options,
constraints=parse_constraints(constraints),
storage=storage,
endpoint_bindings=endpoint_bindings,