X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=ef6bc85115c3e213938bebec3c8fce31cf07263d;hb=5fef7503b13f145adc7cd4ee31c2d684e09a6a85;hp=09225209f519ce92cc6a0d9a5c82840122dde4a2;hpb=ea587bcc29128652a98b0a7c1327dd3336f4f484;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 0922520..ef6bc85 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,10 +1,16 @@ import asyncio import collections +import json import logging +import os import re +import stat +import tempfile import weakref +import zipfile from concurrent.futures import CancelledError from functools import partial +from pathlib import Path import yaml from theblues import charmstore @@ -12,11 +18,12 @@ from theblues import charmstore from .client import client from .client import watcher from .client import connection -from .constraints import parse as parse_constraints +from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta from .delta import get_entity_class from .exceptions import DeadEntityException -from .errors import JujuAPIError +from .errors import JujuError, JujuAPIError +from .placement import parse as parse_placement log = logging.getLogger(__name__) @@ -72,6 +79,14 @@ class ModelObserver(object): await method(delta, old, new, model) async def on_change(self, delta, old, new, model): + """Generic model-change handler. + + :param delta: :class:`juju.client.overrides.Delta` + :param old: :class:`juju.model.ModelEntity` + :param new: :class:`juju.model.ModelEntity` + :param model: :class:`juju.model.Model` + + """ pass @@ -205,18 +220,16 @@ class ModelEntity(object): self.connected = connected self.connection = model.connection + def __repr__(self): + return '<{} entity_id="{}">'.format(type(self).__name__, + self.entity_id) + def __getattr__(self, name): """Fetch object attributes from the underlying data dict held in the model. """ - if self.data is None: - raise DeadEntityException( - "Entity {}:{} is dead - its attributes can no longer be " - "accessed. Use the .previous() method on this object to get " - "a copy of the object at its previous state.".format( - self.entity_type, self.entity_id)) - return self.data[name] + return self.safe_data[name] def __bool__(self): return bool(self.data) @@ -283,6 +296,22 @@ class ModelEntity(object): return self.model.state.entity_data( self.entity_type, self.entity_id, self._history_index) + @property + def safe_data(self): + """The data dictionary for this entity. + + If this `ModelEntity` points to the dead state, it will + raise `DeadEntityException`. + + """ + if self.data is None: + raise DeadEntityException( + "Entity {}:{} is dead - its attributes can no longer be " + "accessed. Use the .previous() method on this object to get " + "a copy of the object at its previous state.".format( + self.entity_type, self.entity_id)) + return self.data + def previous(self): """Return a copy of this object as was at its previous state in history. @@ -345,6 +374,7 @@ class Model(object): self.connection = None self.observers = weakref.WeakValueDictionary() self.state = ModelState(self) + self.info = None self._watcher_task = None self._watch_shutdown = asyncio.Event(loop=loop) self._watch_received = asyncio.Event(loop=loop) @@ -357,25 +387,31 @@ class Model(object): """ self.connection = await connection.Connection.connect(*args, **kw) - self._watch() - await self._watch_received.wait() + await self._after_connect() async def connect_current(self): """Connect to the current Juju model. """ self.connection = await connection.Connection.connect_current() - self._watch() - await self._watch_received.wait() + await self._after_connect() + + async def connect_model(self, model_name): + """Connect to a specific Juju model by name. - async def connect_model(self, arg): - """Connect to a specific Juju model. - :param arg: : + :param model_name: Format [controller:][user/]model + + """ + self.connection = await connection.Connection.connect_model(model_name) + await self._after_connect() + + async def _after_connect(self): + """Run initialization steps after connecting to websocket. """ - self.connection = await connection.Connection.connect_model(arg) self._watch() await self._watch_received.wait() + await self.get_info() async def disconnect(self): """Shut down the watcher task and close websockets. @@ -388,6 +424,59 @@ class Model(object): await self.connection.close() self.connection = None + async def add_local_charm_dir(self, charm_dir, series): + """Upload a local charm to the model. + + This will automatically generate an archive from + the charm dir. + + :param charm_dir: Path to the charm directory + :param series: Charm series + + """ + fh = tempfile.NamedTemporaryFile() + CharmArchiveGenerator(charm_dir).make_archive(fh.name) + with fh: + func = partial( + self.add_local_charm, fh, series, os.stat(fh.name).st_size) + charm_url = await self.loop.run_in_executor(None, func) + + log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url) + return charm_url + + def add_local_charm(self, charm_file, series, size=None): + """Upload a local charm archive to the model. + + Returns the 'local:...' url that should be used to deploy the charm. + + :param charm_file: Path to charm zip archive + :param series: Charm series + :param size: Size of the archive, in bytes + :return str: 'local:...' url for deploying the charm + :raises: :class:`JujuError` if the upload fails + + Uses an https endpoint at the same host:port as the wss. + Supports large file uploads. + + .. warning:: + + This method will block. Consider using :meth:`add_local_charm_dir` + instead. + + """ + conn, headers, path_prefix = self.connection.https_connection() + path = "%s/charms?series=%s" % (path_prefix, series) + headers['Content-Type'] = 'application/zip' + if size: + headers['Content-Length'] = size + conn.request("POST", path, charm_file, headers) + response = conn.getresponse() + result = response.read().decode() + if not response.status == 200: + raise JujuError(result) + result = json.loads(result) + return result['charm-url'] + def all_units_idle(self): """Return True if all units are idle. @@ -416,13 +505,13 @@ class Model(object): lambda: len(self.machines) == 0 ) - async def block_until(self, *conditions, timeout=None): + async def block_until(self, *conditions, timeout=None, wait_period=0.5): """Return only after all conditions are true. """ async def _block(): while not all(c() for c in conditions): - await asyncio.sleep(0) + await asyncio.sleep(wait_period) await asyncio.wait_for(_block(), timeout) @property @@ -449,13 +538,34 @@ class Model(object): """ return self.state.units + async def get_info(self): + """Return a client.ModelInfo object for this Model. + + Retrieves latest info for this Model from the api server. The + return value is cached on the Model.info attribute so that the + valued may be accessed again without another api call, if + desired. + + This method is called automatically when the Model is connected, + resulting in Model.info being initialized without requiring an + explicit call to this method. + + """ + facade = client.ClientFacade() + facade.connect(self.connection) + + self.info = await facade.ModelInfo() + log.debug('Got ModelInfo: %s', vars(self.info)) + + return self.info + def add_observer( self, callable_, entity_type=None, action=None, entity_id=None, predicate=None): """Register an "on-model-change" callback Once the model is connected, ``callable_`` - will be called each time the model changes. callable_ should + will be called each time the model changes. ``callable_`` should be Awaitable and accept the following positional arguments: delta - An instance of :class:`juju.delta.EntityDelta` @@ -474,14 +584,15 @@ class Model(object): model - The :class:`Model` itself. Events for which ``callable_`` is called can be specified by passing - entity_type, action, and/or id_ filter criteria, e.g.: + entity_type, action, and/or entitiy_id filter criteria, e.g.:: add_observer( - myfunc, entity_type='application', action='add', id_='ubuntu') + myfunc, + entity_type='application', action='add', entity_id='ubuntu') For more complex filtering conditions, pass a predicate function. It will be called with a delta as its only argument. If the predicate - function returns True, the callable_ will be called. + function returns True, the ``callable_`` will be called. """ observer = _Observer( @@ -577,15 +688,24 @@ class Model(object): entity_id = await q.get() return self.state._live_entity_map(entity_type)[entity_id] - async def _wait_for_new(self, entity_type, entity_id, predicate=None): + async def _wait_for_new(self, entity_type, entity_id=None, predicate=None): """Wait for a new object to appear in the Model and return it. Waits for an object of type ``entity_type`` with id ``entity_id``. + If ``entity_id`` is ``None``, it will wait for the first new entity + of the correct type. This coroutine blocks until the new object appears in the model. """ - return await self._wait(entity_type, entity_id, 'add', predicate) + # if the entity is already in the model, just return it + if entity_id in self.state._live_entity_map(entity_type): + return self.state._live_entity_map(entity_type)[entity_id] + # if we know the entity_id, we can trigger on any action that puts + # the enitty into the model; otherwise, we have to watch for the + # next "add" action on that entity_type + action = 'add' if entity_id is None else None + return await self._wait(entity_type, entity_id, action, predicate) async def wait_for_action(self, action_id): """Given an action, wait for it to complete.""" @@ -600,9 +720,8 @@ class Model(object): return await self._wait('action', action_id, 'change', predicate) - def add_machine( - self, spec=None, constraints=None, disks=None, series=None, - count=1): + async def add_machine( + self, spec=None, constraints=None, disks=None, series=None): """Start a new, empty machine and optionally a container, or add a container to a machine. @@ -610,25 +729,64 @@ class Model(object): Examples:: (None) - starts a new machine - 'lxc' - starts a new machine with on lxc container - 'lxc:4' - starts a new lxc container on machine 4 + 'lxd' - starts a new machine with one lxd container + 'lxd:4' - starts a new lxd container on machine 4 'ssh:user@10.10.0.3' - manually provisions a machine with ssh 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS - :param constraints: Machine constraints - :type constraints: :class:`juju.Constraints` - :param list disks: List of disk :class:`constraints ` - :param str series: Series - :param int count: Number of machines to deploy - Supported container types are: lxc, lxd, kvm + :param dict constraints: Machine constraints + Example:: + + constraints={ + 'mem': 256 * MB, + } + + :param list disks: List of disk constraint dictionaries + Example:: + + disks=[{ + 'pool': 'rootfs', + 'size': 10 * GB, + 'count': 1, + }] + + :param str series: Series, e.g. 'xenial' + + Supported container types are: lxd, kvm When deploying a container to an existing machine, constraints cannot be used. """ - pass - add_machines = add_machine + params = client.AddMachineParams() + params.jobs = ['JobHostUnits'] + + if spec: + placement = parse_placement(spec) + if placement: + params.placement = placement[0] + + if constraints: + params.constraints = client.Value.from_json(constraints) + + if disks: + params.disks = [ + client.Constraints.from_json(o) for o in disks] + + if series: + params.series = series + + # Submit the request. + client_facade = client.ClientFacade() + client_facade.connect(self.connection) + results = await client_facade.AddMachines([params]) + error = results.machines[0].error + if error: + raise ValueError("Error adding machine: %s", error.message) + machine_id = results.machines[0].machine + log.debug('Added new machine %s', machine_id) + return await self._wait_for_new('machine', machine_id) async def add_relation(self, relation1, relation2): """Add a relation between two applications. @@ -769,14 +927,14 @@ class Model(object): pass async def deploy( - self, entity_url, service_name=None, bind=None, budget=None, + self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, num_units=1, plan=None, resources=None, series=None, storage=None, to=None): """Deploy a new service or bundle. :param str entity_url: Charm or bundle url - :param str service_name: Name to give the service + :param str application_name: Name to give the service :param dict bind: : pairs :param dict budget: : pairs :param str channel: Charm store channel from which to retrieve @@ -791,26 +949,24 @@ class Model(object): :param dict resources: : pairs :param str series: Series on which to deploy :param dict storage: Storage constraints TODO how do these look? - :param str to: Placement directive, e.g.:: + :param to: Placement directive as a string. For example: - '23' - machine 23 - 'lxc:7' - new lxc container on machine 7 - '24/lxc/3' - lxc container 3 or machine 24 + '23' - place on machine 23 + 'lxd:7' - place in new lxd container on machine 7 + '24/lxd/3' - place in container 3 on machine 24 If None, a new machine is provisioned. TODO:: - - service_name is required; fill this in automatically if not + - application_name is required; fill this in automatically if not provided by caller - series is required; how do we pick a default? """ if to: - placement = [ - client.Placement(**p) for p in to - ] + placement = parse_placement(to) else: placement = [] @@ -820,14 +976,23 @@ class Model(object): for k, v in storage.items() } - entity_id = await self.charmstore.entityId(entity_url) + is_local = ( + entity_url.startswith('local:') or + os.path.isdir(entity_url) + ) + entity_id = await self.charmstore.entityId(entity_url) \ + if not is_local else entity_url app_facade = client.ApplicationFacade() client_facade = client.ClientFacade() app_facade.connect(self.connection) client_facade.connect(self.connection) - if 'bundle/' in entity_id: + is_bundle = ((is_local and + (Path(entity_id) / 'bundle.yaml').exists()) or + (not is_local and 'bundle/' in entity_id)) + + if is_bundle: handler = BundleHandler(self) await handler.fetch_plan(entity_id) await handler.execute_plan() @@ -838,7 +1003,7 @@ class Model(object): # haven't made it yet we'll need to wait on them to be added await asyncio.gather(*[ asyncio.ensure_future( - self.model._wait_for_new('application', app_name)) + self._wait_for_new('application', app_name)) for app_name in pending_apps ]) return [app for name, app in self.applications.items() @@ -847,23 +1012,39 @@ class Model(object): log.debug( 'Deploying %s', entity_id) - await client_facade.AddCharm(channel, entity_id) + if not is_local: + await client_facade.AddCharm(channel, entity_id) + elif not entity_id.startswith('local:'): + # We have a local charm dir that needs to be uploaded + charm_dir = os.path.abspath( + os.path.expanduser(entity_id)) + series = series or get_charm_series(charm_dir) + if not series: + raise JujuError( + "Couldn't determine series for charm at {}. " + "Pass a 'series' kwarg to Model.deploy().".format( + charm_dir)) + entity_id = await self.add_local_charm_dir(charm_dir, series) + app = client.ApplicationDeploy( - application=service_name, + application=application_name, channel=channel, charm_url=entity_id, config=config, - constraints=constraints, + constraints=parse_constraints(constraints), endpoint_bindings=bind, num_units=num_units, - placement=placement, resources=resources, series=series, storage=storage, ) + app.placement = placement - await app_facade.Deploy([app]) - return await self._wait_for_new('application', service_name) + result = await app_facade.Deploy([app]) + errors = [r.error.message for r in result.results if r.error] + if errors: + raise JujuError('\n'.join(errors)) + return await self._wait_for_new('application', application_name) def destroy(self): """Terminate all machines and resources for this model. @@ -883,7 +1064,7 @@ class Model(object): 's' if len(unit_names) == 1 else '', ' '.join(unit_names)) - return await app_facade.Destroy(self.name) + return await app_facade.DestroyUnits(list(unit_names)) destroy_units = destroy_unit def get_backup(self, archive_id): @@ -1198,6 +1379,53 @@ class Model(object): def charmstore(self): return self._charmstore + async def get_metrics(self, *tags): + """Retrieve metrics. + + :param str \*tags: Tags of entities from which to retrieve metrics. + No tags retrieves the metrics of all units in the model. + :return: Dictionary of unit_name:metrics + + """ + log.debug("Retrieving metrics for %s", + ', '.join(tags) if tags else "all units") + + metrics_facade = client.MetricsDebugFacade() + metrics_facade.connect(self.connection) + + entities = [client.Entity(tag) for tag in tags] + metrics_result = await metrics_facade.GetMetrics(entities) + + metrics = collections.defaultdict(list) + + for entity_metrics in metrics_result.results: + error = entity_metrics.error + if error: + if "is not a valid tag" in error: + raise ValueError(error.message) + else: + raise Exception(error.message) + + for metric in entity_metrics.metrics: + metrics[metric.unit].append(vars(metric)) + + return metrics + + +def get_charm_series(path): + """Inspects the charm directory at ``path`` and returns a default + series from its metadata.yaml (the first item in the 'series' list). + + Returns None if no series can be determined. + + """ + md = Path(path) / "metadata.yaml" + if not md.exists(): + return None + data = yaml.load(md.open()) + series = data.get('series') + return series[0] if series else None + class BundleHandler(object): """ @@ -1220,12 +1448,70 @@ class BundleHandler(object): self.ann_facade = client.AnnotationsFacade() self.ann_facade.connect(model.connection) + async def _handle_local_charms(self, bundle): + """Search for references to local charms (i.e. filesystem paths) + in the bundle. Upload the local charms to the model, and replace + the filesystem paths with appropriate 'local:' paths in the bundle. + + Return the modified bundle. + + :param dict bundle: Bundle dictionary + :return: Modified bundle dictionary + + """ + apps, args = [], [] + + default_series = bundle.get('series') + for app_name in self.applications: + app_dict = bundle['services'][app_name] + charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm'])) + if not os.path.isdir(charm_dir): + continue + series = ( + app_dict.get('series') or + default_series or + get_charm_series(charm_dir) + ) + if not series: + raise JujuError( + "Couldn't determine series for charm at {}. " + "Add a 'series' key to the bundle.".format(charm_dir)) + + # Keep track of what we need to update. We keep a list of apps + # that need to be updated, and a corresponding list of args + # needed to update those apps. + apps.append(app_name) + args.append((charm_dir, series)) + + if apps: + # If we have apps to update, spawn all the coroutines concurrently + # and wait for them to finish. + charm_urls = await asyncio.gather(*[ + self.model.add_local_charm_dir(*params) + for params in args + ]) + # Update the 'charm:' entry for each app with the new 'local:' url. + for app_name, charm_url in zip(apps, charm_urls): + bundle['services'][app_name]['charm'] = charm_url + + return bundle + async def fetch_plan(self, entity_id): - bundle_yaml = await self.charmstore.files(entity_id, - filename='bundle.yaml', - read_file=True) + is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id) + if is_local: + bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text() + else: + bundle_yaml = await self.charmstore.files(entity_id, + filename='bundle.yaml', + read_file=True) self.bundle = yaml.safe_load(bundle_yaml) - self.plan = await self.client_facade.GetBundleChanges(bundle_yaml) + self.bundle = await self._handle_local_charms(self.bundle) + + self.plan = await self.client_facade.GetBundleChanges( + yaml.dump(self.bundle)) + + if self.plan.errors: + raise JujuError('\n'.join(self.plan.errors)) async def execute_plan(self): for step in self.plan.changes: @@ -1251,28 +1537,44 @@ class BundleHandler(object): Series holds the series of the charm to be added if the charm default is not sufficient. """ + # We don't add local charms because they've already been added + # by self._handle_local_charms + if charm.startswith('local:'): + return charm + entity_id = await self.charmstore.entityId(charm) log.debug('Adding %s', entity_id) await self.client_facade.AddCharm(None, entity_id) return entity_id - async def addMachines(self, params): + async def addMachines(self, params=None): """ :param params dict: - Dictionary specifying the machine to add. Keys include: + Dictionary specifying the machine to add. All keys are optional. + Keys include: series: string specifying the machine OS series. - constraints: string holding optional machine constraints. We'll + + constraints: string holding machine constraints, if any. We'll parse this into the json friendly dict that the juju api expects. - Container_type: string holding the type of the container (for - instance ""lxc" or kvm"). It is not specified for top level + + container_type: string holding the type of the container (for + instance ""lxd" or kvm"). It is not specified for top level machines. + parent_id: string holding a placeholder pointing to another machine change or to a unit change. This value is only specified in the case this machine is a container, in which case also ContainerType is set. + """ + params = params or {} + + # Normalize keys + params = {normalize_key(k): params[k] for k in params.keys()} + + # Fix up values, as necessary. if 'parent_id' in params: params['parent_id'] = self.resolve(params['parent_id']) @@ -1280,6 +1582,12 @@ class BundleHandler(object): params.get('constraints')) params['jobs'] = params.get('jobs', ['JobHostUnits']) + if params.get('container_type') == 'lxc': + log.warning('Juju 2.0 does not support lxc containers. ' + 'Converting containers to lxd.') + params['container_type'] = 'lxd' + + # Submit the request. params = client.AddMachineParams(**params) results = await self.client_facade.AddMachines([params]) error = results.machines[0].error @@ -1348,7 +1656,7 @@ class BundleHandler(object): series=series, application=application, config=options, - constraints=constraints, + constraints=parse_constraints(constraints), storage=storage, endpoint_bindings=endpoint_bindings, resources=resources, @@ -1356,6 +1664,8 @@ class BundleHandler(object): # do the do log.info('Deploying %s', charm) await self.app_facade.Deploy([app]) + # ensure the app is in the model for future operations + await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to): @@ -1441,3 +1751,79 @@ class CharmStore(object): setattr(self, name, coro) wrapper = coro return wrapper + + +class CharmArchiveGenerator(object): + def __init__(self, path): + self.path = os.path.abspath(os.path.expanduser(path)) + + def make_archive(self, path): + """Create archive of directory and write to ``path``. + + :param path: Path to archive + + Ignored:: + + * build/\* - This is used for packing the charm itself and any + similar tasks. + * \*/.\* - Hidden files are all ignored for now. This will most + likely be changed into a specific ignore list + (.bzr, etc) + + """ + zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) + for dirpath, dirnames, filenames in os.walk(self.path): + relative_path = dirpath[len(self.path) + 1:] + if relative_path and not self._ignore(relative_path): + zf.write(dirpath, relative_path) + for name in filenames: + archive_name = os.path.join(relative_path, name) + if not self._ignore(archive_name): + real_path = os.path.join(dirpath, name) + self._check_type(real_path) + if os.path.islink(real_path): + self._check_link(real_path) + self._write_symlink( + zf, os.readlink(real_path), archive_name) + else: + zf.write(real_path, archive_name) + zf.close() + return path + + def _check_type(self, path): + """Check the path + """ + s = os.stat(path) + if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode): + return path + raise ValueError("Invalid Charm at % %s" % ( + path, "Invalid file type for a charm")) + + def _check_link(self, path): + link_path = os.readlink(path) + if link_path[0] == "/": + raise ValueError( + "Invalid Charm at %s: %s" % ( + path, "Absolute links are invalid")) + path_dir = os.path.dirname(path) + link_path = os.path.join(path_dir, link_path) + if not link_path.startswith(os.path.abspath(self.path)): + raise ValueError( + "Invalid charm at %s %s" % ( + path, "Only internal symlinks are allowed")) + + def _write_symlink(self, zf, link_target, link_path): + """Package symlinks with appropriate zipfile metadata.""" + info = zipfile.ZipInfo() + info.filename = link_path + info.create_system = 3 + # Magic code for symlinks / py2/3 compat + # 27166663808 = (stat.S_IFLNK | 0755) << 16 + info.external_attr = 2716663808 + zf.writestr(info, link_target) + + def _ignore(self, path): + if path == "build" or path.startswith("build/"): + return True + if path.startswith('.'): + return True