import collections
import logging
from concurrent.futures import CancelledError
+from functools import partial
+from theblues import charmstore
from .client import client
from .client import watcher
from .delta import get_entity_delta
from .delta import get_entity_class
from .exceptions import DeadEntityException
+from .errors import JujuAPIError
log = logging.getLogger(__name__)
self._watcher_task = None
self._watch_shutdown = asyncio.Event(loop=loop)
self._watch_received = asyncio.Event(loop=loop)
+ self._charmstore = CharmStore(self.loop)
async def connect_current(self):
"""Connect to the current Juju model.
for k, v in storage.items()
+ entity_id = await self.charmstore.entityId(entity_url)
app_facade = client.ApplicationFacade()
client_facade = client.ClientFacade()
- log.debug(
- 'Deploying %s', entity_url)
- await client_facade.AddCharm(channel, entity_url)
- app = client.ApplicationDeploy(
- application=service_name,
- channel=channel,
- charm_url=entity_url,
- config=config,
- constraints=constraints,
- endpoint_bindings=bind,
- num_units=num_units,
- placement=placement,
- resources=resources,
- series=series,
- storage=storage,
- )
- return await app_facade.Deploy([app])
+ if 'bundle/' in entity_id:
+ handler = BundleHandler(self)
+ await handler.fetch_plan(entity_id)
+ await handler.execute_plan()
+ else:
+ log.debug(
+ 'Deploying %s', entity_id)
+ await client_facade.AddCharm(channel, entity_id)
+ app = client.ApplicationDeploy(
+ application=service_name,
+ channel=channel,
+ charm_url=entity_id,
+ config=config,
+ constraints=constraints,
+ endpoint_bindings=bind,
+ num_units=num_units,
+ placement=placement,
+ resources=resources,
+ series=series,
+ storage=storage,
+ )
+ return await app_facade.Deploy([app])
def destroy(self):
"""Terminate all machines and resources for this model.
+ @property
+ def charmstore(self):
+ return self._charmstore
+class BundleHandler(object):
+ """
+ Handle bundles by using the API to translate bundle YAML into a plan of
+ steps and then dispatching each of those using the API.
+ """
+ def __init__(self, model):
+ self.model = model
+ self.charmstore = model.charmstore
+ self.plan = []
+ self.references = {}
+ self._units_by_app = {}
+ for unit_name, unit in model.units.items():
+ app_units = self._units_by_app.setdefault(unit.application, [])
+ app_units.append(unit_name)
+ self.client_facade = client.ClientFacade()
+ self.client_facade.connect(model.connection)
+ self.app_facade = client.ApplicationFacade()
+ self.app_facade.connect(model.connection)
+ self.ann_facade = client.AnnotationsFacade()
+ self.ann_facade.connect(model.connection)
+ async def fetch_plan(self, entity_id):
+ yaml = await self.charmstore.files(entity_id,
+ filename='bundle.yaml',
+ read_file=True)
+ self.plan = await self.client_facade.GetBundleChanges(yaml)
+ async def execute_plan(self):
+ for step in self.plan.changes:
+ method = getattr(self, step.method)
+ result = await method(*step.args)
+ self.references[step.id_] = result
+ def resolve(self, reference):
+ if reference and reference.startswith('$'):
+ reference = self.references[reference[1:]]
+ return reference
+ async def addCharm(self, charm, series):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be added.
+ :param series string:
+ Series holds the series of the charm to be added
+ if the charm default is not sufficient.
+ """
+ entity_id = await self.charmstore.entityId(charm)
+ log.debug('Adding %s', entity_id)
+ await self.client_facade.AddCharm(None, entity_id)
+ return entity_id
+ async def addMachines(self, series, constraints, container_type,
+ parent_id):
+ """
+ :param series string:
+ Series holds the optional machine OS series.
+ :param constraints string:
+ Constraints holds the optional machine constraints.
+ :param Container_type string:
+ ContainerType optionally holds the type of the container (for
+ instance ""lxc" or kvm"). It is not specified for top level
+ machines.
+ :param parent_id string:
+ ParentId optionally holds a placeholder pointing to another machine
+ change or to a unit change. This value is only specified in the
+ case this machine is a container, in which case also ContainerType
+ is set.
+ """
+ params = client.AddMachineParams(
+ series=series,
+ constraints=constraints,
+ container_type=container_type,
+ parent_id=self.resolve(parent_id),
+ )
+ results = await self.client_facade.AddMachines(params)
+ log.debug('Added new machine %s', results[0].machine)
+ return results[0].machine
+ async def addRelation(self, endpoint1, endpoint2):
+ """
+ :param endpoint1 string:
+ :param endpoint2 string:
+ Endpoint1 and Endpoint2 hold relation endpoints in the
+ "application:interface" form, where the application is always a
+ placeholder pointing to an application change, and the interface is
+ optional. Examples are "$deploy-42:web" or just "$deploy-42".
+ """
+ endpoints = [endpoint1, endpoint2]
+ # resolve indirect references
+ for i in range(len(endpoints)):
+ parts = endpoints[i].split(':')
+ parts[0] = self.resolve(parts[0])
+ endpoints[i] = ':'.join(parts)
+ try:
+ await self.app_facade.AddRelation(endpoints)
+ log.debug('Added relation %s <-> %s', *endpoints)
+ except JujuAPIError as e:
+ if 'relation already exists' not in e.message:
+ raise
+ log.debug('Relation %s <-> %s already exists', *endpoints)
+ return None
+ async def deploy(self, charm, series, application, options, constraints,
+ storage, endpoint_bindings, resources):
+ """
+ :param charm string:
+ Charm holds the URL of the charm to be used to deploy this
+ application.
+ :param series string:
+ Series holds the series of the application to be deployed
+ if the charm default is not sufficient.
+ :param application string:
+ Application holds the application name.
+ :param options map[string]interface{}:
+ Options holds application options.
+ :param constraints string:
+ Constraints holds the optional application constraints.
+ :param storage map[string]string:
+ Storage holds the optional storage constraints.
+ :param endpoint_bindings map[string]string:
+ EndpointBindings holds the optional endpoint bindings
+ :param resources map[string]int:
+ Resources identifies the revision to use for each resource
+ of the application's charm.
+ """
+ # resolve indirect references
+ charm = self.resolve(charm)
+ # stringify all config values for API
+ options = {k: str(v) for k, v in options.items()}
+ # build param object
+ app = client.ApplicationDeploy(
+ charm_url=charm,
+ series=series,
+ application=application,
+ config=options,
+ constraints=constraints,
+ storage=storage,
+ endpoint_bindings=endpoint_bindings,
+ resources=resources,
+ )
+ # do the do
+ log.debug('Deploying %s', charm)
+ await self.app_facade.Deploy([app])
+ return application
+ async def addUnit(self, application, to):
+ """
+ :param application string:
+ Application holds the application placeholder name for which a unit
+ is added.
+ :param to string:
+ To holds the optional location where to add the unit, as a
+ placeholder pointing to another unit change or to a machine change.
+ """
+ application = self.resolve(application)
+ placement = self.resolve(to)
+ if self._units_by_app.get(application):
+ # enough units for this application already exist;
+ # claim one, and carry on
+ # NB: this should probably honor placement, but the juju client
+ # doesn't, so we're not bothering, either
+ unit_name = self._units_by_app[application].pop()
+ log.debug('Reusing unit %s for %s', unit_name, application)
+ return unit_name
+ log.debug('Adding unit of %s%s',
+ application,
+ (' to %s' % placement) if placement else '')
+ result = await self.app_facade.AddUnits(
+ application=application,
+ placement=placement,
+ num_units=1,
+ )
+ return result.units[0]
+ async def expose(self, application):
+ """
+ :param application string:
+ Application holds the placeholder name of the application that must
+ be exposed.
+ """
+ application = self.resolve(application)
+ log.debug('Exposing %s', application)
+ await self.app_facade.Expose(application)
+ return None
+ async def setAnnotations(self, id_, entity_type, annotations):
+ """
+ :param id_ string:
+ Id is the placeholder for the application or machine change
+ corresponding to the entity to be annotated.
+ :param entity_type EntityType:
+ EntityType holds the type of the entity, "application" or
+ "machine".
+ :param annotations map[string]string:
+ Annotations holds the annotations as key/value pairs.
+ """
+ entity_id = self.resolve(id_)
+ log.debug('Updating annotations of %s', entity_id)
+ ann = client.EntityAnnotations(
+ entity=entity_id,
+ annotations=annotations,
+ )
+ await self.ann_facade.Set([ann])
+ return None
+class CharmStore(object):
+ """
+ Async wrapper around theblues.charmstore.CharmStore
+ """
+ def __init__(self, loop):
+ self.loop = loop
+ self._cs = charmstore.CharmStore()
+ def __getattr__(self, name):
+ """
+ Wrap method calls in coroutines that use run_in_executor to make them
+ async.
+ """
+ attr = getattr(self._cs, name)
+ if not callable(attr):
+ wrapper = partial(getattr, self._cs, name)
+ setattr(self, name, wrapper)
+ else:
+ async def coro(*args, **kwargs):
+ method = partial(attr, *args, **kwargs)
+ return await self.loop.run_in_executor(None, method)
+ setattr(self, name, coro)
+ wrapper = coro
+ return wrapper