from . import tag, utils
from .client import client, connector
from .client.client import ConfigValue
+from .client.client import Value
from .constraints import parse as parse_constraints
from .constraints import normalize_key
from .delta import get_entity_class, get_entity_delta
from .errors import JujuAPIError, JujuError
from .exceptions import DeadEntityException
from .placement import parse as parse_placement
+from . import provisioner
+
log = logging.getLogger(__name__)
`juju.client.connection.Connection.MAX_FRAME_SIZE`
:param bakery_client httpbakery.Client: The bakery client to use
for macaroon authorization.
- :param jujudata JujuData: The source for current controller information.
+ :param jujudata JujuData: The source for current controller information
"""
self._connector = connector.Connector(
loop=loop,
async def __aexit__(self, exc_type, exc, tb):
await self.disconnect()
- async def connect(self, model_name=None, **kwargs):
+ async def connect(self, *args, **kwargs):
"""Connect to a juju model.
- If any arguments are specified other than model_name, then
- model_name must be None and an explicit connection will be made
- using Connection.connect using those parameters (the 'uuid'
- parameter must be specified).
+ This supports two calling conventions:
- Otherwise, if model_name is None, connect to the current model.
+ The model and (optionally) authentication information can be taken
+ from the data files created by the Juju CLI. This convention will
+ be used if a ``model_name`` is specified, or if the ``endpoint``
+ and ``uuid`` are not.
- Otherwise, model_name must specify the name of a known
- model.
+ Otherwise, all of the ``endpoint``, ``uuid``, and authentication
+ information (``username`` and ``password``, or ``bakery_client`` and/or
+ ``macaroons``) are required.
- :param model_name: Format [controller:][user/]model
+ If a single positional argument is given, it will be assumed to be
+ the ``model_name``. Otherwise, the first positional argument, if any,
+ must be the ``endpoint``.
+
+ Available parameters are:
+ :param model_name: Format [controller:][user/]model
+ :param str endpoint: The hostname:port of the controller to connect to.
+ :param str uuid: The model UUID to connect to.
+ :param str username: The username for controller-local users (or None
+ to use macaroon-based login.)
+ :param str password: The password for controller-local users.
+ :param str cacert: The CA certificate of the controller
+ (PEM formatted).
+ :param httpbakery.Client bakery_client: The macaroon bakery client to
+ to use when performing macaroon-based login. Macaroon tokens
+ acquired when logging will be saved to bakery_client.cookies.
+ If this is None, a default bakery_client will be used.
+ :param list macaroons: List of macaroons to load into the
+ ``bakery_client``.
+ :param asyncio.BaseEventLoop loop: The event loop to use for async
+ operations.
+ :param int max_frame_size: The maximum websocket frame size to allow.
"""
await self.disconnect()
- if not kwargs:
- await self._connector.connect_model(model_name)
+ if 'endpoint' not in kwargs and len(args) < 2:
+ if args and 'model_name' in kwargs:
+ raise TypeError('connect() got multiple values for model_name')
+ elif args:
+ model_name = args[0]
+ else:
+ model_name = kwargs.pop('model_name', None)
+ await self._connector.connect_model(model_name, **kwargs)
else:
- if kwargs.get('uuid') is None:
- raise ValueError('no UUID specified when connecting to model')
+ if 'model_name' in kwargs:
+ raise TypeError('connect() got values for both '
+ 'model_name and endpoint')
+ if args and 'endpoint' in kwargs:
+ raise TypeError('connect() got multiple values for endpoint')
+ if len(args) < 2 and 'uuid' not in kwargs:
+ raise TypeError('connect() missing value for uuid')
+ has_userpass = (len(args) >= 4 or
+ {'username', 'password'}.issubset(kwargs))
+ has_macaroons = (len(args) >= 6 or not
+ {'bakery_client', 'macaroons'}.isdisjoint(kwargs))
+ if not (has_userpass or has_macaroons):
+ raise TypeError('connect() missing auth params')
+ arg_names = [
+ 'endpoint',
+ 'uuid',
+ 'username',
+ 'password',
+ 'cacert',
+ 'bakery_client',
+ 'macaroons',
+ 'loop',
+ 'max_frame_size',
+ ]
+ for i, arg in enumerate(args):
+ kwargs[arg_names[i]] = arg
+ if not {'endpoint', 'uuid'}.issubset(kwargs):
+ raise ValueError('endpoint and uuid are required '
+ 'if model_name not given')
+ if not ({'username', 'password'}.issubset(kwargs) or
+ {'bakery_client', 'macaroons'}.intersection(kwargs)):
+ raise ValueError('Authentication parameters are required '
+ 'if model_name not given')
await self._connector.connect(**kwargs)
await self._after_connect()
async def connect_model(self, model_name):
"""
.. deprecated:: 0.6.2
- Use connect(model_name=model_name) instead.
+ Use ``connect(model_name=model_name)`` instead.
"""
return await self.connect(model_name=model_name)
async def connect_current(self):
"""
.. deprecated:: 0.6.2
- Use connect instead.
+ Use ``connect()`` instead.
"""
return await self.connect()
if self.is_connected():
log.debug('Closing model connection')
await self._connector.disconnect()
- self.info = None
+ self._info = None
async def add_local_charm_dir(self, charm_dir, series):
"""Upload a local charm to the model.
"""
facade = client.ClientFacade.from_connection(self.connection())
- self.info = await facade.ModelInfo()
+ self._info = await facade.ModelInfo()
log.debug('Got ModelInfo: %s', vars(self.info))
return self.info
+ @property
+ def info(self):
+ """Return the cached client.ModelInfo object for this Model.
+
+ If Model.get_info() has not been called, this will return None.
+ """
+ return self._info
+
def add_observer(
self, callable_, entity_type=None, action=None, entity_id=None,
predicate=None):
results = await utils.run_with_interrupt(
allwatcher.Next(),
self._watch_stopping,
- self._connector.loop)
+ loop=self._connector.loop)
except JujuAPIError as e:
if 'watcher was stopped' not in str(e):
raise
pass # can't stop on a closed conn
break
for delta in results.deltas:
- delta = get_entity_delta(delta)
- old_obj, new_obj = self.state.apply_delta(delta)
- await self._notify_observers(delta, old_obj, new_obj)
+ try:
+ delta = get_entity_delta(delta)
+ old_obj, new_obj = self.state.apply_delta(delta)
+ await self._notify_observers(delta, old_obj, new_obj)
+ except KeyError as e:
+ log.debug("unknown delta type: %s", e.args[0])
self._watch_received.set()
except CancelledError:
pass
(None) - starts a new machine
'lxd' - starts a new machine with one lxd container
'lxd:4' - starts a new lxd container on machine 4
- 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
+ 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision
+ a machine with ssh and the private key used for authentication
'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
'maas2.name' - acquire machine maas2.name on MAAS
"""
params = client.AddMachineParams()
- params.jobs = ['JobHostUnits']
if spec:
- placement = parse_placement(spec)
- if placement:
- params.placement = placement[0]
+ if spec.startswith("ssh:"):
+ placement, target, private_key_path = spec.split(":")
+ user, host = target.split("@")
+
+ sshProvisioner = provisioner.SSHProvisioner(
+ host=host,
+ user=user,
+ private_key_path=private_key_path,
+ )
+
+ params = sshProvisioner.provision_machine()
+ else:
+ placement = parse_placement(spec)
+ if placement:
+ params.placement = placement[0]
+
+ params.jobs = ['JobHostUnits']
if constraints:
params.constraints = client.Value.from_json(constraints)
if error:
raise ValueError("Error adding machine: %s" % error.message)
machine_id = results.machines[0].machine
+
+ if spec:
+ if spec.startswith("ssh:"):
+ # Need to run this after AddMachines has been called,
+ # as we need the machine_id
+ await sshProvisioner.install_agent(
+ self.connection(),
+ params.nonce,
+ machine_id,
+ )
+
log.debug('Added new machine %s', machine_id)
return await self._wait_for_new('machine', machine_id)
:param str relation2: '<application>[:<relation_name>]'
"""
- app_facade = client.ApplicationFacade.from_connection(self.connection())
+ connection = self.connection()
+ app_facade = client.ApplicationFacade.from_connection(connection)
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
(optional) list of existing subnet CIDRs with it.
:param str name: Name of the space
- :param \*cidrs: Optional list of existing subnet CIDRs
+ :param *cidrs: Optional list of existing subnet CIDRs
"""
raise NotImplementedError()
:param str cidr_or_id: CIDR or provider ID of the existing subnet
:param str space: Network space with which to associate
- :param str \*zones: Zone(s) in which the subnet resides
+ :param str *zones: Zone(s) in which the subnet resides
"""
raise NotImplementedError()
def block(self, *commands):
"""Add a new block to this model.
- :param str \*commands: The commands to block. Valid values are
+ :param str *commands: The commands to block. Valid values are
'all-changes', 'destroy-model', 'remove-object'
"""
:param str name: Name to give the storage pool
:param str provider_type: Pool provider type
- :param \*\*pool_config: key/value pool configuration pairs
+ :param **pool_config: key/value pool configuration pairs
"""
raise NotImplementedError()
self, entity_url, application_name=None, bind=None, budget=None,
channel=None, config=None, constraints=None, force=False,
num_units=1, plan=None, resources=None, series=None, storage=None,
- to=None):
+ to=None, devices=None):
"""Deploy a new service or bundle.
:param str entity_url: Charm or bundle url
if is_local:
entity_id = entity_url.replace('local:', '')
else:
- entity = await self.charmstore.entity(entity_url, channel=channel)
+ entity = await self.charmstore.entity(entity_url, channel=channel,
+ include_stats=False)
entity_id = entity['Id']
client_facade = client.ClientFacade.from_connection(self.connection())
# actually support them yet anyway
resources = await self._add_store_resources(application_name,
entity_id,
- entity)
+ entity=entity)
else:
if not application_name:
metadata = yaml.load(metadata_path.read_text())
storage=storage,
channel=channel,
num_units=num_units,
- placement=parse_placement(to)
+ placement=parse_placement(to),
+ devices=devices,
)
- async def _add_store_resources(self, application, entity_url, entity=None):
+ async def _add_store_resources(self, application, entity_url,
+ overrides=None, entity=None):
if not entity:
# avoid extra charm store call if one was already made
- entity = await self.charmstore.entity(entity_url)
+ entity = await self.charmstore.entity(entity_url,
+ include_stats=False)
resources = [
{
'description': resource['Description'],
} for resource in entity['Meta']['resources']
]
+ if overrides:
+ names = {r['name'] for r in resources}
+ unknown = overrides.keys() - names
+ if unknown:
+ raise JujuError('Unrecognized resource{}: {}'.format(
+ 's' if len(unknown) > 1 else '',
+ ', '.join(unknown)))
+ for resource in resources:
+ if resource['name'] in overrides:
+ resource['revision'] = overrides[resource['name']]
+
if not resources:
return None
async def _deploy(self, charm_url, application, series, config,
constraints, endpoint_bindings, resources, storage,
- channel=None, num_units=None, placement=None):
+ channel=None, num_units=None, placement=None,
+ devices=None):
"""Logic shared between `Model.deploy` and `BundleHandler.deploy`.
"""
log.info('Deploying %s', charm_url)
num_units=num_units,
resources=resources,
storage=storage,
- placement=placement
+ placement=placement,
+ devices=devices,
)
result = await app_facade.Deploy([app])
errors = [r.error.message for r in result.results if r.error]
"""Destroy units by name.
"""
- app_facade = client.ApplicationFacade.from_connection(self.connection())
+ connection = self.connection()
+ app_facade = client.ApplicationFacade.from_connection(connection)
log.debug(
'Destroying unit%s %s',
config[key] = ConfigValue.from_json(value)
return config
- def get_constraints(self):
+ async def get_constraints(self):
"""Return the machine constraints for this model.
+ :returns: A ``dict`` of constraints.
"""
- raise NotImplementedError()
+ constraints = {}
+ client_facade = client.ClientFacade.from_connection(self.connection())
+ result = await client_facade.GetModelConstraints()
+
+ # GetModelConstraints returns GetConstraintsResults which has a
+ # 'constraints' attribute. If no constraints have been set
+ # GetConstraintsResults.constraints is None. Otherwise
+ # GetConstraintsResults.constraints has an attribute for each possible
+ # constraint, each of these in turn will be None if they have not been
+ # set.
+ if result.constraints:
+ constraint_types = [a for a in dir(result.constraints)
+ if a in Value._toSchema.keys()]
+ for constraint in constraint_types:
+ value = getattr(result.constraints, constraint)
+ if value is not None:
+ constraints[constraint] = getattr(result.constraints,
+ constraint)
+ return constraints
def import_ssh_key(self, identity):
"""Add a public SSH key from a trusted indentity source to this model.
def remove_machine(self, *machine_ids):
"""Remove a machine from this model.
- :param str \*machine_ids: Ids of the machines to remove
+ :param str *machine_ids: Ids of the machines to remove
"""
raise NotImplementedError()
config[key] = value.value
await config_facade.ModelSet(config)
- def set_constraints(self, constraints):
+ async def set_constraints(self, constraints):
"""Set machine constraints on this model.
- :param :class:`juju.Constraints` constraints: Machine constraints
-
+ :param dict config: Mapping of model constraints
"""
- raise NotImplementedError()
+ client_facade = client.ClientFacade.from_connection(self.connection())
+ await client_facade.SetModelConstraints(
+ application='',
+ constraints=constraints)
- def get_action_output(self, action_uuid, wait=-1):
+ async def get_action_output(self, action_uuid, wait=None):
"""Get the results of an action by ID.
:param str action_uuid: Id of the action
- :param int wait: Time in seconds to wait for action to complete
-
+ :param int wait: Time in seconds to wait for action to complete.
+ :return dict: Output from action
+ :raises: :class:`JujuError` if invalid action_uuid
"""
- raise NotImplementedError()
+ action_facade = client.ActionFacade.from_connection(
+ self.connection()
+ )
+ entity = [{'tag': tag.action(action_uuid)}]
+ # Cannot use self.wait_for_action as the action event has probably
+ # already happened and self.wait_for_action works by processing
+ # model deltas and checking if they match our type. If the action
+ # has already occured then the delta has gone.
+
+ async def _wait_for_action_status():
+ while True:
+ action_output = await action_facade.Actions(entity)
+ if action_output.results[0].status in ('completed', 'failed'):
+ return
+ else:
+ await asyncio.sleep(1)
+ await asyncio.wait_for(
+ _wait_for_action_status(),
+ timeout=wait)
+ action_output = await action_facade.Actions(entity)
+ # ActionResult.output is None if the action produced no output
+ if action_output.results[0].output is None:
+ output = {}
+ else:
+ output = action_output.results[0].output
+ return output
- def get_action_status(self, uuid_or_prefix=None, name=None):
- """Get the status of all actions, filtered by ID, ID prefix, or action name.
+ async def get_action_status(self, uuid_or_prefix=None, name=None):
+ """Get the status of all actions, filtered by ID, ID prefix, or name.
:param str uuid_or_prefix: Filter by action uuid or prefix
:param str name: Filter by action name
"""
- raise NotImplementedError()
+ results = {}
+ action_results = []
+ action_facade = client.ActionFacade.from_connection(
+ self.connection()
+ )
+ if name:
+ name_results = await action_facade.FindActionsByNames([name])
+ action_results.extend(name_results.actions[0].actions)
+ if uuid_or_prefix:
+ # Collect list of actions matching uuid or prefix
+ matching_actions = await action_facade.FindActionTagsByPrefix(
+ [uuid_or_prefix])
+ entities = []
+ for actions in matching_actions.matches.values():
+ entities = [{'tag': a.tag} for a in actions]
+ # Get action results matching action tags
+ uuid_results = await action_facade.Actions(entities)
+ action_results.extend(uuid_results.results)
+ for a in action_results:
+ results[tag.untag('action-', a.action.tag)] = a.status
+ return results
def get_budget(self, budget_name):
"""Get budget usage info.
def unblock(self, *commands):
"""Unblock an operation that would alter this model.
- :param str \*commands: The commands to unblock. Valid values are
+ :param str *commands: The commands to unblock. Valid values are
'all-changes', 'destroy-model', 'remove-object'
"""
def unset_config(self, *keys):
"""Unset configuration on this model.
- :param str \*keys: The keys to unset
+ :param str *keys: The keys to unset
"""
raise NotImplementedError()
async def get_metrics(self, *tags):
"""Retrieve metrics.
- :param str \*tags: Tags of entities from which to retrieve metrics.
+ :param str *tags: Tags of entities from which to retrieve metrics.
No tags retrieves the metrics of all units in the model.
:return: Dictionary of unit_name:metrics
for unit_name, unit in model.units.items():
app_units = self._units_by_app.setdefault(unit.application, [])
app_units.append(unit_name)
+ self.bundle_facade = client.BundleFacade.from_connection(
+ model.connection())
self.client_facade = client.ClientFacade.from_connection(
model.connection())
self.app_facade = client.ApplicationFacade.from_connection(
return bundle
async def fetch_plan(self, entity_id):
- is_local = not entity_id.startswith('cs:')
+ is_store_url = entity_id.startswith('cs:')
- if is_local and os.path.isfile(entity_id):
+ if not is_store_url and os.path.isfile(entity_id):
bundle_yaml = Path(entity_id).read_text()
- elif is_local and os.path.isdir(entity_id):
+ elif not is_store_url and os.path.isdir(entity_id):
bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
else:
bundle_yaml = await self.charmstore.files(entity_id,
self.bundle = yaml.safe_load(bundle_yaml)
self.bundle = await self._handle_local_charms(self.bundle)
- self.plan = await self.client_facade.GetBundleChanges(
+ self.plan = await self.bundle_facade.GetChanges(
yaml.dump(self.bundle))
+ if self.plan.errors:
+ raise JujuError(self.plan.errors)
+
async def execute_plan(self):
for step in self.plan.changes:
method = getattr(self, step.method)
# Fix up values, as necessary.
if 'parent_id' in params:
- params['parent_id'] = self.resolve(params['parent_id'])
+ if params['parent_id'].startswith('$addUnit'):
+ unit = self.resolve(params['parent_id'])[0]
+ params['parent_id'] = unit.machine.entity_id
+ else:
+ params['parent_id'] = self.resolve(params['parent_id'])
params['constraints'] = parse_constraints(
params.get('constraints'))
return await self.model.add_relation(*endpoints)
async def deploy(self, charm, series, application, options, constraints,
- storage, endpoint_bindings, resources):
+ storage, endpoint_bindings, *args):
"""
:param charm string:
Charm holds the URL of the charm to be used to deploy this
:param endpoint_bindings map[string]string:
EndpointBindings holds the optional endpoint bindings
+ :param devices map[string]string:
+ Devices holds the optional devices constraints.
+ (Only given on Juju 2.5+)
+
:param resources map[string]int:
Resources identifies the revision to use for each resource
of the application's charm.
+
+ :param num_units int:
+ NumUnits holds the number of units required. For IAAS models, this
+ will be 0 and separate AddUnitChanges will be used. For Kubernetes
+ models, this will be used to scale the application.
+ (Only given on Juju 2.5+)
"""
# resolve indirect references
charm = self.resolve(charm)
- # the bundle plan doesn't actually do anything with resources, even
- # though it ostensibly gives us something (None) for that param
+
+ if len(args) == 1:
+ # Juju 2.4 and below only sends the resources
+ resources = args[0]
+ devices, num_units = None, None
+ else:
+ # Juju 2.5+ sends devices before resources, as well as num_units
+ # There might be placement but we need to ignore that.
+ devices, resources, num_units = args[:3]
+
if not charm.startswith('local:'):
- resources = await self.model._add_store_resources(application,
- charm)
+ resources = await self.model._add_store_resources(
+ application, charm, overrides=resources)
await self.model._deploy(
charm_url=charm,
application=application,
endpoint_bindings=endpoint_bindings,
resources=resources,
storage=storage,
+ devices=devices,
+ num_units=num_units,
)
return application
to=placement,
)
+ async def scale(self, application, scale):
+ """
+ Handle a change of scale to a k8s application.
+
+ :param string application:
+ Application holds the application placeholder name for which a unit
+ is added.
+
+ :param int scale:
+ New scale value to use.
+ """
+ application = self.resolve(application)
+ return await self.model.applications[application].scale(scale=scale)
+
async def expose(self, application):
"""
:param application string:
"""
Async wrapper around theblues.charmstore.CharmStore
"""
- def __init__(self, loop):
+ def __init__(self, loop, cs_timeout=20):
self.loop = loop
- self._cs = theblues.charmstore.CharmStore(timeout=5)
+ self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout)
def __getattr__(self, name):
"""
Ignored::
- * build/\* - This is used for packing the charm itself and any
+ * build/* - This is used for packing the charm itself and any
similar tasks.
- * \*/.\* - Hidden files are all ignored for now. This will most
+ * */.* - Hidden files are all ignored for now. This will most
likely be changed into a specific ignore list
(.bzr, etc)