X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Fmodel.py;h=bd7d43077d9e238cf40d5170a99850a732c34b23;hp=ac225992fc8fd82209271d07d837e683e93aedf5;hb=b2a07f566be558a8b59b8b5dedfe8da5ae1b0132;hpb=c3e6c2ec9a1fddfc8e9bd31509b366e633b6d99e diff --git a/modules/libjuju/juju/model.py b/modules/libjuju/juju/model.py index ac22599..bd7d430 100644 --- a/modules/libjuju/juju/model.py +++ b/modules/libjuju/juju/model.py @@ -22,12 +22,15 @@ import yaml from . import tag, utils from .client import client, connector from .client.client import ConfigValue +from .client.client import Value from .constraints import parse as parse_constraints from .constraints import normalize_key from .delta import get_entity_class, get_entity_delta from .errors import JujuAPIError, JujuError from .exceptions import DeadEntityException from .placement import parse as parse_placement +from . import provisioner + log = logging.getLogger(__name__) @@ -410,7 +413,7 @@ class Model: `juju.client.connection.Connection.MAX_FRAME_SIZE` :param bakery_client httpbakery.Client: The bakery client to use for macaroon authorization. - :param jujudata JujuData: The source for current controller information. + :param jujudata JujuData: The source for current controller information """ self._connector = connector.Connector( loop=loop, @@ -458,42 +461,101 @@ class Model: async def __aexit__(self, exc_type, exc, tb): await self.disconnect() - async def connect(self, model_name=None, **kwargs): + async def connect(self, *args, **kwargs): """Connect to a juju model. - If any arguments are specified other than model_name, then - model_name must be None and an explicit connection will be made - using Connection.connect using those parameters (the 'uuid' - parameter must be specified). + This supports two calling conventions: - Otherwise, if model_name is None, connect to the current model. + The model and (optionally) authentication information can be taken + from the data files created by the Juju CLI. This convention will + be used if a ``model_name`` is specified, or if the ``endpoint`` + and ``uuid`` are not. - Otherwise, model_name must specify the name of a known - model. + Otherwise, all of the ``endpoint``, ``uuid``, and authentication + information (``username`` and ``password``, or ``bakery_client`` and/or + ``macaroons``) are required. - :param model_name: Format [controller:][user/]model + If a single positional argument is given, it will be assumed to be + the ``model_name``. Otherwise, the first positional argument, if any, + must be the ``endpoint``. + + Available parameters are: + :param model_name: Format [controller:][user/]model + :param str endpoint: The hostname:port of the controller to connect to. + :param str uuid: The model UUID to connect to. + :param str username: The username for controller-local users (or None + to use macaroon-based login.) + :param str password: The password for controller-local users. + :param str cacert: The CA certificate of the controller + (PEM formatted). + :param httpbakery.Client bakery_client: The macaroon bakery client to + to use when performing macaroon-based login. Macaroon tokens + acquired when logging will be saved to bakery_client.cookies. + If this is None, a default bakery_client will be used. + :param list macaroons: List of macaroons to load into the + ``bakery_client``. + :param asyncio.BaseEventLoop loop: The event loop to use for async + operations. + :param int max_frame_size: The maximum websocket frame size to allow. """ await self.disconnect() - if not kwargs: - await self._connector.connect_model(model_name) + if 'endpoint' not in kwargs and len(args) < 2: + if args and 'model_name' in kwargs: + raise TypeError('connect() got multiple values for model_name') + elif args: + model_name = args[0] + else: + model_name = kwargs.pop('model_name', None) + await self._connector.connect_model(model_name, **kwargs) else: - if kwargs.get('uuid') is None: - raise ValueError('no UUID specified when connecting to model') + if 'model_name' in kwargs: + raise TypeError('connect() got values for both ' + 'model_name and endpoint') + if args and 'endpoint' in kwargs: + raise TypeError('connect() got multiple values for endpoint') + if len(args) < 2 and 'uuid' not in kwargs: + raise TypeError('connect() missing value for uuid') + has_userpass = (len(args) >= 4 or + {'username', 'password'}.issubset(kwargs)) + has_macaroons = (len(args) >= 6 or not + {'bakery_client', 'macaroons'}.isdisjoint(kwargs)) + if not (has_userpass or has_macaroons): + raise TypeError('connect() missing auth params') + arg_names = [ + 'endpoint', + 'uuid', + 'username', + 'password', + 'cacert', + 'bakery_client', + 'macaroons', + 'loop', + 'max_frame_size', + ] + for i, arg in enumerate(args): + kwargs[arg_names[i]] = arg + if not {'endpoint', 'uuid'}.issubset(kwargs): + raise ValueError('endpoint and uuid are required ' + 'if model_name not given') + if not ({'username', 'password'}.issubset(kwargs) or + {'bakery_client', 'macaroons'}.intersection(kwargs)): + raise ValueError('Authentication parameters are required ' + 'if model_name not given') await self._connector.connect(**kwargs) await self._after_connect() async def connect_model(self, model_name): """ .. deprecated:: 0.6.2 - Use connect(model_name=model_name) instead. + Use ``connect(model_name=model_name)`` instead. """ return await self.connect(model_name=model_name) async def connect_current(self): """ .. deprecated:: 0.6.2 - Use connect instead. + Use ``connect()`` instead. """ return await self.connect() @@ -528,7 +590,7 @@ class Model: if self.is_connected(): log.debug('Closing model connection') await self._connector.disconnect() - self.info = None + self._info = None async def add_local_charm_dir(self, charm_dir, series): """Upload a local charm to the model. @@ -675,11 +737,19 @@ class Model: """ facade = client.ClientFacade.from_connection(self.connection()) - self.info = await facade.ModelInfo() + self._info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) return self.info + @property + def info(self): + """Return the cached client.ModelInfo object for this Model. + + If Model.get_info() has not been called, this will return None. + """ + return self._info + def add_observer( self, callable_, entity_type=None, action=None, entity_id=None, predicate=None): @@ -735,7 +805,7 @@ class Model: results = await utils.run_with_interrupt( allwatcher.Next(), self._watch_stopping, - self._connector.loop) + loop=self._connector.loop) except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise @@ -880,7 +950,8 @@ class Model: (None) - starts a new machine 'lxd' - starts a new machine with one lxd container 'lxd:4' - starts a new lxd container on machine 4 - 'ssh:user@10.10.0.3' - manually provisions a machine with ssh + 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision + a machine with ssh and the private key used for authentication 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS @@ -929,12 +1000,25 @@ class Model: """ params = client.AddMachineParams() - params.jobs = ['JobHostUnits'] if spec: - placement = parse_placement(spec) - if placement: - params.placement = placement[0] + if spec.startswith("ssh:"): + placement, target, private_key_path = spec.split(":") + user, host = target.split("@") + + sshProvisioner = provisioner.SSHProvisioner( + host=host, + user=user, + private_key_path=private_key_path, + ) + + params = sshProvisioner.provision_machine() + else: + placement = parse_placement(spec) + if placement: + params.placement = placement[0] + + params.jobs = ['JobHostUnits'] if constraints: params.constraints = client.Value.from_json(constraints) @@ -953,6 +1037,17 @@ class Model: if error: raise ValueError("Error adding machine: %s" % error.message) machine_id = results.machines[0].machine + + if spec: + if spec.startswith("ssh:"): + # Need to run this after AddMachines has been called, + # as we need the machine_id + await sshProvisioner.install_agent( + self.connection(), + params.nonce, + machine_id, + ) + log.debug('Added new machine %s', machine_id) return await self._wait_for_new('machine', machine_id) @@ -963,7 +1058,8 @@ class Model: :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade.from_connection(self.connection()) + connection = self.connection() + app_facade = client.ApplicationFacade.from_connection(connection) log.debug( 'Adding relation %s <-> %s', relation1, relation2) @@ -998,7 +1094,7 @@ class Model: (optional) list of existing subnet CIDRs with it. :param str name: Name of the space - :param \*cidrs: Optional list of existing subnet CIDRs + :param *cidrs: Optional list of existing subnet CIDRs """ raise NotImplementedError() @@ -1019,7 +1115,7 @@ class Model: :param str cidr_or_id: CIDR or provider ID of the existing subnet :param str space: Network space with which to associate - :param str \*zones: Zone(s) in which the subnet resides + :param str *zones: Zone(s) in which the subnet resides """ raise NotImplementedError() @@ -1033,7 +1129,7 @@ class Model: def block(self, *commands): """Add a new block to this model. - :param str \*commands: The commands to block. Valid values are + :param str *commands: The commands to block. Valid values are 'all-changes', 'destroy-model', 'remove-object' """ @@ -1070,7 +1166,7 @@ class Model: :param str name: Name to give the storage pool :param str provider_type: Pool provider type - :param \*\*pool_config: key/value pool configuration pairs + :param **pool_config: key/value pool configuration pairs """ raise NotImplementedError() @@ -1119,7 +1215,7 @@ class Model: self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, num_units=1, plan=None, resources=None, series=None, storage=None, - to=None): + to=None, devices=None): """Deploy a new service or bundle. :param str entity_url: Charm or bundle url @@ -1170,7 +1266,8 @@ class Model: if is_local: entity_id = entity_url.replace('local:', '') else: - entity = await self.charmstore.entity(entity_url, channel=channel) + entity = await self.charmstore.entity(entity_url, channel=channel, + include_stats=False) entity_id = entity['Id'] client_facade = client.ClientFacade.from_connection(self.connection()) @@ -1208,7 +1305,7 @@ class Model: # actually support them yet anyway resources = await self._add_store_resources(application_name, entity_id, - entity) + entity=entity) else: if not application_name: metadata = yaml.load(metadata_path.read_text()) @@ -1234,13 +1331,16 @@ class Model: storage=storage, channel=channel, num_units=num_units, - placement=parse_placement(to) + placement=parse_placement(to), + devices=devices, ) - async def _add_store_resources(self, application, entity_url, entity=None): + async def _add_store_resources(self, application, entity_url, + overrides=None, entity=None): if not entity: # avoid extra charm store call if one was already made - entity = await self.charmstore.entity(entity_url) + entity = await self.charmstore.entity(entity_url, + include_stats=False) resources = [ { 'description': resource['Description'], @@ -1254,6 +1354,17 @@ class Model: } for resource in entity['Meta']['resources'] ] + if overrides: + names = {r['name'] for r in resources} + unknown = overrides.keys() - names + if unknown: + raise JujuError('Unrecognized resource{}: {}'.format( + 's' if len(unknown) > 1 else '', + ', '.join(unknown))) + for resource in resources: + if resource['name'] in overrides: + resource['revision'] = overrides[resource['name']] + if not resources: return None @@ -1270,7 +1381,8 @@ class Model: async def _deploy(self, charm_url, application, series, config, constraints, endpoint_bindings, resources, storage, - channel=None, num_units=None, placement=None): + channel=None, num_units=None, placement=None, + devices=None): """Logic shared between `Model.deploy` and `BundleHandler.deploy`. """ log.info('Deploying %s', charm_url) @@ -1294,7 +1406,8 @@ class Model: num_units=num_units, resources=resources, storage=storage, - placement=placement + placement=placement, + devices=devices, ) result = await app_facade.Deploy([app]) errors = [r.error.message for r in result.results if r.error] @@ -1312,7 +1425,8 @@ class Model: """Destroy units by name. """ - app_facade = client.ApplicationFacade.from_connection(self.connection()) + connection = self.connection() + app_facade = client.ApplicationFacade.from_connection(connection) log.debug( 'Destroying unit%s %s', @@ -1365,11 +1479,30 @@ class Model: config[key] = ConfigValue.from_json(value) return config - def get_constraints(self): + async def get_constraints(self): """Return the machine constraints for this model. + :returns: A ``dict`` of constraints. """ - raise NotImplementedError() + constraints = {} + client_facade = client.ClientFacade.from_connection(self.connection()) + result = await client_facade.GetModelConstraints() + + # GetModelConstraints returns GetConstraintsResults which has a + # 'constraints' attribute. If no constraints have been set + # GetConstraintsResults.constraints is None. Otherwise + # GetConstraintsResults.constraints has an attribute for each possible + # constraint, each of these in turn will be None if they have not been + # set. + if result.constraints: + constraint_types = [a for a in dir(result.constraints) + if a in Value._toSchema.keys()] + for constraint in constraint_types: + value = getattr(result.constraints, constraint) + if value is not None: + constraints[constraint] = getattr(result.constraints, + constraint) + return constraints def import_ssh_key(self, identity): """Add a public SSH key from a trusted indentity source to this model. @@ -1464,7 +1597,7 @@ class Model: def remove_machine(self, *machine_ids): """Remove a machine from this model. - :param str \*machine_ids: Ids of the machines to remove + :param str *machine_ids: Ids of the machines to remove """ raise NotImplementedError() @@ -1528,31 +1661,79 @@ class Model: config[key] = value.value await config_facade.ModelSet(config) - def set_constraints(self, constraints): + async def set_constraints(self, constraints): """Set machine constraints on this model. - :param :class:`juju.Constraints` constraints: Machine constraints - + :param dict config: Mapping of model constraints """ - raise NotImplementedError() + client_facade = client.ClientFacade.from_connection(self.connection()) + await client_facade.SetModelConstraints( + application='', + constraints=constraints) - def get_action_output(self, action_uuid, wait=-1): + async def get_action_output(self, action_uuid, wait=None): """Get the results of an action by ID. :param str action_uuid: Id of the action - :param int wait: Time in seconds to wait for action to complete - + :param int wait: Time in seconds to wait for action to complete. + :return dict: Output from action + :raises: :class:`JujuError` if invalid action_uuid """ - raise NotImplementedError() + action_facade = client.ActionFacade.from_connection( + self.connection() + ) + entity = [{'tag': tag.action(action_uuid)}] + # Cannot use self.wait_for_action as the action event has probably + # already happened and self.wait_for_action works by processing + # model deltas and checking if they match our type. If the action + # has already occured then the delta has gone. + + async def _wait_for_action_status(): + while True: + action_output = await action_facade.Actions(entity) + if action_output.results[0].status in ('completed', 'failed'): + return + else: + await asyncio.sleep(1) + await asyncio.wait_for( + _wait_for_action_status(), + timeout=wait) + action_output = await action_facade.Actions(entity) + # ActionResult.output is None if the action produced no output + if action_output.results[0].output is None: + output = {} + else: + output = action_output.results[0].output + return output - def get_action_status(self, uuid_or_prefix=None, name=None): - """Get the status of all actions, filtered by ID, ID prefix, or action name. + async def get_action_status(self, uuid_or_prefix=None, name=None): + """Get the status of all actions, filtered by ID, ID prefix, or name. :param str uuid_or_prefix: Filter by action uuid or prefix :param str name: Filter by action name """ - raise NotImplementedError() + results = {} + action_results = [] + action_facade = client.ActionFacade.from_connection( + self.connection() + ) + if name: + name_results = await action_facade.FindActionsByNames([name]) + action_results.extend(name_results.actions[0].actions) + if uuid_or_prefix: + # Collect list of actions matching uuid or prefix + matching_actions = await action_facade.FindActionTagsByPrefix( + [uuid_or_prefix]) + entities = [] + for actions in matching_actions.matches.values(): + entities = [{'tag': a.tag} for a in actions] + # Get action results matching action tags + uuid_results = await action_facade.Actions(entities) + action_results.extend(uuid_results.results) + for a in action_results: + results[tag.untag('action-', a.action.tag)] = a.status + return results def get_budget(self, budget_name): """Get budget usage info. @@ -1593,7 +1774,7 @@ class Model: def unblock(self, *commands): """Unblock an operation that would alter this model. - :param str \*commands: The commands to unblock. Valid values are + :param str *commands: The commands to unblock. Valid values are 'all-changes', 'destroy-model', 'remove-object' """ @@ -1602,7 +1783,7 @@ class Model: def unset_config(self, *keys): """Unset configuration on this model. - :param str \*keys: The keys to unset + :param str *keys: The keys to unset """ raise NotImplementedError() @@ -1642,7 +1823,7 @@ class Model: async def get_metrics(self, *tags): """Retrieve metrics. - :param str \*tags: Tags of entities from which to retrieve metrics. + :param str *tags: Tags of entities from which to retrieve metrics. No tags retrieves the metrics of all units in the model. :return: Dictionary of unit_name:metrics @@ -1701,6 +1882,8 @@ class BundleHandler: for unit_name, unit in model.units.items(): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) + self.bundle_facade = client.BundleFacade.from_connection( + model.connection()) self.client_facade = client.ClientFacade.from_connection( model.connection()) self.app_facade = client.ApplicationFacade.from_connection( @@ -1758,11 +1941,11 @@ class BundleHandler: return bundle async def fetch_plan(self, entity_id): - is_local = not entity_id.startswith('cs:') + is_store_url = entity_id.startswith('cs:') - if is_local and os.path.isfile(entity_id): + if not is_store_url and os.path.isfile(entity_id): bundle_yaml = Path(entity_id).read_text() - elif is_local and os.path.isdir(entity_id): + elif not is_store_url and os.path.isdir(entity_id): bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text() else: bundle_yaml = await self.charmstore.files(entity_id, @@ -1771,9 +1954,12 @@ class BundleHandler: self.bundle = yaml.safe_load(bundle_yaml) self.bundle = await self._handle_local_charms(self.bundle) - self.plan = await self.client_facade.GetBundleChanges( + self.plan = await self.bundle_facade.GetChanges( yaml.dump(self.bundle)) + if self.plan.errors: + raise JujuError(self.plan.errors) + async def execute_plan(self): for step in self.plan.changes: method = getattr(self, step.method) @@ -1839,7 +2025,11 @@ class BundleHandler: # Fix up values, as necessary. if 'parent_id' in params: - params['parent_id'] = self.resolve(params['parent_id']) + if params['parent_id'].startswith('$addUnit'): + unit = self.resolve(params['parent_id'])[0] + params['parent_id'] = unit.machine.entity_id + else: + params['parent_id'] = self.resolve(params['parent_id']) params['constraints'] = parse_constraints( params.get('constraints')) @@ -1880,7 +2070,7 @@ class BundleHandler: return await self.model.add_relation(*endpoints) async def deploy(self, charm, series, application, options, constraints, - storage, endpoint_bindings, resources): + storage, endpoint_bindings, *args): """ :param charm string: Charm holds the URL of the charm to be used to deploy this @@ -1905,17 +2095,35 @@ class BundleHandler: :param endpoint_bindings map[string]string: EndpointBindings holds the optional endpoint bindings + :param devices map[string]string: + Devices holds the optional devices constraints. + (Only given on Juju 2.5+) + :param resources map[string]int: Resources identifies the revision to use for each resource of the application's charm. + + :param num_units int: + NumUnits holds the number of units required. For IAAS models, this + will be 0 and separate AddUnitChanges will be used. For Kubernetes + models, this will be used to scale the application. + (Only given on Juju 2.5+) """ # resolve indirect references charm = self.resolve(charm) - # the bundle plan doesn't actually do anything with resources, even - # though it ostensibly gives us something (None) for that param + + if len(args) == 1: + # Juju 2.4 and below only sends the resources + resources = args[0] + devices, num_units = None, None + else: + # Juju 2.5+ sends devices before resources, as well as num_units + # There might be placement but we need to ignore that. + devices, resources, num_units = args[:3] + if not charm.startswith('local:'): - resources = await self.model._add_store_resources(application, - charm) + resources = await self.model._add_store_resources( + application, charm, overrides=resources) await self.model._deploy( charm_url=charm, application=application, @@ -1925,6 +2133,8 @@ class BundleHandler: endpoint_bindings=endpoint_bindings, resources=resources, storage=storage, + devices=devices, + num_units=num_units, ) return application @@ -1956,6 +2166,20 @@ class BundleHandler: to=placement, ) + async def scale(self, application, scale): + """ + Handle a change of scale to a k8s application. + + :param string application: + Application holds the application placeholder name for which a unit + is added. + + :param int scale: + New scale value to use. + """ + application = self.resolve(application) + return await self.model.applications[application].scale(scale=scale) + async def expose(self, application): """ :param application string: @@ -1991,9 +2215,9 @@ class CharmStore: """ Async wrapper around theblues.charmstore.CharmStore """ - def __init__(self, loop): + def __init__(self, loop, cs_timeout=20): self.loop = loop - self._cs = theblues.charmstore.CharmStore(timeout=5) + self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout) def __getattr__(self, name): """ @@ -2036,9 +2260,9 @@ class CharmArchiveGenerator: Ignored:: - * build/\* - This is used for packing the charm itself and any + * build/* - This is used for packing the charm itself and any similar tasks. - * \*/.\* - Hidden files are all ignored for now. This will most + * */.* - Hidden files are all ignored for now. This will most likely be changed into a specific ignore list (.bzr, etc)