+ raise NotImplementedError()
+
+ @property
+ def charmstore(self):
+ return self._charmstore
+
+ async def get_metrics(self, *tags):
+ """Retrieve metrics.
+
+ :param str \*tags: Tags of entities from which to retrieve metrics.
+ No tags retrieves the metrics of all units in the model.
+ :return: Dictionary of unit_name:metrics
+
+ """
+ log.debug("Retrieving metrics for %s",
+ ', '.join(tags) if tags else "all units")
+
+ metrics_facade = client.MetricsDebugFacade.from_connection(
+ self.connection)
+
+ entities = [client.Entity(tag) for tag in tags]
+ metrics_result = await metrics_facade.GetMetrics(entities)
+
+ metrics = collections.defaultdict(list)
+
+ for entity_metrics in metrics_result.results:
+ error = entity_metrics.error
+ if error:
+ if "is not a valid tag" in error:
+ raise ValueError(error.message)
+ else:
+ raise Exception(error.message)
+
+ for metric in entity_metrics.metrics:
+ metrics[metric.unit].append(vars(metric))
+
+ return metrics
+
+
+def get_charm_series(path):
+ """Inspects the charm directory at ``path`` and returns a default
+ series from its metadata.yaml (the first item in the 'series' list).
+
+ Returns None if no series can be determined.
+
+ """
+ md = Path(path) / "metadata.yaml"
+ if not md.exists():
+ return None
+ data = yaml.load(md.open())
+ series = data.get('series')
+ return series[0] if series else None
+
+
+class BundleHandler(object):
+ """
+ Handle bundles by using the API to translate bundle YAML into a plan of
+ steps and then dispatching each of those using the API.
+ """
+ def __init__(self, model):
+ self.model = model
+ self.charmstore = model.charmstore
+ self.plan = []
+ self.references = {}
+ self._units_by_app = {}
+ for unit_name, unit in model.units.items():
+ app_units = self._units_by_app.setdefault(unit.application, [])
+ app_units.append(unit_name)
+ self.client_facade = client.ClientFacade.from_connection(
+ model.connection)
+ self.app_facade = client.ApplicationFacade.from_connection(
+ model.connection)
+ self.ann_facade = client.AnnotationsFacade.from_connection(
+ model.connection)
+
+ async def _handle_local_charms(self, bundle):
+ """Search for references to local charms (i.e. filesystem paths)
+ in the bundle. Upload the local charms to the model, and replace
+ the filesystem paths with appropriate 'local:' paths in the bundle.
+
+ Return the modified bundle.
+
+ :param dict bundle: Bundle dictionary
+ :return: Modified bundle dictionary
+
+ """
+ apps, args = [], []
+
+ default_series = bundle.get('series')
+ for app_name in self.applications:
+ app_dict = bundle['services'][app_name]
+ charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+ if not os.path.isdir(charm_dir):
+ continue
+ series = (
+ app_dict.get('series') or
+ default_series or
+ get_charm_series(charm_dir)
+ )
+ if not series:
+ raise JujuError(
+ "Couldn't determine series for charm at {}. "
+ "Add a 'series' key to the bundle.".format(charm_dir))
+
+ # Keep track of what we need to update. We keep a list of apps
+ # that need to be updated, and a corresponding list of args
+ # needed to update those apps.
+ apps.append(app_name)
+ args.append((charm_dir, series))
+
+ if apps:
+ # If we have apps to update, spawn all the coroutines concurrently
+ # and wait for them to finish.
+ charm_urls = await asyncio.gather(*[
+ self.model.add_local_charm_dir(*params)
+ for params in args
+ ], loop=self.model.loop)
+ # Update the 'charm:' entry for each app with the new 'local:' url.
+ for app_name, charm_url in zip(apps, charm_urls):
+ bundle['services'][app_name]['charm'] = charm_url
+
+ return bundle
+
+ async def fetch_plan(self, entity_id):
+ is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
+ if is_local:
+ bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
+ else:
+ bundle_yaml = await self.charmstore.files(entity_id,
+ filename='bundle.yaml',
+ read_file=True)
+ self.bundle = yaml.safe_load(bundle_yaml)
+ self.bundle = await self._handle_local_charms(self.bundle)
+
+ self.plan = await self.client_facade.GetBundleChanges(
+ yaml.dump(self.bundle))
+
+ async def execute_plan(self):
+ for step in self.plan.changes:
+ method = getattr(self, step.method)
+ result = await method(*step.args)
+ self.references[step.id_] = result
+
+ @property
+ def applications(self):
+ return list(self.bundle['services'].keys())
+
+ def resolve(self, reference):
+ if reference and reference.startswith('$'):
+ reference = self.references[reference[1:]]
+ return reference
+
+ async def addCharm(self, charm, series):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be added.
+
+ :param series string:
+ Series holds the series of the charm to be added
+ if the charm default is not sufficient.
+ """
+ # We don't add local charms because they've already been added
+ # by self._handle_local_charms
+ if charm.startswith('local:'):
+ return charm
+
+ entity_id = await self.charmstore.entityId(charm)
+ log.debug('Adding %s', entity_id)
+ await self.client_facade.AddCharm(None, entity_id)
+ return entity_id
+
+ async def addMachines(self, params=None):
+ """
+ :param params dict:
+ Dictionary specifying the machine to add. All keys are optional.
+ Keys include:
+
+ series: string specifying the machine OS series.
+
+ constraints: string holding machine constraints, if any. We'll
+ parse this into the json friendly dict that the juju api
+ expects.
+
+ container_type: string holding the type of the container (for
+ instance ""lxd" or kvm"). It is not specified for top level
+ machines.
+
+ parent_id: string holding a placeholder pointing to another
+ machine change or to a unit change. This value is only
+ specified in the case this machine is a container, in
+ which case also ContainerType is set.
+
+ """
+ params = params or {}
+
+ # Normalize keys
+ params = {normalize_key(k): params[k] for k in params.keys()}
+
+ # Fix up values, as necessary.
+ if 'parent_id' in params:
+ params['parent_id'] = self.resolve(params['parent_id'])
+
+ params['constraints'] = parse_constraints(
+ params.get('constraints'))
+ params['jobs'] = params.get('jobs', ['JobHostUnits'])
+
+ if params.get('container_type') == 'lxc':
+ log.warning('Juju 2.0 does not support lxc containers. '
+ 'Converting containers to lxd.')
+ params['container_type'] = 'lxd'
+
+ # Submit the request.
+ params = client.AddMachineParams(**params)
+ results = await self.client_facade.AddMachines([params])
+ error = results.machines[0].error
+ if error:
+ raise ValueError("Error adding machine: %s" % error.message)
+ machine = results.machines[0].machine
+ log.debug('Added new machine %s', machine)
+ return machine
+
+ async def addRelation(self, endpoint1, endpoint2):
+ """
+ :param endpoint1 string:
+ :param endpoint2 string:
+ Endpoint1 and Endpoint2 hold relation endpoints in the
+ "application:interface" form, where the application is always a
+ placeholder pointing to an application change, and the interface is
+ optional. Examples are "$deploy-42:web" or just "$deploy-42".
+ """
+ endpoints = [endpoint1, endpoint2]
+ # resolve indirect references
+ for i in range(len(endpoints)):
+ parts = endpoints[i].split(':')
+ parts[0] = self.resolve(parts[0])
+ endpoints[i] = ':'.join(parts)
+
+ log.info('Relating %s <-> %s', *endpoints)
+ return await self.model.add_relation(*endpoints)
+
+ async def deploy(self, charm, series, application, options, constraints,
+ storage, endpoint_bindings, resources):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be used to deploy this
+ application.
+
+ :param series string:
+ Series holds the series of the application to be deployed
+ if the charm default is not sufficient.
+
+ :param application string:
+ Application holds the application name.
+
+ :param options map[string]interface{}:
+ Options holds application options.
+
+ :param constraints string:
+ Constraints holds the optional application constraints.
+
+ :param storage map[string]string:
+ Storage holds the optional storage constraints.
+
+ :param endpoint_bindings map[string]string:
+ EndpointBindings holds the optional endpoint bindings
+
+ :param resources map[string]int:
+ Resources identifies the revision to use for each resource
+ of the application's charm.
+ """
+ # resolve indirect references
+ charm = self.resolve(charm)
+ # the bundle plan doesn't actually do anything with resources, even
+ # though it ostensibly gives us something (None) for that param
+ if not charm.startswith('local:'):
+ resources = await self.model._add_store_resources(application,
+ charm)
+ await self.model._deploy(
+ charm_url=charm,
+ application=application,
+ series=series,
+ config=options,
+ constraints=constraints,
+ endpoint_bindings=endpoint_bindings,
+ resources=resources,
+ storage=storage,
+ )
+ return application
+
+ async def addUnit(self, application, to):
+ """
+ :param application string:
+ Application holds the application placeholder name for which a unit
+ is added.
+
+ :param to string:
+ To holds the optional location where to add the unit, as a
+ placeholder pointing to another unit change or to a machine change.
+ """
+ application = self.resolve(application)
+ placement = self.resolve(to)
+ if self._units_by_app.get(application):
+ # enough units for this application already exist;
+ # claim one, and carry on
+ # NB: this should probably honor placement, but the juju client
+ # doesn't, so we're not bothering, either
+ unit_name = self._units_by_app[application].pop()
+ log.debug('Reusing unit %s for %s', unit_name, application)
+ return self.model.units[unit_name]
+
+ log.debug('Adding new unit for %s%s', application,
+ ' to %s' % placement if placement else '')
+ return await self.model.applications[application].add_unit(
+ count=1,
+ to=placement,
+ )
+
+ async def expose(self, application):
+ """
+ :param application string:
+ Application holds the placeholder name of the application that must
+ be exposed.
+ """
+ application = self.resolve(application)
+ log.info('Exposing %s', application)
+ return await self.model.applications[application].expose()
+
+ async def setAnnotations(self, id_, entity_type, annotations):
+ """
+ :param id_ string:
+ Id is the placeholder for the application or machine change
+ corresponding to the entity to be annotated.
+
+ :param entity_type EntityType:
+ EntityType holds the type of the entity, "application" or
+ "machine".
+
+ :param annotations map[string]string:
+ Annotations holds the annotations as key/value pairs.
+ """
+ entity_id = self.resolve(id_)
+ try:
+ entity = self.model.state.get_entity(entity_type, entity_id)
+ except KeyError:
+ entity = await self.model._wait_for_new(entity_type, entity_id)
+ return await entity.set_annotations(annotations)
+
+
+class CharmStore(object):
+ """
+ Async wrapper around theblues.charmstore.CharmStore
+ """
+ def __init__(self, loop):
+ self.loop = loop
+ self._cs = theblues.charmstore.CharmStore(timeout=5)
+
+ def __getattr__(self, name):
+ """
+ Wrap method calls in coroutines that use run_in_executor to make them
+ async.
+ """
+ attr = getattr(self._cs, name)
+ if not callable(attr):
+ wrapper = partial(getattr, self._cs, name)
+ setattr(self, name, wrapper)
+ else:
+ async def coro(*args, **kwargs):
+ method = partial(attr, *args, **kwargs)
+ for attempt in range(1, 4):
+ try:
+ return await self.loop.run_in_executor(None, method)
+ except theblues.errors.ServerError:
+ if attempt == 3:
+ raise
+ await asyncio.sleep(1, loop=self.loop)
+ setattr(self, name, coro)
+ wrapper = coro
+ return wrapper
+
+
+class CharmArchiveGenerator(object):
+ def __init__(self, path):
+ self.path = os.path.abspath(os.path.expanduser(path))
+
+ def make_archive(self, path):
+ """Create archive of directory and write to ``path``.
+
+ :param path: Path to archive
+
+ Ignored::
+
+ * build/\* - This is used for packing the charm itself and any
+ similar tasks.
+ * \*/.\* - Hidden files are all ignored for now. This will most
+ likely be changed into a specific ignore list
+ (.bzr, etc)
+
+ """
+ zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+ for dirpath, dirnames, filenames in os.walk(self.path):
+ relative_path = dirpath[len(self.path) + 1:]
+ if relative_path and not self._ignore(relative_path):
+ zf.write(dirpath, relative_path)
+ for name in filenames:
+ archive_name = os.path.join(relative_path, name)
+ if not self._ignore(archive_name):
+ real_path = os.path.join(dirpath, name)
+ self._check_type(real_path)
+ if os.path.islink(real_path):
+ self._check_link(real_path)
+ self._write_symlink(
+ zf, os.readlink(real_path), archive_name)
+ else:
+ zf.write(real_path, archive_name)
+ zf.close()
+ return path
+
+ def _check_type(self, path):
+ """Check the path
+ """
+ s = os.stat(path)
+ if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+ return path
+ raise ValueError("Invalid Charm at % %s" % (
+ path, "Invalid file type for a charm"))
+
+ def _check_link(self, path):
+ link_path = os.readlink(path)
+ if link_path[0] == "/":
+ raise ValueError(
+ "Invalid Charm at %s: %s" % (
+ path, "Absolute links are invalid"))
+ path_dir = os.path.dirname(path)
+ link_path = os.path.join(path_dir, link_path)
+ if not link_path.startswith(os.path.abspath(self.path)):
+ raise ValueError(
+ "Invalid charm at %s %s" % (
+ path, "Only internal symlinks are allowed"))
+
+ def _write_symlink(self, zf, link_target, link_path):
+ """Package symlinks with appropriate zipfile metadata."""
+ info = zipfile.ZipInfo()
+ info.filename = link_path
+ info.create_system = 3
+ # Magic code for symlinks / py2/3 compat
+ # 27166663808 = (stat.S_IFLNK | 0755) << 16
+ info.external_attr = 2716663808
+ zf.writestr(info, link_target)
+
+ def _ignore(self, path):
+ if path == "build" or path.startswith("build/"):
+ return True
+ if path.startswith('.'):
+ return True