Return app ModelEntities from bundle deploy
[osm/N2VC.git] / juju / model.py
index 6bdb261..7fe0514 100644 (file)
@@ -1,7 +1,13 @@
 import asyncio
 import collections
 import logging
+import re
+import weakref
 from concurrent.futures import CancelledError
+from functools import partial
+
+import yaml
+from theblues import charmstore
 
 from .client import client
 from .client import watcher
@@ -9,17 +15,58 @@ from .client import connection
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
+from .errors import JujuAPIError
 
 log = logging.getLogger(__name__)
 
 
+class _Observer(object):
+    """Wrapper around an observer callable.
+
+    This wrapper allows filter criteria to be associated with the
+    callable so that it's only called for changes that meet the criteria.
+
+    """
+    def __init__(self, callable_, entity_type, action, entity_id, predicate):
+        self.callable_ = callable_
+        self.entity_type = entity_type
+        self.action = action
+        self.entity_id = entity_id
+        self.predicate = predicate
+        if self.entity_id:
+            self.entity_id = str(self.entity_id)
+            if not self.entity_id.startswith('^'):
+                self.entity_id = '^' + self.entity_id
+            if not self.entity_id.endswith('$'):
+                self.entity_id += '$'
+
+    async def __call__(self, delta, old, new, model):
+        await self.callable_(delta, old, new, model)
+
+    def cares_about(self, delta):
+        """Return True if this observer "cares about" (i.e. wants to be
+        called) for a this delta.
+
+        """
+        if (self.entity_id and delta.get_id() and
+                not re.match(self.entity_id, str(delta.get_id()))):
+            return False
+
+        if self.entity_type and self.entity_type != delta.entity:
+            return False
+
+        if self.action and self.action != delta.type:
+            return False
+
+        if self.predicate and not self.predicate(delta):
+            return False
+
+        return True
+
+
 class ModelObserver(object):
     async def __call__(self, delta, old, new, model):
-        if old is None and new is not None:
-            type_ = 'add'
-        else:
-            type_ = delta.type
-        handler_name = 'on_{}_{}'.format(delta.entity, type_)
+        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
         method = getattr(self, handler_name, self.on_change)
         await method(delta, old, new, model)
 
@@ -116,8 +163,16 @@ class ModelState(object):
         updated by ``delta``
 
         """
+        """
+        log.debug(
+            'Getting %s:%s at index %s',
+            entity_type, entity_id, history_index)
+        """
+
         if history_index < 0 and history_index != -1:
             history_index += len(self.entity_history(entity_type, entity_id))
+            if history_index < 0:
+                return None
 
         try:
             self.entity_data(entity_type, entity_id, history_index)
@@ -164,6 +219,23 @@ class ModelEntity(object):
                     self.entity_type, self.entity_id))
         return self.data[name]
 
+    def __bool__(self):
+        return bool(self.data)
+
+    def on_change(self, callable_):
+        """Add a change observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'change', self.entity_id)
+
+    def on_remove(self, callable_):
+        """Add a remove observer to this entity.
+
+        """
+        self.model.add_observer(
+            callable_, self.entity_type, 'remove', self.entity_id)
+
     @property
     def entity_type(self):
         """A string identifying the entity type of this object, e.g.
@@ -178,7 +250,7 @@ class ModelEntity(object):
         entity in the underlying model.
 
         This will be True except when the object represents an entity at a
-        prior state in history, e.g. if the object was obtained by calling
+        non-latest state in history, e.g. if the object was obtained by calling
         .previous() on another object.
 
         """
@@ -272,11 +344,12 @@ class Model(object):
         """
         self.loop = loop or asyncio.get_event_loop()
         self.connection = None
-        self.observers = set()
+        self.observers = weakref.WeakValueDictionary()
         self.state = ModelState(self)
         self._watcher_task = None
         self._watch_shutdown = asyncio.Event(loop=loop)
         self._watch_received = asyncio.Event(loop=loop)
+        self._charmstore = CharmStore(self.loop)
 
     async def connect_current(self):
         """Connect to the current Juju model.
@@ -331,7 +404,7 @@ class Model(object):
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(.1)
+                await asyncio.sleep(0)
         await asyncio.wait_for(_block(), timeout)
 
     @property
@@ -358,10 +431,12 @@ class Model(object):
         """
         return self.state.units
 
-    def add_observer(self, callable_):
+    def add_observer(
+            self, callable_, entity_type=None, action=None, entity_id=None,
+            predicate=None):
         """Register an "on-model-change" callback
 
-        Once a watch is started (Model.watch() is called), ``callable_``
+        Once the model is connected, ``callable_``
         will be called each time the model changes. callable_ should
         be Awaitable and accept the following positional arguments:
 
@@ -380,8 +455,20 @@ class Model(object):
 
             model - The :class:`Model` itself.
 
+        Events for which ``callable_`` is called can be specified by passing
+        entity_type, action, and/or id_ filter criteria, e.g.:
+
+            add_observer(
+                myfunc, entity_type='application', action='add', id_='ubuntu')
+
+        For more complex filtering conditions, pass a predicate function. It
+        will be called with a delta as its only argument. If the predicate
+        function returns True, the callable_ will be called.
+
         """
-        self.observers.add(callable_)
+        observer = _Observer(
+            callable_, entity_type, action, entity_id, predicate)
+        self.observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -437,11 +524,33 @@ class Model(object):
             by applying this delta.
 
         """
+        if new_obj and not old_obj:
+            delta.type = 'add'
+
         log.debug(
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
+
         for o in self.observers:
-            asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+            if o.cares_about(delta):
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+
+    async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+        """Wait for a new object to appear in the Model and return it.
+
+        Waits for an object of type ``entity_type`` with id ``entity_id``.
+
+        This coroutine blocks until the new object appears in the model.
+
+        """
+        entity_added = asyncio.Queue(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            await entity_added.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, 'add', entity_id, predicate)
+        entity_id = await entity_added.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -486,7 +595,24 @@ class Model(object):
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
-        return await app_facade.AddRelation([relation1, relation2])
+        try:
+            result = await app_facade.AddRelation([relation1, relation2])
+        except JujuAPIError as e:
+            if 'relation already exists' not in e.message:
+                raise
+            log.debug(
+                'Relation %s <-> %s already exists', relation1, relation2)
+            # TODO: if relation already exists we should return the
+            # Relation ModelEntity here
+            return None
+
+        def predicate(delta):
+            endpoints = {}
+            for endpoint in delta.data['endpoints']:
+                endpoints[endpoint['application-name']] = endpoint['relation']
+            return endpoints == result.endpoints
+
+        return await self._wait_for_new('relation', None, predicate)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -628,8 +754,6 @@ class Model(object):
 
         TODO::
 
-            - entity_url must have a revision; look up latest automatically if
-              not provided by caller
             - service_name is required; fill this in automatically if not
               provided by caller
             - series is required; how do we pick a default?
@@ -651,30 +775,47 @@ class Model(object):
                 for k, v in storage.items()
             }
 
+        entity_id = await self.charmstore.entityId(entity_url)
+
         app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
         app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
-        log.debug(
-            'Deploying %s', entity_url)
-
-        await client_facade.AddCharm(channel, entity_url)
-        app = client.ApplicationDeploy(
-            application=service_name,
-            channel=channel,
-            charm_url=entity_url,
-            config=config,
-            constraints=constraints,
-            endpoint_bindings=bind,
-            num_units=num_units,
-            placement=placement,
-            resources=resources,
-            series=series,
-            storage=storage,
-        )
-
-        return await app_facade.Deploy([app])
+        if 'bundle/' in entity_id:
+            handler = BundleHandler(self)
+            await handler.fetch_plan(entity_id)
+            await handler.execute_plan()
+            extant_apps = {app for app in self.applications}
+            pending_apps = set(handler.applications) - extant_apps
+            if pending_apps:
+                # new apps will usually be in the model by now, but if some
+                # haven't made it yet we'll need to wait on them to be added
+                await asyncio.wait([self._wait_for_new('application', app_name)
+                                    for app_name in pending_apps])
+            return [app for name, app in self.applications.items()
+                    if name in handler.applications]
+        else:
+            log.debug(
+                'Deploying %s', entity_id)
+
+            await client_facade.AddCharm(channel, entity_id)
+            app = client.ApplicationDeploy(
+                application=service_name,
+                channel=channel,
+                charm_url=entity_id,
+                config=config,
+                constraints=constraints,
+                endpoint_bindings=bind,
+                num_units=num_units,
+                placement=placement,
+                resources=resources,
+                series=series,
+                storage=storage,
+            )
+
+            await app_facade.Deploy([app])
+            return [await self._wait_for_new('application', service_name)]
 
     def destroy(self):
         """Terminate all machines and resources for this model.
@@ -989,3 +1130,248 @@ class Model(object):
 
         """
         pass
+
+    @property
+    def charmstore(self):
+        return self._charmstore
+
+
+class BundleHandler(object):
+    """
+    Handle bundles by using the API to translate bundle YAML into a plan of
+    steps and then dispatching each of those using the API.
+    """
+    def __init__(self, model):
+        self.model = model
+        self.charmstore = model.charmstore
+        self.plan = []
+        self.references = {}
+        self._units_by_app = {}
+        for unit_name, unit in model.units.items():
+            app_units = self._units_by_app.setdefault(unit.application, [])
+            app_units.append(unit_name)
+        self.client_facade = client.ClientFacade()
+        self.client_facade.connect(model.connection)
+        self.app_facade = client.ApplicationFacade()
+        self.app_facade.connect(model.connection)
+        self.ann_facade = client.AnnotationsFacade()
+        self.ann_facade.connect(model.connection)
+
+    async def fetch_plan(self, entity_id):
+        bundle_yaml = await self.charmstore.files(entity_id,
+                                                  filename='bundle.yaml',
+                                                  read_file=True)
+        self.bundle = yaml.safe_load(bundle_yaml)
+        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+
+    async def execute_plan(self):
+        for step in self.plan.changes:
+            method = getattr(self, step.method)
+            result = await method(*step.args)
+            self.references[step.id_] = result
+
+    @property
+    def applications(self):
+        return list(self.bundle['services'].keys())
+
+    def resolve(self, reference):
+        if reference and reference.startswith('$'):
+            reference = self.references[reference[1:]]
+        return reference
+
+    async def addCharm(self, charm, series):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be added.
+
+        :param series string:
+            Series holds the series of the charm to be added
+            if the charm default is not sufficient.
+        """
+        entity_id = await self.charmstore.entityId(charm)
+        log.debug('Adding %s', entity_id)
+        await self.client_facade.AddCharm(None, entity_id)
+        return entity_id
+
+    async def addMachines(self, series, constraints, container_type,
+                          parent_id):
+        """
+        :param series string:
+            Series holds the optional machine OS series.
+
+        :param constraints string:
+            Constraints holds the optional machine constraints.
+
+        :param Container_type string:
+            ContainerType optionally holds the type of the container (for
+            instance ""lxc" or kvm"). It is not specified for top level
+            machines.
+
+        :param parent_id string:
+            ParentId optionally holds a placeholder pointing to another machine
+            change or to a unit change. This value is only specified in the
+            case this machine is a container, in which case also ContainerType
+            is set.
+        """
+        params = client.AddMachineParams(
+            series=series,
+            constraints=constraints,
+            container_type=container_type,
+            parent_id=self.resolve(parent_id),
+        )
+        results = await self.client_facade.AddMachines(params)
+        log.debug('Added new machine %s', results[0].machine)
+        return results[0].machine
+
+    async def addRelation(self, endpoint1, endpoint2):
+        """
+        :param endpoint1 string:
+        :param endpoint2 string:
+            Endpoint1 and Endpoint2 hold relation endpoints in the
+            "application:interface" form, where the application is always a
+            placeholder pointing to an application change, and the interface is
+            optional. Examples are "$deploy-42:web" or just "$deploy-42".
+        """
+        endpoints = [endpoint1, endpoint2]
+        # resolve indirect references
+        for i in range(len(endpoints)):
+            parts = endpoints[i].split(':')
+            parts[0] = self.resolve(parts[0])
+            endpoints[i] = ':'.join(parts)
+
+        log.info('Relating %s <-> %s', *endpoints)
+        return await self.model.add_relation(*endpoints)
+
+    async def deploy(self, charm, series, application, options, constraints,
+                     storage, endpoint_bindings, resources):
+        """
+        :param charm string:
+            Charm holds the URL of the charm to be used to deploy this
+            application.
+
+        :param series string:
+            Series holds the series of the application to be deployed
+            if the charm default is not sufficient.
+
+        :param application string:
+            Application holds the application name.
+
+        :param options map[string]interface{}:
+            Options holds application options.
+
+        :param constraints string:
+            Constraints holds the optional application constraints.
+
+        :param storage map[string]string:
+            Storage holds the optional storage constraints.
+
+        :param endpoint_bindings map[string]string:
+            EndpointBindings holds the optional endpoint bindings
+
+        :param resources map[string]int:
+            Resources identifies the revision to use for each resource
+            of the application's charm.
+        """
+        # resolve indirect references
+        charm = self.resolve(charm)
+        # stringify all config values for API
+        options = {k: str(v) for k, v in options.items()}
+        # build param object
+        app = client.ApplicationDeploy(
+            charm_url=charm,
+            series=series,
+            application=application,
+            config=options,
+            constraints=constraints,
+            storage=storage,
+            endpoint_bindings=endpoint_bindings,
+            resources=resources,
+        )
+        # do the do
+        log.info('Deploying %s', charm)
+        await self.app_facade.Deploy([app])
+        return application
+
+    async def addUnit(self, application, to):
+        """
+        :param application string:
+            Application holds the application placeholder name for which a unit
+            is added.
+
+        :param to string:
+            To holds the optional location where to add the unit, as a
+            placeholder pointing to another unit change or to a machine change.
+        """
+        application = self.resolve(application)
+        placement = self.resolve(to)
+        if self._units_by_app.get(application):
+            # enough units for this application already exist;
+            # claim one, and carry on
+            # NB: this should probably honor placement, but the juju client
+            # doesn't, so we're not bothering, either
+            unit_name = self._units_by_app[application].pop()
+            log.debug('Reusing unit %s for %s', unit_name, application)
+            return self.model.units[unit_name]
+
+        log.debug('Adding new unit for %s%s', application,
+                  ' to %s' % placement if placement else '')
+        return await self.model.applications[application].add_unit(
+            count=1,
+            to=placement,
+        )
+
+    async def expose(self, application):
+        """
+        :param application string:
+            Application holds the placeholder name of the application that must
+            be exposed.
+        """
+        application = self.resolve(application)
+        log.info('Exposing %s', application)
+        return await self.model.applications[application].expose()
+
+    async def setAnnotations(self, id_, entity_type, annotations):
+        """
+        :param id_ string:
+            Id is the placeholder for the application or machine change
+            corresponding to the entity to be annotated.
+
+        :param entity_type EntityType:
+            EntityType holds the type of the entity, "application" or
+            "machine".
+
+        :param annotations map[string]string:
+            Annotations holds the annotations as key/value pairs.
+        """
+        entity_id = self.resolve(id_)
+        try:
+            entity = self.model.state.get_entity(entity_type, entity_id)
+        except KeyError:
+            entity = await self.model._wait_for_new(entity_type, entity_id)
+        return await entity.set_annotations(annotations)
+
+
+class CharmStore(object):
+    """
+    Async wrapper around theblues.charmstore.CharmStore
+    """
+    def __init__(self, loop):
+        self.loop = loop
+        self._cs = charmstore.CharmStore()
+
+    def __getattr__(self, name):
+        """
+        Wrap method calls in coroutines that use run_in_executor to make them
+        async.
+        """
+        attr = getattr(self._cs, name)
+        if not callable(attr):
+            wrapper = partial(getattr, self._cs, name)
+            setattr(self, name, wrapper)
+        else:
+            async def coro(*args, **kwargs):
+                method = partial(attr, *args, **kwargs)
+                return await self.loop.run_in_executor(None, method)
+            setattr(self, name, coro)
+            wrapper = coro
+        return wrapper