Refactored login code to better handle redirects (#116)
[osm/N2VC.git] / juju / model.py
index 634c3b6..3ed8fa7 100644 (file)
@@ -1,5 +1,7 @@
 import asyncio
+import base64
 import collections
+import hashlib
 import json
 import logging
 import os
@@ -13,10 +15,11 @@ from functools import partial
 from pathlib import Path
 
 import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
 
+from . import tag
 from .client import client
-from .client import watcher
 from .client import connection
 from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
@@ -229,7 +232,14 @@ class ModelEntity(object):
         model.
 
         """
-        return self.safe_data[name]
+        try:
+            return self.safe_data[name]
+        except KeyError:
+            name = name.replace('_', '-')
+            if name in self.safe_data:
+                return self.safe_data[name]
+            else:
+                raise
 
     def __bool__(self):
         return bool(self.data)
@@ -555,8 +565,7 @@ class Model(object):
         explicit call to this method.
 
         """
-        facade = client.ClientFacade()
-        facade.connect(self.connection)
+        facade = client.ClientFacade.from_connection(self.connection)
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
@@ -612,9 +621,8 @@ class Model(object):
         async def _start_watch():
             self._watch_shutdown.clear()
             try:
-                allwatcher = watcher.AllWatcher()
-                self._watch_conn = await self.connection.clone()
-                allwatcher.connect(self._watch_conn)
+                allwatcher = client.AllWatcherFacade.from_connection(
+                    self.connection)
                 while True:
                     results = await allwatcher.Next()
                     for delta in results.deltas:
@@ -631,10 +639,10 @@ class Model(object):
                             loop=self.loop)
                     self._watch_received.set()
             except CancelledError:
-                log.debug('Closing watcher connection')
-                await self._watch_conn.close()
                 self._watch_shutdown.set()
-                self._watch_conn = None
+            except Exception:
+                log.exception('Error in watcher')
+                raise
 
         log.debug('Starting watcher task')
         self._watcher_task = self.loop.create_task(_start_watch())
@@ -786,12 +794,11 @@ class Model(object):
             params.series = series
 
         # Submit the request.
-        client_facade = client.ClientFacade()
-        client_facade.connect(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection)
         results = await client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
-            raise ValueError("Error adding machine: %s", error.message)
+            raise ValueError("Error adding machine: %s" % error.message)
         machine_id = results.machines[0].machine
         log.debug('Added new machine %s', machine_id)
         return await self._wait_for_new('machine', machine_id)
@@ -803,8 +810,7 @@ class Model(object):
         :param str relation2: '<application>[:<relation_name>]'
 
         """
-        app_facade = client.ApplicationFacade()
-        app_facade.connect(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection)
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
@@ -840,13 +846,15 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    def add_ssh_key(self, key):
+    async def add_ssh_key(self, user, key):
         """Add a public SSH key to this model.
 
+        :param str user: The username of the user
         :param str key: The public ssh key
 
         """
-        raise NotImplementedError()
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        return await key_facade.AddKeys([key], user)
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -934,6 +942,22 @@ class Model(object):
         """
         raise NotImplementedError()
 
+    def _get_series(self, entity_url, entity):
+        # try to get the series from the provided charm URL
+        if entity_url.startswith('cs:'):
+            parts = entity_url[3:].split('/')
+        else:
+            parts = entity_url.split('/')
+        if parts[0].startswith('~'):
+            parts.pop(0)
+        if len(parts) > 1:
+            # series was specified in the URL
+            return parts[0]
+        # series was not supplied at all, so use the newest
+        # supported series according to the charm store
+        ss = entity['Meta']['supported-series']
+        return ss['SupportedSeries'][0]
+
     async def deploy(
             self, entity_url, application_name=None, bind=None, budget=None,
             channel=None, config=None, constraints=None, force=False,
@@ -968,16 +992,9 @@ class Model(object):
 
         TODO::
 
-            - application_name is required; fill this in automatically if not
-              provided by caller
-            - series is required; how do we pick a default?
+            - support local resources
 
         """
-        if to:
-            placement = parse_placement(to)
-        else:
-            placement = []
-
         if storage:
             storage = {
                 k: client.Constraints(**v)
@@ -988,13 +1005,13 @@ class Model(object):
             entity_url.startswith('local:') or
             os.path.isdir(entity_url)
         )
-        entity_id = await self.charmstore.entityId(entity_url) \
-            if not is_local else entity_url
+        if is_local:
+            entity_id = entity_url
+        else:
+            entity = await self.charmstore.entity(entity_url)
+            entity_id = entity['Id']
 
-        app_facade = client.ApplicationFacade()
-        client_facade = client.ClientFacade()
-        app_facade.connect(self.connection)
-        client_facade.connect(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection)
 
         is_bundle = ((is_local and
                       (Path(entity_id) / 'bundle.yaml').exists()) or
@@ -1018,12 +1035,20 @@ class Model(object):
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
-            log.debug(
-                'Deploying %s', entity_id)
-
             if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series:
+                    series = self._get_series(entity_url, entity)
+                if not channel:
+                    channel = 'stable'
                 await client_facade.AddCharm(channel, entity_id)
-            elif not entity_id.startswith('local:'):
+                # XXX: we're dropping local resources here, but we don't
+                # actually support them yet anyway
+                resources = await self._add_store_resources(application_name,
+                                                            entity_id,
+                                                            entity)
+            else:
                 # We have a local charm dir that needs to be uploaded
                 charm_dir = os.path.abspath(
                     os.path.expanduser(entity_id))
@@ -1034,30 +1059,89 @@ class Model(object):
                         "Pass a 'series' kwarg to Model.deploy().".format(
                             charm_dir))
                 entity_id = await self.add_local_charm_dir(charm_dir, series)
-
-            app = client.ApplicationDeploy(
-                application=application_name,
-                channel=channel,
+            return await self._deploy(
                 charm_url=entity_id,
-                config=config,
-                constraints=parse_constraints(constraints),
+                application=application_name,
+                series=series,
+                config=config or {},
+                constraints=constraints,
                 endpoint_bindings=bind,
-                num_units=num_units,
                 resources=resources,
-                series=series,
                 storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to)
             )
-            app.placement = placement
 
-            result = await app_facade.Deploy([app])
-            errors = [r.error.message for r in result.results if r.error]
-            if errors:
-                raise JujuError('\n'.join(errors))
-            return await self._wait_for_new('application', application_name)
+    async def _add_store_resources(self, application, entity_url, entity=None):
+        if not entity:
+            # avoid extra charm store call if one was already made
+            entity = await self.charmstore.entity(entity_url)
+        resources = [
+            {
+                'description': resource['Description'],
+                'fingerprint': resource['Fingerprint'],
+                'name': resource['Name'],
+                'path': resource['Path'],
+                'revision': resource['Revision'],
+                'size': resource['Size'],
+                'type_': resource['Type'],
+                'origin': 'store',
+            } for resource in entity['Meta']['resources']
+        ]
+
+        if not resources:
+            return None
 
-    def destroy(self):
-        """Terminate all machines and resources for this model.
+        resources_facade = client.ResourcesFacade.from_connection(
+            self.connection)
+        response = await resources_facade.AddPendingResources(
+            tag.application(application),
+            entity_url,
+            [client.CharmResource(**resource) for resource in resources])
+        resource_map = {resource['name']: pid
+                        for resource, pid
+                        in zip(resources, response.pending_ids)}
+        return resource_map
 
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+        """
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade.from_connection(
+            self.connection)
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement
+        )
+
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
+
+    async def destroy(self):
+        """Terminate all machines and resources for this model.
+            Is already implemented in controller.py.
         """
         raise NotImplementedError()
 
@@ -1065,8 +1149,7 @@ class Model(object):
         """Destroy units by name.
 
         """
-        app_facade = client.ApplicationFacade()
-        app_facade.connect(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection)
 
         log.debug(
             'Destroying unit%s %s',
@@ -1116,14 +1199,21 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    def grant(self, username, acl='read'):
+    async def grant(self, username, acl='read'):
         """Grant a user access to this model.
 
         :param str username: Username
         :param str acl: Access control ('read' or 'write')
 
         """
-        raise NotImplementedError()
+        controller_conn = await self.connection.controller()
+        model_facade = client.ModelManagerFacade.from_connection(
+            controller_conn)
+        user = tag.user(username)
+        model = tag.model(self.info.uuid)
+        changes = client.ModifyModelAccess(acl, 'grant', model, user)
+        await self.revoke(username)
+        return await model_facade.ModifyModelAccess([changes])
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -1134,14 +1224,11 @@ class Model(object):
         raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
-    def get_machines(self, machine, utc=False):
+    async def get_machines(self):
         """Return list of machines in this model.
 
-        :param str machine: Machine id, e.g. '0'
-        :param bool utc: Display time as UTC in RFC3339 format
-
         """
-        raise NotImplementedError()
+        return list(self.state.machines.keys())
 
     def get_shares(self):
         """Return list of all users with access to this model.
@@ -1155,11 +1242,16 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    def get_ssh_key(self):
+    async def get_ssh_key(self, raw_ssh=False):
         """Return known SSH keys for this model.
+        :param bool raw_ssh: if True, returns the raw ssh key,
+            else it's fingerprint
 
         """
-        raise NotImplementedError()
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        entity = {'tag': tag.model(self.info.uuid)}
+        entities = client.Entities([entity])
+        return await key_facade.ListKeys(entities, raw_ssh)
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -1222,13 +1314,18 @@ class Model(object):
         raise NotImplementedError()
     remove_machines = remove_machine
 
-    def remove_ssh_key(self, *keys):
+    async def remove_ssh_key(self, user, key):
         """Remove a public SSH key(s) from this model.
 
-        :param str \*keys: Keys to remove
+        :param str key: Full ssh key
+        :param str user: Juju user to which the key is registered
 
         """
-        raise NotImplementedError()
+        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
+        key = hashlib.md5(key).hexdigest()
+        key = ':'.join(a+b for a, b in zip(key[::2], key[1::2]))
+        await key_facade.DeleteKeys([key], user)
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -1252,14 +1349,19 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    def revoke(self, username, acl='read'):
+    async def revoke(self, username):
         """Revoke a user's access to this model.
 
         :param str username: Username to revoke
-        :param str acl: Access control ('read' or 'write')
 
         """
-        raise NotImplementedError()
+        controller_conn = await self.connection.controller()
+        model_facade = client.ModelManagerFacade.from_connection(
+            controller_conn)
+        user = tag.user(username)
+        model = tag.model(self.info.uuid)
+        changes = client.ModifyModelAccess('read', 'revoke', model, user)
+        return await model_facade.ModifyModelAccess([changes])
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -1320,8 +1422,7 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        client_facade = client.ClientFacade()
-        client_facade.connect(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection)
         return await client_facade.FullStatus(filters)
 
     def sync_tools(
@@ -1401,8 +1502,8 @@ class Model(object):
         log.debug("Retrieving metrics for %s",
                   ', '.join(tags) if tags else "all units")
 
-        metrics_facade = client.MetricsDebugFacade()
-        metrics_facade.connect(self.connection)
+        metrics_facade = client.MetricsDebugFacade.from_connection(
+            self.connection)
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
@@ -1452,12 +1553,12 @@ class BundleHandler(object):
         for unit_name, unit in model.units.items():
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
-        self.client_facade = client.ClientFacade()
-        self.client_facade.connect(model.connection)
-        self.app_facade = client.ApplicationFacade()
-        self.app_facade.connect(model.connection)
-        self.ann_facade = client.AnnotationsFacade()
-        self.ann_facade.connect(model.connection)
+        self.client_facade = client.ClientFacade.from_connection(
+            model.connection)
+        self.app_facade = client.ApplicationFacade.from_connection(
+            model.connection)
+        self.ann_facade = client.AnnotationsFacade.from_connection(
+            model.connection)
 
     async def _handle_local_charms(self, bundle):
         """Search for references to local charms (i.e. filesystem paths)
@@ -1500,7 +1601,7 @@ class BundleHandler(object):
             charm_urls = await asyncio.gather(*[
                 self.model.add_local_charm_dir(*params)
                 for params in args
-            ], loop=self.loop)
+            ], loop=self.model.loop)
             # Update the 'charm:' entry for each app with the new 'local:' url.
             for app_name, charm_url in zip(apps, charm_urls):
                 bundle['services'][app_name]['charm'] = charm_url
@@ -1521,9 +1622,6 @@ class BundleHandler(object):
         self.plan = await self.client_facade.GetBundleChanges(
             yaml.dump(self.bundle))
 
-        if self.plan.errors:
-            raise JujuError('\n'.join(self.plan.errors))
-
     async def execute_plan(self):
         for step in self.plan.changes:
             method = getattr(self, step.method)
@@ -1603,7 +1701,7 @@ class BundleHandler(object):
         results = await self.client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
-            raise ValueError("Error adding machine: %s", error.message)
+            raise ValueError("Error adding machine: %s" % error.message)
         machine = results.machines[0].machine
         log.debug('Added new machine %s', machine)
         return machine
@@ -1659,28 +1757,21 @@ class BundleHandler(object):
         """
         # resolve indirect references
         charm = self.resolve(charm)
-        # stringify all config values for API, and convert to YAML
-        options = {k: str(v) for k, v in options.items()}
-        options = yaml.dump({application: options}, default_flow_style=False)
-        # build param object
-        app = client.ApplicationDeploy(
+        # the bundle plan doesn't actually do anything with resources, even
+        # though it ostensibly gives us something (None) for that param
+        if not charm.startswith('local:'):
+            resources = await self.model._add_store_resources(application,
+                                                              charm)
+        await self.model._deploy(
             charm_url=charm,
-            series=series,
             application=application,
-            # Pass options to config-yaml rather than config, as
-            # config-yaml invokes a newer codepath that better handles
-            # empty strings in the options values.
-            config_yaml=options,
-            constraints=parse_constraints(constraints),
-            storage=storage,
+            series=series,
+            config=options,
+            constraints=constraints,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
+            storage=storage,
         )
-        # do the do
-        log.info('Deploying %s', charm)
-        await self.app_facade.Deploy([app])
-        # ensure the app is in the model for future operations
-        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
@@ -1748,7 +1839,7 @@ class CharmStore(object):
     """
     def __init__(self, loop):
         self.loop = loop
-        self._cs = charmstore.CharmStore()
+        self._cs = theblues.charmstore.CharmStore(timeout=5)
 
     def __getattr__(self, name):
         """
@@ -1762,7 +1853,13 @@ class CharmStore(object):
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
-                return await self.loop.run_in_executor(None, method)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
             setattr(self, name, coro)
             wrapper = coro
         return wrapper