Fixes based on review
[osm/N2VC.git] / juju / model.py
index ef6bc85..3df2669 100644 (file)
@@ -13,7 +13,8 @@ from functools import partial
 from pathlib import Path
 
 import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
 
 from .client import client
 from .client import watcher
@@ -376,8 +377,8 @@ class Model(object):
         self.state = ModelState(self)
         self.info = None
         self._watcher_task = None
-        self._watch_shutdown = asyncio.Event(loop=loop)
-        self._watch_received = asyncio.Event(loop=loop)
+        self._watch_shutdown = asyncio.Event(loop=self.loop)
+        self._watch_received = asyncio.Event(loop=self.loop)
         self._charmstore = CharmStore(self.loop)
 
     async def connect(self, *args, **kw):
@@ -386,6 +387,8 @@ class Model(object):
         args and kw are passed through to Connection.connect()
 
         """
+        if 'loop' not in kw:
+            kw['loop'] = self.loop
         self.connection = await connection.Connection.connect(*args, **kw)
         await self._after_connect()
 
@@ -393,7 +396,8 @@ class Model(object):
         """Connect to the current Juju model.
 
         """
-        self.connection = await connection.Connection.connect_current()
+        self.connection = await connection.Connection.connect_current(
+            self.loop)
         await self._after_connect()
 
     async def connect_model(self, model_name):
@@ -402,7 +406,8 @@ class Model(object):
         :param model_name:  Format [controller:][user/]model
 
         """
-        self.connection = await connection.Connection.connect_model(model_name)
+        self.connection = await connection.Connection.connect_model(model_name,
+                                                                    self.loop)
         await self._after_connect()
 
     async def _after_connect(self):
@@ -511,8 +516,8 @@ class Model(object):
         """
         async def _block():
             while not all(c() for c in conditions):
-                await asyncio.sleep(wait_period)
-        await asyncio.wait_for(_block(), timeout)
+                await asyncio.sleep(wait_period, loop=self.loop)
+        await asyncio.wait_for(_block(), timeout, loop=self.loop)
 
     @property
     def applications(self):
@@ -623,7 +628,8 @@ class Model(object):
                         # canceled with it. So we shield them. But this means
                         # they can *never* be canceled.
                         await asyncio.shield(
-                            self._notify_observers(delta, old_obj, new_obj))
+                            self._notify_observers(delta, old_obj, new_obj),
+                            loop=self.loop)
                     self._watch_received.set()
             except CancelledError:
                 log.debug('Closing watcher connection')
@@ -662,7 +668,8 @@ class Model(object):
 
         for o in self.observers:
             if o.cares_about(delta):
-                asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+                asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                      loop=self.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -671,7 +678,7 @@ class Model(object):
 
         :param entity_type: The entity's type.
         :param entity_id: The entity's id.
-        :param action: the type of action (e.g., 'add' or 'change')
+        :param action: the type of action (e.g., 'add', 'change', or 'remove')
         :param predicate: optional callable that must take as an
             argument a delta, and must return a boolean, indicating
             whether the delta contains the specific action we're looking
@@ -686,7 +693,9 @@ class Model(object):
 
         self.add_observer(callback, entity_type, action, entity_id, predicate)
         entity_id = await q.get()
-        return self.state._live_entity_map(entity_type)[entity_id]
+        # object might not be in the entity_map if we were waiting for a
+        # 'remove' action
+        return self.state._live_entity_map(entity_type).get(entity_id)
 
     async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
         """Wait for a new object to appear in the Model and return it.
@@ -830,7 +839,7 @@ class Model(object):
         :param \*cidrs: Optional list of existing subnet CIDRs
 
         """
-        pass
+        raise NotImplementedError()
 
     def add_ssh_key(self, key):
         """Add a public SSH key to this model.
@@ -838,7 +847,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
-        pass
+        raise NotImplementedError()
     add_ssh_keys = add_ssh_key
 
     def add_subnet(self, cidr_or_id, space, *zones):
@@ -849,13 +858,13 @@ class Model(object):
         :param str \*zones: Zone(s) in which the subnet resides
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_backups(self):
         """Retrieve metadata for backups in this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def block(self, *commands):
         """Add a new block to this model.
@@ -864,13 +873,13 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_blocks(self):
         """List blocks for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_cached_images(self, arch=None, kind=None, series=None):
         """Return a list of cached OS images.
@@ -880,7 +889,7 @@ class Model(object):
         :param str series: Filter by image series, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_backup(self, note=None, no_download=False):
         """Create a backup of this model.
@@ -890,7 +899,7 @@ class Model(object):
         :return str: Path to downloaded archive
 
         """
-        pass
+        raise NotImplementedError()
 
     def create_storage_pool(self, name, provider_type, **pool_config):
         """Create or define a storage pool.
@@ -900,7 +909,7 @@ class Model(object):
         :param \*\*pool_config: key/value pool configuration pairs
 
         """
-        pass
+        raise NotImplementedError()
 
     def debug_log(
             self, no_tail=False, exclude_module=None, include_module=None,
@@ -924,7 +933,7 @@ class Model(object):
         :param list exclude: Do not show log messages for these entities
 
         """
-        pass
+        raise NotImplementedError()
 
     async def deploy(
             self, entity_url, application_name=None, bind=None, budget=None,
@@ -965,11 +974,6 @@ class Model(object):
             - series is required; how do we pick a default?
 
         """
-        if to:
-            placement = parse_placement(to)
-        else:
-            placement = []
-
         if storage:
             storage = {
                 k: client.Constraints(**v)
@@ -980,12 +984,13 @@ class Model(object):
             entity_url.startswith('local:') or
             os.path.isdir(entity_url)
         )
-        entity_id = await self.charmstore.entityId(entity_url) \
-            if not is_local else entity_url
+        if is_local:
+            entity_id = entity_url
+        else:
+            entity = await self.charmstore.entity(entity_url)
+            entity_id = entity['Id']
 
-        app_facade = client.ApplicationFacade()
         client_facade = client.ClientFacade()
-        app_facade.connect(self.connection)
         client_facade.connect(self.connection)
 
         is_bundle = ((is_local and
@@ -1003,18 +1008,36 @@ class Model(object):
                 # haven't made it yet we'll need to wait on them to be added
                 await asyncio.gather(*[
                     asyncio.ensure_future(
-                        self._wait_for_new('application', app_name))
+                        self._wait_for_new('application', app_name),
+                        loop=self.loop)
                     for app_name in pending_apps
-                ])
+                ], loop=self.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
-            log.debug(
-                'Deploying %s', entity_id)
-
             if not is_local:
+                if not application_name:
+                    application_name = entity['Meta']['charm-metadata']['Name']
+                if not series and '/' in entity_url:
+                    # try to get the series from the provided charm URL
+                    if entity_url.startswith('cs:'):
+                        parts = entity_url[3:].split('/')
+                    else:
+                        parts = entity_url.split('/')
+                    if parts[0].startswith('~'):
+                        parts.pop(0)
+                    if len(parts) > 1:
+                        # series was specified in the URL
+                        series = parts[0]
+                if not series:
+                    # series was not supplied at all, so use the newest
+                    # supported series according to the charm store
+                    ss = entity['Meta']['supported-series']
+                    series = ss['SupportedSeries'][0]
+                if not channel:
+                    channel = 'stable'
                 await client_facade.AddCharm(channel, entity_id)
-            elif not entity_id.startswith('local:'):
+            else:
                 # We have a local charm dir that needs to be uploaded
                 charm_dir = os.path.abspath(
                     os.path.expanduser(entity_id))
@@ -1025,32 +1048,60 @@ class Model(object):
                         "Pass a 'series' kwarg to Model.deploy().".format(
                             charm_dir))
                 entity_id = await self.add_local_charm_dir(charm_dir, series)
-
-            app = client.ApplicationDeploy(
-                application=application_name,
-                channel=channel,
+            return await self._deploy(
                 charm_url=entity_id,
-                config=config,
-                constraints=parse_constraints(constraints),
+                application=application_name,
+                series=series,
+                config=config or {},
+                constraints=constraints,
                 endpoint_bindings=bind,
-                num_units=num_units,
                 resources=resources,
-                series=series,
                 storage=storage,
+                channel=channel,
+                num_units=num_units,
+                placement=parse_placement(to),
             )
-            app.placement = placement
 
-            result = await app_facade.Deploy([app])
-            errors = [r.error.message for r in result.results if r.error]
-            if errors:
-                raise JujuError('\n'.join(errors))
-            return await self._wait_for_new('application', application_name)
+    async def _deploy(self, charm_url, application, series, config,
+                      constraints, endpoint_bindings, resources, storage,
+                      channel=None, num_units=None, placement=None):
+        """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+        """
+        log.info('Deploying %s', charm_url)
+
+        # stringify all config values for API, and convert to YAML
+        config = {k: str(v) for k, v in config.items()}
+        config = yaml.dump({application: config},
+                           default_flow_style=False)
+
+        app_facade = client.ApplicationFacade()
+        app_facade.connect(self.connection)
+
+        app = client.ApplicationDeploy(
+            charm_url=charm_url,
+            application=application,
+            series=series,
+            channel=channel,
+            config_yaml=config,
+            constraints=parse_constraints(constraints),
+            endpoint_bindings=endpoint_bindings,
+            num_units=num_units,
+            resources=resources,
+            storage=storage,
+            placement=placement,
+        )
+
+        result = await app_facade.Deploy([app])
+        errors = [r.error.message for r in result.results if r.error]
+        if errors:
+            raise JujuError('\n'.join(errors))
+        return await self._wait_for_new('application', application)
 
     def destroy(self):
         """Terminate all machines and resources for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     async def destroy_unit(self, *unit_names):
         """Destroy units by name.
@@ -1074,7 +1125,7 @@ class Model(object):
         :return str: Path to the archive file
 
         """
-        pass
+        raise NotImplementedError()
 
     def enable_ha(
             self, num_controllers=0, constraints=None, series=None, to=None):
@@ -1093,19 +1144,19 @@ class Model(object):
             If None, a new machine is provisioned.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_config(self):
         """Return the configuration settings for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_constraints(self):
         """Return the machine constraints for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def grant(self, username, acl='read'):
         """Grant a user access to this model.
@@ -1114,7 +1165,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
@@ -1122,7 +1173,7 @@ class Model(object):
         :param str identity: User identity in the form <lp|gh>:<username>
 
         """
-        pass
+        raise NotImplementedError()
     import_ssh_keys = import_ssh_key
 
     def get_machines(self, machine, utc=False):
@@ -1132,25 +1183,25 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_shares(self):
         """Return list of all users with access to this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_spaces(self):
         """Return list of all known spaces, including associated subnets.
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_ssh_key(self):
         """Return known SSH keys for this model.
 
         """
-        pass
+        raise NotImplementedError()
     get_ssh_keys = get_ssh_key
 
     def get_storage(self, filesystem=False, volume=False):
@@ -1160,7 +1211,7 @@ class Model(object):
         :param bool volume: Include volume storage
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_storage_pools(self, names=None, providers=None):
         """Return list of storage pools.
@@ -1169,7 +1220,7 @@ class Model(object):
         :param list providers: Only include pools for these providers
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_subnets(self, space=None, zone=None):
         """Return list of known subnets.
@@ -1178,13 +1229,13 @@ class Model(object):
         :param str zone: Only include subnets in this zone
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_blocks(self):
         """Remove all blocks from this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_backup(self, backup_id):
         """Delete a backup.
@@ -1192,7 +1243,7 @@ class Model(object):
         :param str backup_id: The id of the backup to remove
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_cached_images(self, arch=None, kind=None, series=None):
         """Remove cached OS images.
@@ -1202,7 +1253,7 @@ class Model(object):
         :param str series: Image series to remove, e.g. 'xenial'
 
         """
-        pass
+        raise NotImplementedError()
 
     def remove_machine(self, *machine_ids):
         """Remove a machine from this model.
@@ -1210,7 +1261,7 @@ class Model(object):
         :param str \*machine_ids: Ids of the machines to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_machines = remove_machine
 
     def remove_ssh_key(self, *keys):
@@ -1219,7 +1270,7 @@ class Model(object):
         :param str \*keys: Keys to remove
 
         """
-        pass
+        raise NotImplementedError()
     remove_ssh_keys = remove_ssh_key
 
     def restore_backup(
@@ -1235,13 +1286,13 @@ class Model(object):
         :param bool upload_tools: Upload tools if bootstrapping a new machine
 
         """
-        pass
+        raise NotImplementedError()
 
     def retry_provisioning(self):
         """Retry provisioning for failed machines.
 
         """
-        pass
+        raise NotImplementedError()
 
     def revoke(self, username, acl='read'):
         """Revoke a user's access to this model.
@@ -1250,7 +1301,7 @@ class Model(object):
         :param str acl: Access control ('read' or 'write')
 
         """
-        pass
+        raise NotImplementedError()
 
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
@@ -1259,7 +1310,7 @@ class Model(object):
         :param int timeout: Time to wait before command is considered failed
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_config(self, **config):
         """Set configuration keys on this model.
@@ -1267,7 +1318,7 @@ class Model(object):
         :param \*\*config: Config key/values
 
         """
-        pass
+        raise NotImplementedError()
 
     def set_constraints(self, constraints):
         """Set machine constraints on this model.
@@ -1275,7 +1326,7 @@ class Model(object):
         :param :class:`juju.Constraints` constraints: Machine constraints
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_output(self, action_uuid, wait=-1):
         """Get the results of an action by ID.
@@ -1284,7 +1335,7 @@ class Model(object):
         :param int wait: Time in seconds to wait for action to complete
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_action_status(self, uuid_or_prefix=None, name=None):
         """Get the status of all actions, filtered by ID, ID prefix, or action name.
@@ -1293,7 +1344,7 @@ class Model(object):
         :param str name: Filter by action name
 
         """
-        pass
+        raise NotImplementedError()
 
     def get_budget(self, budget_name):
         """Get budget usage info.
@@ -1301,17 +1352,19 @@ class Model(object):
         :param str budget_name: Name of budget
 
         """
-        pass
+        raise NotImplementedError()
 
-    def get_status(self, filter_=None, utc=False):
+    async def get_status(self, filters=None, utc=False):
         """Return the status of the model.
 
-        :param str filter_: Service or unit name or wildcard ('*')
+        :param str filters: Optional list of applications, units, or machines
+            to include, which can use wildcards ('*').
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        pass
-    status = get_status
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        return await client_facade.FullStatus(filters)
 
     def sync_tools(
             self, all_=False, destination=None, dry_run=False, public=False,
@@ -1328,7 +1381,7 @@ class Model(object):
         :param str version: Copy a specific major.minor version
 
         """
-        pass
+        raise NotImplementedError()
 
     def unblock(self, *commands):
         """Unblock an operation that would alter this model.
@@ -1337,7 +1390,7 @@ class Model(object):
             'all-changes', 'destroy-model', 'remove-object'
 
         """
-        pass
+        raise NotImplementedError()
 
     def unset_config(self, *keys):
         """Unset configuration on this model.
@@ -1345,13 +1398,13 @@ class Model(object):
         :param str \*keys: The keys to unset
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_gui(self):
         """Upgrade the Juju GUI for this model.
 
         """
-        pass
+        raise NotImplementedError()
 
     def upgrade_juju(
             self, dry_run=False, reset_previous_upgrade=False,
@@ -1365,7 +1418,7 @@ class Model(object):
         :param str version: Upgrade to a specific version
 
         """
-        pass
+        raise NotImplementedError()
 
     def upload_backup(self, archive_path):
         """Store a backup archive remotely in Juju.
@@ -1373,7 +1426,7 @@ class Model(object):
         :param str archive_path: Path to local archive
 
         """
-        pass
+        raise NotImplementedError()
 
     @property
     def charmstore(self):
@@ -1489,7 +1542,7 @@ class BundleHandler(object):
             charm_urls = await asyncio.gather(*[
                 self.model.add_local_charm_dir(*params)
                 for params in args
-            ])
+            ], loop=self.model.loop)
             # Update the 'charm:' entry for each app with the new 'local:' url.
             for app_name, charm_url in zip(apps, charm_urls):
                 bundle['services'][app_name]['charm'] = charm_url
@@ -1648,24 +1701,16 @@ class BundleHandler(object):
         """
         # resolve indirect references
         charm = self.resolve(charm)
-        # stringify all config values for API
-        options = {k: str(v) for k, v in options.items()}
-        # build param object
-        app = client.ApplicationDeploy(
+        await self.model._deploy(
             charm_url=charm,
-            series=series,
             application=application,
+            series=series,
             config=options,
-            constraints=parse_constraints(constraints),
-            storage=storage,
+            constraints=constraints,
             endpoint_bindings=endpoint_bindings,
             resources=resources,
+            storage=storage,
         )
-        # do the do
-        log.info('Deploying %s', charm)
-        await self.app_facade.Deploy([app])
-        # ensure the app is in the model for future operations
-        await self.model._wait_for_new('application', application)
         return application
 
     async def addUnit(self, application, to):
@@ -1733,7 +1778,7 @@ class CharmStore(object):
     """
     def __init__(self, loop):
         self.loop = loop
-        self._cs = charmstore.CharmStore()
+        self._cs = theblues.charmstore.CharmStore(timeout=5)
 
     def __getattr__(self, name):
         """
@@ -1747,7 +1792,13 @@ class CharmStore(object):
         else:
             async def coro(*args, **kwargs):
                 method = partial(attr, *args, **kwargs)
-                return await self.loop.run_in_executor(None, method)
+                for attempt in range(1, 4):
+                    try:
+                        return await self.loop.run_in_executor(None, method)
+                    except theblues.errors.ServerError:
+                        if attempt == 3:
+                            raise
+                        await asyncio.sleep(1, loop=self.loop)
             setattr(self, name, coro)
             wrapper = coro
         return wrapper