New N2VC interface + updated libjuju
[osm/N2VC.git] / modules / libjuju / juju / model.py
index fc8d5e9..ac22599 100644 (file)
@@ -14,26 +14,25 @@ from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
 
-import websockets
-import yaml
 import theblues.charmstore
 import theblues.errors
+import websockets
+import yaml
 
 from . import tag, utils
-from .client import client
-from .client import connection
+from .client import client, connector
 from .client.client import ConfigValue
-from .constraints import parse as parse_constraints, normalize_key
-from .delta import get_entity_delta
-from .delta import get_entity_class
+from .constraints import parse as parse_constraints
+from .constraints import normalize_key
+from .delta import get_entity_class, get_entity_delta
+from .errors import JujuAPIError, JujuError
 from .exceptions import DeadEntityException
-from .errors import JujuError, JujuAPIError
 from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
 
-class _Observer(object):
+class _Observer:
     """Wrapper around an observer callable.
 
     This wrapper allows filter criteria to be associated with the
@@ -77,7 +76,7 @@ class _Observer(object):
         return True
 
 
-class ModelObserver(object):
+class ModelObserver:
     """
     Base class for creating observers that react to changes in a model.
     """
@@ -100,7 +99,7 @@ class ModelObserver(object):
         pass
 
 
-class ModelState(object):
+class ModelState:
     """Holds the state of the model, including the delta history of all
     entities in the model.
 
@@ -144,6 +143,14 @@ class ModelState(object):
         """
         return self._live_entity_map('unit')
 
+    @property
+    def relations(self):
+        """Return a map of relation-id:Relation for all relations currently in
+        the model.
+
+        """
+        return self._live_entity_map('relation')
+
     def entity_history(self, entity_type, entity_id):
         """Return the history deque for an entity.
 
@@ -209,7 +216,7 @@ class ModelState(object):
             connected=connected)
 
 
-class ModelEntity(object):
+class ModelEntity:
     """An object in the Model tree"""
 
     def __init__(self, entity_id, model, history_index=-1, connected=True):
@@ -228,7 +235,7 @@ class ModelEntity(object):
         self.model = model
         self._history_index = history_index
         self.connected = connected
-        self.connection = model.connection
+        self.connection = model.connection()
 
     def __repr__(self):
         return '<{} entity_id="{}">'.format(type(self).__name__,
@@ -380,90 +387,148 @@ class ModelEntity(object):
         return self.model.state.get_entity(self.entity_type, self.entity_id)
 
 
-class Model(object):
+class Model:
     """
     The main API for interacting with a Juju model.
     """
-    def __init__(self, loop=None,
-                 max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
-        """Instantiate a new connected Model.
+    def __init__(
+        self,
+        loop=None,
+        max_frame_size=None,
+        bakery_client=None,
+        jujudata=None,
+    ):
+        """Instantiate a new Model.
+
+        The connect method will need to be called before this
+        object can be used for anything interesting.
+
+        If jujudata is None, jujudata.FileJujuData will be used.
 
         :param loop: an asyncio event loop
         :param max_frame_size: See
             `juju.client.connection.Connection.MAX_FRAME_SIZE`
+        :param bakery_client httpbakery.Client: The bakery client to use
+            for macaroon authorization.
+        :param jujudata JujuData: The source for current controller information.
+        """
+        self._connector = connector.Connector(
+            loop=loop,
+            max_frame_size=max_frame_size,
+            bakery_client=bakery_client,
+            jujudata=jujudata,
+        )
+        self._observers = weakref.WeakValueDictionary()
+        self.state = ModelState(self)
+        self._info = None
+        self._watch_stopping = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped = asyncio.Event(loop=self._connector.loop)
+        self._watch_received = asyncio.Event(loop=self._connector.loop)
+        self._watch_stopped.set()
+        self._charmstore = CharmStore(self._connector.loop)
+
+    def is_connected(self):
+        """Reports whether the Model is currently connected."""
+        return self._connector.is_connected()
+
+    @property
+    def loop(self):
+        return self._connector.loop
 
+    def connection(self):
+        """Return the current Connection object. It raises an exception
+        if the Model is disconnected"""
+        return self._connector.connection()
+
+    async def get_controller(self):
+        """Return a Controller instance for the currently connected model.
+        :return Controller:
         """
-        self.loop = loop or asyncio.get_event_loop()
-        self.max_frame_size = max_frame_size
-        self.connection = None
-        self.observers = weakref.WeakValueDictionary()
-        self.state = ModelState(self)
-        self.info = None
-        self._watch_stopping = asyncio.Event(loop=self.loop)
-        self._watch_stopped = asyncio.Event(loop=self.loop)
-        self._watch_received = asyncio.Event(loop=self.loop)
-        self._charmstore = CharmStore(self.loop)
+        from juju.controller import Controller
+        controller = Controller(jujudata=self._connector.jujudata)
+        kwargs = self.connection().connect_params()
+        kwargs.pop('uuid')
+        await controller._connect_direct(**kwargs)
+        return controller
 
     async def __aenter__(self):
-        await self.connect_current()
+        await self.connect()
         return self
 
     async def __aexit__(self, exc_type, exc, tb):
         await self.disconnect()
 
-        if exc_type is not None:
-            return False
+    async def connect(self, model_name=None, **kwargs):
+        """Connect to a juju model.
 
-    async def connect(self, *args, **kw):
-        """Connect to an arbitrary Juju model.
+        If any arguments are specified other than model_name, then
+        model_name must be None and an explicit connection will be made
+        using Connection.connect using those parameters (the 'uuid'
+        parameter must be specified).
 
-        args and kw are passed through to Connection.connect()
+        Otherwise, if model_name is None, connect to the current model.
 
-        """
-        if 'loop' not in kw:
-            kw['loop'] = self.loop
-        if 'max_frame_size' not in kw:
-            kw['max_frame_size'] = self.max_frame_size
-        self.connection = await connection.Connection.connect(*args, **kw)
-        await self._after_connect()
+        Otherwise, model_name must specify the name of a known
+        model.
 
-    async def connect_current(self):
-        """Connect to the current Juju model.
+        :param model_name:  Format [controller:][user/]model
 
         """
-        self.connection = await connection.Connection.connect_current(
-            self.loop, max_frame_size=self.max_frame_size)
+        await self.disconnect()
+        if not kwargs:
+            await self._connector.connect_model(model_name)
+        else:
+            if kwargs.get('uuid') is None:
+                raise ValueError('no UUID specified when connecting to model')
+            await self._connector.connect(**kwargs)
         await self._after_connect()
 
     async def connect_model(self, model_name):
-        """Connect to a specific Juju model by name.
-
-        :param model_name:  Format [controller:][user/]model
+        """
+        .. deprecated:: 0.6.2
+           Use connect(model_name=model_name) instead.
+        """
+        return await self.connect(model_name=model_name)
 
+    async def connect_current(self):
+        """
+        .. deprecated:: 0.6.2
+           Use connect instead.
         """
-        self.connection = await connection.Connection.connect_model(
-            model_name, self.loop, self.max_frame_size)
+        return await self.connect()
+
+    async def _connect_direct(self, **kwargs):
+        await self.disconnect()
+        await self._connector.connect(**kwargs)
         await self._after_connect()
 
     async def _after_connect(self):
-        """Run initialization steps after connecting to websocket.
-
-        """
         self._watch()
+
+        # Wait for the first packet of data from the AllWatcher,
+        # which contains all information on the model.
+        # TODO this means that we can't do anything until
+        # we've received all the model data, which might be
+        # a whole load of unneeded data if all the client wants
+        # to do is make one RPC call.
         await self._watch_received.wait()
+
         await self.get_info()
 
     async def disconnect(self):
         """Shut down the watcher task and close websockets.
 
         """
-        if self.connection and self.connection.is_open:
+        if not self._watch_stopped.is_set():
             log.debug('Stopping watcher task')
             self._watch_stopping.set()
             await self._watch_stopped.wait()
+            self._watch_stopping.clear()
+
+        if self.is_connected():
             log.debug('Closing model connection')
-            await self.connection.close()
-            self.connection = None
+            await self._connector.disconnect()
+            self.info = None
 
     async def add_local_charm_dir(self, charm_dir, series):
         """Upload a local charm to the model.
@@ -480,7 +545,7 @@ class Model(object):
         with fh:
             func = partial(
                 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
-            charm_url = await self.loop.run_in_executor(None, func)
+            charm_url = await self._connector.loop.run_in_executor(None, func)
 
         log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
         return charm_url
@@ -505,7 +570,7 @@ class Model(object):
            instead.
 
         """
-        conn, headers, path_prefix = self.connection.https_connection()
+        conn, headers, path_prefix = self.connection().https_connection()
         path = "%s/charms?series=%s" % (path_prefix, series)
         headers['Content-Type'] = 'application/zip'
         if size:
@@ -549,13 +614,20 @@ class Model(object):
     async def block_until(self, *conditions, timeout=None, wait_period=0.5):
         """Return only after all conditions are true.
 
+        Raises `websockets.ConnectionClosed` if disconnected.
         """
-        async def _block():
-            while not all(c() for c in conditions):
-                if not (self.connection and self.connection.is_open):
-                    raise websockets.ConnectionClosed(1006, 'no reason')
-                await asyncio.sleep(wait_period, loop=self.loop)
-        await asyncio.wait_for(_block(), timeout, loop=self.loop)
+        def _disconnected():
+            return not (self.is_connected() and self.connection().is_open)
+
+        def done():
+            return _disconnected() or all(c() for c in conditions)
+
+        await utils.block_until(done,
+                                timeout=timeout,
+                                wait_period=wait_period,
+                                loop=self.loop)
+        if _disconnected():
+            raise websockets.ConnectionClosed(1006, 'no reason')
 
     @property
     def applications(self):
@@ -581,6 +653,13 @@ class Model(object):
         """
         return self.state.units
 
+    @property
+    def relations(self):
+        """Return a list of all Relations currently in the model.
+
+        """
+        return list(self.state.relations.values())
+
     async def get_info(self):
         """Return a client.ModelInfo object for this Model.
 
@@ -594,7 +673,7 @@ class Model(object):
         explicit call to this method.
 
         """
-        facade = client.ClientFacade.from_connection(self.connection)
+        facade = client.ClientFacade.from_connection(self.connection())
 
         self.info = await facade.ModelInfo()
         log.debug('Got ModelInfo: %s', vars(self.info))
@@ -639,7 +718,7 @@ class Model(object):
         """
         observer = _Observer(
             callable_, entity_type, action, entity_id, predicate)
-        self.observers[observer] = callable_
+        self._observers[observer] = callable_
 
     def _watch(self):
         """Start an asynchronous watch against this model.
@@ -650,13 +729,13 @@ class Model(object):
         async def _all_watcher():
             try:
                 allwatcher = client.AllWatcherFacade.from_connection(
-                    self.connection)
+                    self.connection())
                 while not self._watch_stopping.is_set():
                     try:
                         results = await utils.run_with_interrupt(
                             allwatcher.Next(),
                             self._watch_stopping,
-                            self.loop)
+                            self._connector.loop)
                     except JujuAPIError as e:
                         if 'watcher was stopped' not in str(e):
                             raise
@@ -673,19 +752,27 @@ class Model(object):
                         del allwatcher.Id
                         continue
                     except websockets.ConnectionClosed:
-                        monitor = self.connection.monitor
+                        monitor = self.connection().monitor
                         if monitor.status == monitor.ERROR:
                             # closed unexpectedly, try to reopen
                             log.warning(
                                 'Watcher: connection closed, reopening')
-                            await self.connection.reconnect()
+                            await self.connection().reconnect()
+                            if monitor.status != monitor.CONNECTED:
+                                # reconnect failed; abort and shutdown
+                                log.error('Watcher: automatic reconnect '
+                                          'failed; stopping watcher')
+                                break
                             del allwatcher.Id
                             continue
                         else:
                             # closed on request, go ahead and shutdown
                             break
                     if self._watch_stopping.is_set():
-                        await allwatcher.Stop()
+                        try:
+                            await allwatcher.Stop()
+                        except websockets.ConnectionClosed:
+                            pass  # can't stop on a closed conn
                         break
                     for delta in results.deltas:
                         delta = get_entity_delta(delta)
@@ -704,7 +791,7 @@ class Model(object):
         self._watch_received.clear()
         self._watch_stopping.clear()
         self._watch_stopped.clear()
-        self.loop.create_task(_all_watcher())
+        self._connector.loop.create_task(_all_watcher())
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
@@ -724,10 +811,10 @@ class Model(object):
             'Model changed: %s %s %s',
             delta.entity, delta.type, delta.get_id())
 
-        for o in self.observers:
+        for o in self._observers:
             if o.cares_about(delta):
                 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
-                                      loop=self.loop)
+                                      loop=self._connector.loop)
 
     async def _wait(self, entity_type, entity_id, action, predicate=None):
         """
@@ -744,7 +831,7 @@ class Model(object):
             has a 'completed' status. See the _Observer class for details.
 
         """
-        q = asyncio.Queue(loop=self.loop)
+        q = asyncio.Queue(loop=self._connector.loop)
 
         async def callback(delta, old, new, model):
             await q.put(delta.get_id())
@@ -755,24 +842,19 @@ class Model(object):
         # 'remove' action
         return self.state._live_entity_map(entity_type).get(entity_id)
 
-    async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+    async def _wait_for_new(self, entity_type, entity_id):
         """Wait for a new object to appear in the Model and return it.
 
-        Waits for an object of type ``entity_type`` with id ``entity_id``.
-        If ``entity_id`` is ``None``, it will wait for the first new entity
-        of the correct type.
-
-        This coroutine blocks until the new object appears in the model.
+        Waits for an object of type ``entity_type`` with id ``entity_id``
+        to appear in the model.  This is similar to watching for the
+        object using ``block_until``, but uses the watcher rather than
+        polling.
 
         """
         # if the entity is already in the model, just return it
         if entity_id in self.state._live_entity_map(entity_type):
             return self.state._live_entity_map(entity_type)[entity_id]
-        # if we know the entity_id, we can trigger on any action that puts
-        # the enitty into the model; otherwise, we have to watch for the
-        # next "add" action on that entity_type
-        action = 'add' if entity_id is None else None
-        return await self._wait(entity_type, entity_id, action, predicate)
+        return await self._wait(entity_type, entity_id, None)
 
     async def wait_for_action(self, action_id):
         """Given an action, wait for it to complete."""
@@ -785,7 +867,7 @@ class Model(object):
         def predicate(delta):
             return delta.data['status'] in ('completed', 'failed')
 
-        return await self._wait('action', action_id, 'change', predicate)
+        return await self._wait('action', action_id, None, predicate)
 
     async def add_machine(
             self, spec=None, constraints=None, disks=None, series=None):
@@ -865,7 +947,7 @@ class Model(object):
             params.series = series
 
         # Submit the request.
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
         results = await client_facade.AddMachines([params])
         error = results.machines[0].error
         if error:
@@ -881,29 +963,33 @@ class Model(object):
         :param str relation2: '<application>[:<relation_name>]'
 
         """
-        app_facade = client.ApplicationFacade.from_connection(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection())
 
         log.debug(
             'Adding relation %s <-> %s', relation1, relation2)
 
+        def _find_relation(*specs):
+            for rel in self.relations:
+                if rel.matches(*specs):
+                    return rel
+            return None
+
         try:
             result = await app_facade.AddRelation([relation1, relation2])
         except JujuAPIError as e:
             if 'relation already exists' not in e.message:
                 raise
-            log.debug(
-                'Relation %s <-> %s already exists', relation1, relation2)
-            # TODO: if relation already exists we should return the
-            # Relation ModelEntity here
-            return None
+            rel = _find_relation(relation1, relation2)
+            if rel:
+                return rel
+            raise JujuError('Relation {} {} exists but not in model'.format(
+                relation1, relation2))
 
-        def predicate(delta):
-            endpoints = {}
-            for endpoint in delta.data['endpoints']:
-                endpoints[endpoint['application-name']] = endpoint['relation']
-            return endpoints == result.endpoints
+        specs = ['{}:{}'.format(app, data['name'])
+                 for app, data in result.endpoints.items()]
 
-        return await self._wait_for_new('relation', None, predicate)
+        await self.block_until(lambda: _find_relation(*specs) is not None)
+        return _find_relation(*specs)
 
     def add_space(self, name, *cidrs):
         """Add a new network space.
@@ -924,7 +1010,7 @@ class Model(object):
         :param str key: The public ssh key
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         return await key_facade.AddKeys([key], user)
     add_ssh_keys = add_ssh_key
 
@@ -1072,9 +1158,14 @@ class Model(object):
                 for k, v in storage.items()
             }
 
+        entity_path = Path(entity_url.replace('local:', ''))
+        bundle_path = entity_path / 'bundle.yaml'
+        metadata_path = entity_path / 'metadata.yaml'
+
         is_local = (
             entity_url.startswith('local:') or
-            os.path.isdir(entity_url)
+            entity_path.is_dir() or
+            entity_path.is_file()
         )
         if is_local:
             entity_id = entity_url.replace('local:', '')
@@ -1082,10 +1173,11 @@ class Model(object):
             entity = await self.charmstore.entity(entity_url, channel=channel)
             entity_id = entity['Id']
 
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
 
         is_bundle = ((is_local and
-                      (Path(entity_id) / 'bundle.yaml').exists()) or
+                      (entity_id.endswith('.yaml') and entity_path.exists()) or
+                      bundle_path.exists()) or
                      (not is_local and 'bundle/' in entity_id))
 
         if is_bundle:
@@ -1100,9 +1192,9 @@ class Model(object):
                 await asyncio.gather(*[
                     asyncio.ensure_future(
                         self._wait_for_new('application', app_name),
-                        loop=self.loop)
+                        loop=self._connector.loop)
                     for app_name in pending_apps
-                ], loop=self.loop)
+                ], loop=self._connector.loop)
             return [app for name, app in self.applications.items()
                     if name in handler.applications]
         else:
@@ -1118,6 +1210,9 @@ class Model(object):
                                                             entity_id,
                                                             entity)
             else:
+                if not application_name:
+                    metadata = yaml.load(metadata_path.read_text())
+                    application_name = metadata['name']
                 # We have a local charm dir that needs to be uploaded
                 charm_dir = os.path.abspath(
                     os.path.expanduser(entity_id))
@@ -1163,7 +1258,7 @@ class Model(object):
             return None
 
         resources_facade = client.ResourcesFacade.from_connection(
-            self.connection)
+            self.connection())
         response = await resources_facade.AddPendingResources(
             tag.application(application),
             entity_url,
@@ -1186,7 +1281,7 @@ class Model(object):
                            default_flow_style=False)
 
         app_facade = client.ApplicationFacade.from_connection(
-            self.connection)
+            self.connection())
 
         app = client.ApplicationDeploy(
             charm_url=charm_url,
@@ -1201,7 +1296,6 @@ class Model(object):
             storage=storage,
             placement=placement
         )
-
         result = await app_facade.Deploy([app])
         errors = [r.error.message for r in result.results if r.error]
         if errors:
@@ -1218,7 +1312,7 @@ class Model(object):
         """Destroy units by name.
 
         """
-        app_facade = client.ApplicationFacade.from_connection(self.connection)
+        app_facade = client.ApplicationFacade.from_connection(self.connection())
 
         log.debug(
             'Destroying unit%s %s',
@@ -1263,7 +1357,7 @@ class Model(object):
             which have `source` and `value` attributes.
         """
         config_facade = client.ModelConfigFacade.from_connection(
-            self.connection
+            self.connection()
         )
         result = await config_facade.ModelGet()
         config = result.config
@@ -1277,22 +1371,6 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    async def grant(self, username, acl='read'):
-        """Grant a user access to this model.
-
-        :param str username: Username
-        :param str acl: Access control ('read' or 'write')
-
-        """
-        controller_conn = await self.connection.controller()
-        model_facade = client.ModelManagerFacade.from_connection(
-            controller_conn)
-        user = tag.user(username)
-        model = tag.model(self.info.uuid)
-        changes = client.ModifyModelAccess(acl, 'grant', model, user)
-        await self.revoke(username)
-        return await model_facade.ModifyModelAccess([changes])
-
     def import_ssh_key(self, identity):
         """Add a public SSH key from a trusted indentity source to this model.
 
@@ -1326,7 +1404,7 @@ class Model(object):
             else it's fingerprint
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         entity = {'tag': tag.model(self.info.uuid)}
         entities = client.Entities([entity])
         return await key_facade.ListKeys(entities, raw_ssh)
@@ -1399,7 +1477,7 @@ class Model(object):
         :param str user: Juju user to which the key is registered
 
         """
-        key_facade = client.KeyManagerFacade.from_connection(self.connection)
+        key_facade = client.KeyManagerFacade.from_connection(self.connection())
         key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
         key = hashlib.md5(key).hexdigest()
         key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
@@ -1427,20 +1505,6 @@ class Model(object):
         """
         raise NotImplementedError()
 
-    async def revoke(self, username):
-        """Revoke a user's access to this model.
-
-        :param str username: Username to revoke
-
-        """
-        controller_conn = await self.connection.controller()
-        model_facade = client.ModelManagerFacade.from_connection(
-            controller_conn)
-        user = tag.user(username)
-        model = tag.model(self.info.uuid)
-        changes = client.ModifyModelAccess('read', 'revoke', model, user)
-        return await model_facade.ModifyModelAccess([changes])
-
     def run(self, command, timeout=None):
         """Run command on all machines in this model.
 
@@ -1457,7 +1521,7 @@ class Model(object):
             `ConfigValue` instances, as returned by `get_config`.
         """
         config_facade = client.ModelConfigFacade.from_connection(
-            self.connection
+            self.connection()
         )
         for key, value in config.items():
             if isinstance(value, ConfigValue):
@@ -1506,7 +1570,7 @@ class Model(object):
         :param bool utc: Display time as UTC in RFC3339 format
 
         """
-        client_facade = client.ClientFacade.from_connection(self.connection)
+        client_facade = client.ClientFacade.from_connection(self.connection())
         return await client_facade.FullStatus(filters)
 
     def sync_tools(
@@ -1587,7 +1651,7 @@ class Model(object):
                   ', '.join(tags) if tags else "all units")
 
         metrics_facade = client.MetricsDebugFacade.from_connection(
-            self.connection)
+            self.connection())
 
         entities = [client.Entity(tag) for tag in tags]
         metrics_result = await metrics_facade.GetMetrics(entities)
@@ -1623,7 +1687,7 @@ def get_charm_series(path):
     return series[0] if series else None
 
 
-class BundleHandler(object):
+class BundleHandler:
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
     steps and then dispatching each of those using the API.
@@ -1638,11 +1702,11 @@ class BundleHandler(object):
             app_units = self._units_by_app.setdefault(unit.application, [])
             app_units.append(unit_name)
         self.client_facade = client.ClientFacade.from_connection(
-            model.connection)
+            model.connection())
         self.app_facade = client.ApplicationFacade.from_connection(
-            model.connection)
+            model.connection())
         self.ann_facade = client.AnnotationsFacade.from_connection(
-            model.connection)
+            model.connection())
 
     async def _handle_local_charms(self, bundle):
         """Search for references to local charms (i.e. filesystem paths)
@@ -1694,8 +1758,11 @@ class BundleHandler(object):
         return bundle
 
     async def fetch_plan(self, entity_id):
-        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
-        if is_local:
+        is_local = not entity_id.startswith('cs:')
+
+        if is_local and os.path.isfile(entity_id):
+            bundle_yaml = Path(entity_id).read_text()
+        elif is_local and os.path.isdir(entity_id):
             bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
         else:
             bundle_yaml = await self.charmstore.files(entity_id,
@@ -1920,7 +1987,7 @@ class BundleHandler(object):
         return await entity.set_annotations(annotations)
 
 
-class CharmStore(object):
+class CharmStore:
     """
     Async wrapper around theblues.charmstore.CharmStore
     """
@@ -1952,7 +2019,7 @@ class CharmStore(object):
         return wrapper
 
 
-class CharmArchiveGenerator(object):
+class CharmArchiveGenerator:
     """
     Create a Zip archive of a local charm directory for upload to a controller.