Add get metrics
[osm/N2VC.git] / juju / model.py
index 7fe0514..56924cf 100644 (file)
@@ -159,14 +159,12 @@ class ModelState(object):
 
     def get_entity(
             self, entity_type, entity_id, history_index=-1, connected=True):
-        """Return an object instance representing the entity created or
-        updated by ``delta``
+        """Return an object instance for the given entity_type and id.
+
+        By default the object state matches the most recent state from
+        Juju. To get an instance of the object in an older state, pass
+        history_index, an index into the history deque for the entity.
 
-        """
-        """
-        log.debug(
-            'Getting %s:%s at index %s',
-            entity_type, entity_id, history_index)
         """
 
         if history_index < 0 and history_index != -1:
@@ -351,6 +349,16 @@ class Model(object):
         self._watch_received = asyncio.Event(loop=loop)
         self._charmstore = CharmStore(self.loop)
 
+    async def connect(self, *args, **kw):
+        """Connect to an arbitrary Juju model.
+
+        args and kw are passed through to Connection.connect()
+
+        """
+        self.connection = await connection.Connection.connect(*args, **kw)
+        self._watch()
+        await self._watch_received.wait()
+
     async def connect_current(self):
         """Connect to the current Juju model.
 
@@ -359,6 +367,15 @@ class Model(object):
         self._watch()
         await self._watch_received.wait()
 
+    async def connect_model(self, arg):
+        """Connect to a specific Juju model.
+        :param arg:  <controller>:<user/model>
+
+        """
+        self.connection = await connection.Connection.connect_model(arg)
+        self._watch()
+        await self._watch_received.wait()
+
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
 
@@ -535,22 +552,61 @@ class Model(object):
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
 
-    async def _wait_for_new(self, entity_type, entity_id, predicate=None):
+    async def _wait(self, entity_type, entity_id, action, predicate=None):
+        """
+        Block the calling routine until a given action has happened to the
+        given entity
+
+        :param entity_type: The entity's type.
+        :param entity_id: The entity's id.
+        :param action: the type of action (e.g., 'add' or 'change')
+        :param predicate: optional callable that must take as an
+            argument a delta, and must return a boolean, indicating
+            whether the delta contains the specific action we're looking
+            for. For example, you might check to see whether a 'change'
+            has a 'completed' status. See the _Observer class for details.
+
+        """
+        q = asyncio.Queue(loop=self.loop)
+
+        async def callback(delta, old, new, model):
+            await q.put(delta.get_id())
+
+        self.add_observer(callback, entity_type, action, entity_id, predicate)
+        entity_id = await q.get()
+        return self.state._live_entity_map(entity_type)[entity_id]
+
+    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
 
         Waits for an object of type ``entity_type`` with id ``entity_id``.
+        If ``entity_id`` is ``None``, it will wait for the first new entity
+        of the correct type.
 
         This coroutine blocks until the new object appears in the model.
 
         """
-        entity_added = asyncio.Queue(loop=self.loop)
+        # if the entity is already in the model, just return it
+        if entity_id in self.state._live_entity_map(entity_type):
+            return self.state._live_entity_map(entity_type)[entity_id]
+        # if we know the entity_id, we can trigger on any action that puts
+        # the enitty into the model; otherwise, we have to watch for the
+        # next "add" action on that entity_type
+        action = 'add' if entity_id is None else None
+        return await self._wait(entity_type, entity_id, action, predicate)
 
-        async def callback(delta, old, new, model):
-            await entity_added.put(delta.get_id())
+    async def wait_for_action(self, action_id):
+        """Given an action, wait for it to complete."""
 
-        self.add_observer(callback, entity_type, 'add', entity_id, predicate)
-        entity_id = await entity_added.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        if action_id.startswith("action-"):
+            # if we've been passed action.tag, transform it into the
+            # id that the api deltas will use.
+            action_id = action_id[7:]
+
+        def predicate(delta):
+            return delta.data['status'] in ('completed', 'failed')
+
+        return await self._wait('action', action_id, 'change', predicate)
 
     def add_machine(
             self, spec=None, constraints=None, disks=None, series=None,
@@ -759,9 +815,6 @@ class Model(object):
             - series is required; how do we pick a default?
 
         """
-        if constraints:
-            constraints = client.Value(**constraints)
-
         if to:
             placement = [
                 client.Placement(**p) for p in to
@@ -791,8 +844,11 @@ class Model(object):
             if pending_apps:
                 # new apps will usually be in the model by now, but if some
                 # haven't made it yet we'll need to wait on them to be added
-                await asyncio.wait([self._wait_for_new('application', app_name)
-                                    for app_name in pending_apps])
+                await asyncio.gather(*[
+                    asyncio.ensure_future(
+                        self.model._wait_for_new('application', app_name))
+                    for app_name in pending_apps
+                ])
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
@@ -815,7 +871,7 @@ class Model(object):
             )
 
             await app_facade.Deploy([app])
-            return [await self._wait_for_new('application', service_name)]
+            return await self._wait_for_new('application', service_name)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
@@ -823,6 +879,21 @@ class Model(object):
         """
         pass
 
+    async def destroy_unit(self, *unit_names):
+        """Destroy units by name.
+
+        """
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        log.debug(
+            'Destroying unit%s %s',
+            's' if len(unit_names) == 1 else '',
+            ' '.join(unit_names))
+
+        return await app_facade.Destroy(self.name)
+    destroy_units = destroy_unit
+
     def get_backup(self, archive_id):
         """Download a backup archive file.
 
@@ -1135,6 +1206,36 @@ class Model(object):
     def charmstore(self):
         return self._charmstore
 
+    async def get_metrics(self, *tags):
+        """Retrieve metrics.
+
+        :param str \*tags: Tags of entities from which to retrieve metrics.
+            No tags retrieves the metrics of all units in the model.
+        """
+        log.debug("Retrieving metrics for %s",
+                  ', '.join(tags) if tags else "all units")
+
+        metrics_facade = client.MetricsDebugFacade()
+        metrics_facade.connect(self.connection)
+
+        entities = [client.Entity(tag) for tag in tags]
+        metrics_result = await metrics_facade.GetMetrics(entities)
+
+        metrics = collections.defaultdict(list)
+
+        for entity_metrics in metrics_result.results:
+            error = entity_metrics.error
+            if error:
+                if "is not a valid tag" in error:
+                    raise ValueError(error.message)
+                else:
+                    raise Exception(error.message)
+
+            for metric in entity_metrics.metrics:
+                metrics[metric.unit].append(metric.to_json())
+
+        return metrics
+
 
 class BundleHandler(object):
     """
@@ -1290,6 +1391,8 @@ class BundleHandler(object):
         # do the do
         log.info('Deploying %s', charm)
         await self.app_facade.Deploy([app])
+        # ensure the app is in the model for future operations
+        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):