Add support for deploying bundles
authorCory Johns <johnsca@gmail.com>
Fri, 14 Oct 2016 00:23:56 +0000 (20:23 -0400)
committerCory Johns <johnsca@gmail.com>
Fri, 14 Oct 2016 19:51:57 +0000 (15:51 -0400)
juju/client/connection.py
juju/errors.py [new file with mode: 0644]
juju/model.py
setup.py

index 3b5bfc4..cdd93d9 100644 (file)
@@ -11,6 +11,8 @@ import websockets
 
 import yaml
 
+from juju.errors import JujuAPIError
+
 log = logging.getLogger("websocket")
 
 
@@ -81,7 +83,7 @@ class Connection:
         #log.debug("Send: %s", outgoing)
         #log.debug("Recv: %s", result)
         if result and 'error' in result:
-            raise RuntimeError(result)
+            raise JujuAPIError(result)
         return result
 
     async def clone(self):
diff --git a/juju/errors.py b/juju/errors.py
new file mode 100644 (file)
index 0000000..9295267
--- /dev/null
@@ -0,0 +1,7 @@
+
+class JujuAPIError(Exception):
+    def __init__(self, result):
+        self.message = result['error']
+        self.response = result['response']
+        self.request_id = result['request-id']
+        super().__init__(self.message)
index 6bdb261..fe07f80 100644 (file)
@@ -2,6 +2,9 @@ import asyncio
 import collections
 import logging
 from concurrent.futures import CancelledError
+from functools import partial
+
+from theblues import charmstore
 
 from .client import client
 from .client import watcher
@@ -9,6 +12,7 @@ from .client import connection
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
+from .errors import JujuAPIError
 
 log = logging.getLogger(__name__)
 
@@ -277,6 +281,7 @@ class Model(object):
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
+        self._charmstore = CharmStore(self.loop)
 
     async def connect_current(self):
         """Connect to the current Juju model.
@@ -651,30 +656,37 @@ class Model(object):
                 for k, v in storage.items()
             }
 
+        entity_id = await self.charmstore.entityId(entity_url)
+
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        log.debug(
-            'Deploying %s', entity_url)
-
-        await client_facade.AddCharm(channel, entity_url)
-        app = client.ApplicationDeploy(
-            application=service_name,
-            channel=channel,
-            charm_url=entity_url,
-            config=config,
-            constraints=constraints,
-            endpoint_bindings=bind,
-            num_units=num_units,
-            placement=placement,
-            resources=resources,
-            series=series,
-            storage=storage,
-        )
-
-        return await app_facade.Deploy([app])
+        if 'bundle/' in entity_id:
+            handler = BundleHandler(self)
+            await handler.fetch_plan(entity_id)
+            await handler.execute_plan()
+        else:
+            log.debug(
+                'Deploying %s', entity_id)
+
+            await client_facade.AddCharm(channel, entity_id)
+            app = client.ApplicationDeploy(
+                application=service_name,
+                channel=channel,
+                charm_url=entity_id,
+                config=config,
+                constraints=constraints,
+                endpoint_bindings=bind,
+                num_units=num_units,
+                placement=placement,
+                resources=resources,
+                series=series,
+                storage=storage,
+            )
+
+            return await app_facade.Deploy([app])
 
     def destroy(self):
         """Terminate all machines and resources for this model.
@@ -989,3 +1001,253 @@ class Model(object):
 
         """
         pass
+
+    @property
+    def charmstore(self):
+        return self._charmstore
+
+
+class BundleHandler(object):
+    """
+    Handle bundles by using the API to translate bundle YAML into a plan of
+    steps and then dispatching each of those using the API.
+    """
+    def __init__(self, model):
+        self.model = model
+        self.charmstore = model.charmstore
+        self.plan = []
+        self.references = {}
+        self._units_by_app = {}
+        for unit_name, unit in model.units.items():
+            app_units = self._units_by_app.setdefault(unit.application, [])
+            app_units.append(unit_name)
+        self.client_facade = client.ClientFacade()
+        self.client_facade.connect(model.connection)
+        self.app_facade = client.ApplicationFacade()
+        self.app_facade.connect(model.connection)
+        self.ann_facade = client.AnnotationsFacade()
+        self.ann_facade.connect(model.connection)
+
+    async def fetch_plan(self, entity_id):
+        yaml = await self.charmstore.files(entity_id,
+                                           filename='bundle.yaml',
+                                           read_file=True)
+        self.plan = await self.client_facade.GetBundleChanges(yaml)
+
+    async def execute_plan(self):
+        for step in self.plan.changes:
+            method = getattr(self, step.method)
+            result = await method(*step.args)
+            self.references[step.id_] = result
+
+    def resolve(self, reference):
+        if reference and reference.startswith('$'):
+            reference = self.references[reference[1:]]
+        return reference
+
+    async def addCharm(self, charm, series):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be added.
+
+        :param series string:
+            Series holds the series of the charm to be added
+            if the charm default is not sufficient.
+        """
+        entity_id = await self.charmstore.entityId(charm)
+        log.debug('Adding %s', entity_id)
+        await self.client_facade.AddCharm(None, entity_id)
+        return entity_id
+
+    async def addMachines(self, series, constraints, container_type,
+                          parent_id):
+        """
+        :param series string:
+            Series holds the optional machine OS series.
+
+        :param constraints string:
+            Constraints holds the optional machine constraints.
+
+        :param Container_type string:
+            ContainerType optionally holds the type of the container (for
+            instance ""lxc" or kvm"). It is not specified for top level
+            machines.
+
+        :param parent_id string:
+            ParentId optionally holds a placeholder pointing to another machine
+            change or to a unit change. This value is only specified in the
+            case this machine is a container, in which case also ContainerType
+            is set.
+        """
+        params = client.AddMachineParams(
+            series=series,
+            constraints=constraints,
+            container_type=container_type,
+            parent_id=self.resolve(parent_id),
+        )
+        results = await self.client_facade.AddMachines(params)
+        log.debug('Added new machine %s', results[0].machine)
+        return results[0].machine
+
+    async def addRelation(self, endpoint1, endpoint2):
+        """
+        :param endpoint1 string:
+        :param endpoint2 string:
+            Endpoint1 and Endpoint2 hold relation endpoints in the
+            "application:interface" form, where the application is always a
+            placeholder pointing to an application change, and the interface is
+            optional. Examples are "$deploy-42:web" or just "$deploy-42".
+        """
+        endpoints = [endpoint1, endpoint2]
+        # resolve indirect references
+        for i in range(len(endpoints)):
+            parts = endpoints[i].split(':')
+            parts[0] = self.resolve(parts[0])
+            endpoints[i] = ':'.join(parts)
+        try:
+            await self.app_facade.AddRelation(endpoints)
+            log.debug('Added relation %s <-> %s', *endpoints)
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            log.debug('Relation %s <-> %s already exists', *endpoints)
+        return None
+
+    async def deploy(self, charm, series, application, options, constraints,
+                     storage, endpoint_bindings, resources):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be used to deploy this
+            application.
+
+        :param series string:
+            Series holds the series of the application to be deployed
+            if the charm default is not sufficient.
+
+        :param application string:
+            Application holds the application name.
+
+        :param options map[string]interface{}:
+            Options holds application options.
+
+        :param constraints string:
+            Constraints holds the optional application constraints.
+
+        :param storage map[string]string:
+            Storage holds the optional storage constraints.
+
+        :param endpoint_bindings map[string]string:
+            EndpointBindings holds the optional endpoint bindings
+
+        :param resources map[string]int:
+            Resources identifies the revision to use for each resource
+            of the application's charm.
+        """
+        # resolve indirect references
+        charm = self.resolve(charm)
+        # stringify all config values for API
+        options = {k: str(v) for k, v in options.items()}
+        # build param object
+        app = client.ApplicationDeploy(
+            charm_url=charm,
+            series=series,
+            application=application,
+            config=options,
+            constraints=constraints,
+            storage=storage,
+            endpoint_bindings=endpoint_bindings,
+            resources=resources,
+        )
+        # do the do
+        log.debug('Deploying %s', charm)
+        await self.app_facade.Deploy([app])
+        return application
+
+    async def addUnit(self, application, to):
+        """
+        :param application string:
+            Application holds the application placeholder name for which a unit
+            is added.
+
+        :param to string:
+            To holds the optional location where to add the unit, as a
+            placeholder pointing to another unit change or to a machine change.
+        """
+        application = self.resolve(application)
+        placement = self.resolve(to)
+        if self._units_by_app.get(application):
+            # enough units for this application already exist;
+            # claim one, and carry on
+            # NB: this should probably honor placement, but the juju client
+            # doesn't, so we're not bothering, either
+            unit_name = self._units_by_app[application].pop()
+            log.debug('Reusing unit %s for %s', unit_name, application)
+            return unit_name
+        log.debug('Adding unit of %s%s',
+                  application,
+                  (' to %s' % placement) if placement else '')
+        result = await self.app_facade.AddUnits(
+            application=application,
+            placement=placement,
+            num_units=1,
+        )
+        return result.units[0]
+
+    async def expose(self, application):
+        """
+        :param application string:
+            Application holds the placeholder name of the application that must
+            be exposed.
+        """
+        application = self.resolve(application)
+        log.debug('Exposing %s', application)
+        await self.app_facade.Expose(application)
+        return None
+
+    async def setAnnotations(self, id_, entity_type, annotations):
+        """
+        :param id_ string:
+            Id is the placeholder for the application or machine change
+            corresponding to the entity to be annotated.
+
+        :param entity_type EntityType:
+            EntityType holds the type of the entity, "application" or
+            "machine".
+
+        :param annotations map[string]string:
+            Annotations holds the annotations as key/value pairs.
+        """
+        entity_id = self.resolve(id_)
+        log.debug('Updating annotations of %s', entity_id)
+        ann = client.EntityAnnotations(
+            entity=entity_id,
+            annotations=annotations,
+        )
+        await self.ann_facade.Set([ann])
+        return None
+
+
+class CharmStore(object):
+    """
+    Async wrapper around theblues.charmstore.CharmStore
+    """
+    def __init__(self, loop):
+        self.loop = loop
+        self._cs = charmstore.CharmStore()
+
+    def __getattr__(self, name):
+        """
+        Wrap method calls in coroutines that use run_in_executor to make them
+        async.
+        """
+        attr = getattr(self._cs, name)
+        if not callable(attr):
+            wrapper = partial(getattr, self._cs, name)
+            setattr(self, name, wrapper)
+        else:
+            async def coro(*args, **kwargs):
+                method = partial(attr, *args, **kwargs)
+                return await self.loop.run_in_executor(None, method)
+            setattr(self, name, coro)
+            wrapper = coro
+        return wrapper
index 7a7e110..507f344 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -24,6 +24,7 @@ setup(
     install_requires=[
         'websockets',
         'pyyaml',
+        'theblues',
     ],
     include_package_data=True,
     maintainer='Juju Ecosystem Engineering',