Merge branch 'master' into bug/fix-invalid-annotations
authorPete Vander Giessen <petevg@gmail.com>
Tue, 7 Mar 2017 21:20:39 +0000 (15:20 -0600)
committerGitHub <noreply@github.com>
Tue, 7 Mar 2017 21:20:39 +0000 (15:20 -0600)
1  2 
juju/client/connection.py
juju/model.py

@@@ -11,10 -11,11 +11,11 @@@ import subproces
  import websockets
  from http.client import HTTPSConnection
  
+ import asyncio
  import yaml
  
  from juju import tag
- from juju.errors import JujuAPIError, JujuConnectionError, JujuError
+ from juju.errors import JujuError, JujuAPIError, JujuConnectionError
  
  log = logging.getLogger("websocket")
  
@@@ -33,16 -34,19 +34,19 @@@ class Connection
          # Connect to the currently active model
          client = await Connection.connect_current()
  
+     Note: Any connection method or constructor can accept an optional `loop`
+     argument to override the default event loop from `asyncio.get_event_loop`.
      """
      def __init__(
              self, endpoint, uuid, username, password, cacert=None,
-             macaroons=None):
+             macaroons=None, loop=None):
          self.endpoint = endpoint
          self.uuid = uuid
          self.username = username
          self.password = password
          self.macaroons = macaroons
          self.cacert = cacert
+         self.loop = loop or asyncio.get_event_loop()
  
          self.__request_id__ = 0
          self.addr = None
@@@ -67,6 -71,7 +71,7 @@@
  
          kw = dict()
          kw['ssl'] = self._get_ssl(self.cacert)
+         kw['loop'] = self.loop
          self.addr = url
          self.ws = await websockets.connect(url, **kw)
          log.info("Driver connected to juju %s", url)
          outgoing = json.dumps(msg, indent=2, cls=encoder)
          await self.ws.send(outgoing)
          result = await self.recv()
 -        if result and 'error' in result:
 +
 +        if not result:
 +            return result
 +
 +        if 'error' in result:
 +            # API Error Response
              raise JujuAPIError(result)
 +
 +        if not 'response' in result:
 +            # This may never happen
 +            return result
 +
 +        if 'results' in result['response']:
 +            # Check for errors in a result list.
 +            errors = []
 +            for res in result['response']['results']:
 +                if res.get('error', {}).get('message'):
 +                    errors.append(res['error']['message'])
 +            if errors:
 +                raise JujuError(errors)
 +
 +        elif result['response'].get('error', {}).get('message'):
 +            raise JujuError(result['response']['error']['message'])
 +
          return result
  
      def http_headers(self):
              self.password,
              self.cacert,
              self.macaroons,
+             self.loop,
          )
  
      async def controller(self):
              self.password,
              self.cacert,
              self.macaroons,
+             self.loop,
          )
  
      @classmethod
      async def connect(
              cls, endpoint, uuid, username, password, cacert=None,
-             macaroons=None):
+             macaroons=None, loop=None):
          """Connect to the websocket.
  
          If uuid is None, the connection will be to the controller. Otherwise it
          will be to the model.
  
          """
-         client = cls(endpoint, uuid, username, password, cacert, macaroons)
+         client = cls(endpoint, uuid, username, password, cacert, macaroons,
+                      loop)
          await client.open()
  
          redirect_info = await client.redirect_info()
              "Couldn't authenticate to %s", endpoint)
  
      @classmethod
-     async def connect_current(cls):
+     async def connect_current(cls, loop=None):
          """Connect to the currently active model.
  
          """
          jujudata = JujuData()
          controller_name = jujudata.current_controller()
-         models = jujudata.models()[controller_name]
-         model_name = models['current-model']
+         model_name = jujudata.current_model()
  
          return await cls.connect_model(
-             '{}:{}'.format(controller_name, model_name))
+             '{}:{}'.format(controller_name, model_name), loop)
  
      @classmethod
-     async def connect_current_controller(cls):
+     async def connect_current_controller(cls, loop=None):
          """Connect to the currently active controller.
  
          """
          if not controller_name:
              raise JujuConnectionError('No current controller')
  
-         return await cls.connect_controller(controller_name)
+         return await cls.connect_controller(controller_name, loop)
  
      @classmethod
-     async def connect_controller(cls, controller_name):
+     async def connect_controller(cls, controller_name, loop=None):
          """Connect to a controller by name.
  
          """
          macaroons = get_macaroons() if not password else None
  
          return await cls.connect(
-             endpoint, None, username, password, cacert, macaroons)
+             endpoint, None, username, password, cacert, macaroons, loop)
  
      @classmethod
-     async def connect_model(cls, model):
+     async def connect_model(cls, model, loop=None):
          """Connect to a model by name.
  
-         :param str model: <controller>:<model>
+         :param str model: [<controller>:]<model>
  
          """
-         controller_name, model_name = model.split(':')
          jujudata = JujuData()
+         if ':' in model:
+             # explicit controller given
+             controller_name, model_name = model.split(':')
+         else:
+             # use the current controller if one isn't explicitly given
+             controller_name = jujudata.current_controller()
+             model_name = model
+         accounts = jujudata.accounts()[controller_name]
+         username = accounts['user']
+         # model name must include a user prefix, so add it if it doesn't
+         if '/' not in model_name:
+             model_name = '{}/{}'.format(username, model_name)
          controller = jujudata.controllers()[controller_name]
          endpoint = controller['api-endpoints'][0]
          cacert = controller.get('ca-cert')
-         accounts = jujudata.accounts()[controller_name]
-         username = accounts['user']
          password = accounts.get('password')
          models = jujudata.models()[controller_name]
          model_uuid = models['models'][model_name]['uuid']
          macaroons = get_macaroons() if not password else None
  
          return await cls.connect(
-             endpoint, model_uuid, username, password, cacert, macaroons)
+             endpoint, model_uuid, username, password, cacert, macaroons, loop)
  
      def build_facades(self, info):
          self.facades.clear()
@@@ -348,6 -344,14 +366,14 @@@ class JujuData
          output = yaml.safe_load(output)
          return output.get('current-controller', '')
  
+     def current_model(self, controller_name=None):
+         if not controller_name:
+             controller_name = self.current_controller()
+         models = self.models()[controller_name]
+         if 'current-model' not in models:
+             raise JujuError('No current model')
+         return models['current-model']
      def controllers(self):
          return self._load_yaml('controllers.yaml', 'controllers')
  
@@@ -371,7 -375,7 +397,7 @@@ def get_macaroons()
          cookie_file = os.path.expanduser('~/.go-cookies')
          with open(cookie_file, 'r') as f:
              cookies = json.load(f)
-     except (OSError, ValueError) as e:
+     except (OSError, ValueError):
          log.warn("Couldn't load macaroons from %s", cookie_file)
          return []
  
diff --combined juju/model.py
@@@ -13,7 -13,8 +13,8 @@@ from functools import partia
  from pathlib import Path
  
  import yaml
- from theblues import charmstore
+ import theblues.charmstore
+ import theblues.errors
  
  from .client import client
  from .client import watcher
@@@ -376,8 -377,8 +377,8 @@@ class Model(object)
          self.state = ModelState(self)
          self.info = None
          self._watcher_task = None
-         self._watch_shutdown = asyncio.Event(loop=loop)
-         self._watch_received = asyncio.Event(loop=loop)
+         self._watch_shutdown = asyncio.Event(loop=self.loop)
+         self._watch_received = asyncio.Event(loop=self.loop)
          self._charmstore = CharmStore(self.loop)
  
      async def connect(self, *args, **kw):
          args and kw are passed through to Connection.connect()
  
          """
+         if 'loop' not in kw:
+             kw['loop'] = self.loop
          self.connection = await connection.Connection.connect(*args, **kw)
          await self._after_connect()
  
          """Connect to the current Juju model.
  
          """
-         self.connection = await connection.Connection.connect_current()
+         self.connection = await connection.Connection.connect_current(
+             self.loop)
          await self._after_connect()
  
      async def connect_model(self, model_name):
          :param model_name:  Format [controller:][user/]model
  
          """
-         self.connection = await connection.Connection.connect_model(model_name)
+         self.connection = await connection.Connection.connect_model(model_name,
+                                                                     self.loop)
          await self._after_connect()
  
      async def _after_connect(self):
          """
          async def _block():
              while not all(c() for c in conditions):
-                 await asyncio.sleep(wait_period)
-         await asyncio.wait_for(_block(), timeout)
+                 await asyncio.sleep(wait_period, loop=self.loop)
+         await asyncio.wait_for(_block(), timeout, loop=self.loop)
  
      @property
      def applications(self):
                          # canceled with it. So we shield them. But this means
                          # they can *never* be canceled.
                          await asyncio.shield(
-                             self._notify_observers(delta, old_obj, new_obj))
+                             self._notify_observers(delta, old_obj, new_obj),
+                             loop=self.loop)
                      self._watch_received.set()
              except CancelledError:
                  log.debug('Closing watcher connection')
  
          for o in self.observers:
              if o.cares_about(delta):
-                 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+                 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+                                       loop=self.loop)
  
      async def _wait(self, entity_type, entity_id, action, predicate=None):
          """
          """
          raise NotImplementedError()
  
+     def _get_series(self, entity_url, entity):
+         # try to get the series from the provided charm URL
+         if entity_url.startswith('cs:'):
+             parts = entity_url[3:].split('/')
+         else:
+             parts = entity_url.split('/')
+         if parts[0].startswith('~'):
+             parts.pop(0)
+         if len(parts) > 1:
+             # series was specified in the URL
+             return parts[0]
+         # series was not supplied at all, so use the newest
+         # supported series according to the charm store
+         ss = entity['Meta']['supported-series']
+         return ss['SupportedSeries'][0]
      async def deploy(
              self, entity_url, application_name=None, bind=None, budget=None,
              channel=None, config=None, constraints=None, force=False,
              - series is required; how do we pick a default?
  
          """
-         if to:
-             placement = parse_placement(to)
-         else:
-             placement = []
          if storage:
              storage = {
                  k: client.Constraints(**v)
              entity_url.startswith('local:') or
              os.path.isdir(entity_url)
          )
-         entity_id = await self.charmstore.entityId(entity_url) \
-             if not is_local else entity_url
+         if is_local:
+             entity_id = entity_url
+         else:
+             entity = await self.charmstore.entity(entity_url)
+             entity_id = entity['Id']
  
-         app_facade = client.ApplicationFacade()
          client_facade = client.ClientFacade()
-         app_facade.connect(self.connection)
          client_facade.connect(self.connection)
  
          is_bundle = ((is_local and
                  # haven't made it yet we'll need to wait on them to be added
                  await asyncio.gather(*[
                      asyncio.ensure_future(
-                         self._wait_for_new('application', app_name))
+                         self._wait_for_new('application', app_name),
+                         loop=self.loop)
                      for app_name in pending_apps
-                 ])
+                 ], loop=self.loop)
              return [app for name, app in self.applications.items()
                      if name in handler.applications]
          else:
-             log.debug(
-                 'Deploying %s', entity_id)
              if not is_local:
+                 if not application_name:
+                     application_name = entity['Meta']['charm-metadata']['Name']
+                 if not series:
+                     series = self._get_series(entity_url, entity)
+                 if not channel:
+                     channel = 'stable'
                  await client_facade.AddCharm(channel, entity_id)
-             elif not entity_id.startswith('local:'):
+             else:
                  # We have a local charm dir that needs to be uploaded
                  charm_dir = os.path.abspath(
                      os.path.expanduser(entity_id))
                          "Pass a 'series' kwarg to Model.deploy().".format(
                              charm_dir))
                  entity_id = await self.add_local_charm_dir(charm_dir, series)
-             app = client.ApplicationDeploy(
-                 application=application_name,
-                 channel=channel,
+             return await self._deploy(
                  charm_url=entity_id,
-                 config=config,
-                 constraints=parse_constraints(constraints),
+                 application=application_name,
+                 series=series,
+                 config=config or {},
+                 constraints=constraints,
                  endpoint_bindings=bind,
-                 num_units=num_units,
                  resources=resources,
-                 series=series,
                  storage=storage,
+                 channel=channel,
+                 num_units=num_units,
+                 placement=parse_placement(to),
              )
-             app.placement = placement
  
-             await app_facade.Deploy([app])
-             return await self._wait_for_new('application', application_name)
+     async def _deploy(self, charm_url, application, series, config,
+                       constraints, endpoint_bindings, resources, storage,
+                       channel=None, num_units=None, placement=None):
+         """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+         """
+         log.info('Deploying %s', charm_url)
+         # stringify all config values for API, and convert to YAML
+         config = {k: str(v) for k, v in config.items()}
+         config = yaml.dump({application: config},
+                            default_flow_style=False)
+         app_facade = client.ApplicationFacade()
+         app_facade.connect(self.connection)
+         app = client.ApplicationDeploy(
+             charm_url=charm_url,
+             application=application,
+             series=series,
+             channel=channel,
+             config_yaml=config,
+             constraints=parse_constraints(constraints),
+             endpoint_bindings=endpoint_bindings,
+             num_units=num_units,
+             resources=resources,
+             storage=storage,
+             placement=placement,
+         )
+         result = await app_facade.Deploy([app])
+         errors = [r.error.message for r in result.results if r.error]
+         if errors:
+             raise JujuError('\n'.join(errors))
+         return await self._wait_for_new('application', application)
  
      def destroy(self):
          """Terminate all machines and resources for this model.
          """
          raise NotImplementedError()
  
-     def get_status(self, filter_=None, utc=False):
+     async def get_status(self, filters=None, utc=False):
          """Return the status of the model.
  
-         :param str filter_: Service or unit name or wildcard ('*')
+         :param str filters: Optional list of applications, units, or machines
+             to include, which can use wildcards ('*').
          :param bool utc: Display time as UTC in RFC3339 format
  
          """
-         raise NotImplementedError()
-     status = get_status
+         client_facade = client.ClientFacade()
+         client_facade.connect(self.connection)
+         return await client_facade.FullStatus(filters)
  
      def sync_tools(
              self, all_=False, destination=None, dry_run=False, public=False,
@@@ -1488,7 -1544,7 +1544,7 @@@ class BundleHandler(object)
              charm_urls = await asyncio.gather(*[
                  self.model.add_local_charm_dir(*params)
                  for params in args
-             ])
+             ], loop=self.model.loop)
              # Update the 'charm:' entry for each app with the new 'local:' url.
              for app_name, charm_url in zip(apps, charm_urls):
                  bundle['services'][app_name]['charm'] = charm_url
          self.plan = await self.client_facade.GetBundleChanges(
              yaml.dump(self.bundle))
  
 -        if self.plan.errors:
 -            raise JujuError('\n'.join(self.plan.errors))
 -
      async def execute_plan(self):
          for step in self.plan.changes:
              method = getattr(self, step.method)
          """
          # resolve indirect references
          charm = self.resolve(charm)
-         # stringify all config values for API, and convert to YAML
-         options = {k: str(v) for k, v in options.items()}
-         options = yaml.dump({application: options}, default_flow_style=False)
-         # build param object
-         app = client.ApplicationDeploy(
+         await self.model._deploy(
              charm_url=charm,
-             series=series,
              application=application,
-             # Pass options to config-yaml rather than config, as
-             # config-yaml invokes a newer codepath that better handles
-             # empty strings in the options values.
-             config_yaml=options,
-             constraints=parse_constraints(constraints),
-             storage=storage,
+             series=series,
+             config=options,
+             constraints=constraints,
              endpoint_bindings=endpoint_bindings,
              resources=resources,
+             storage=storage,
          )
-         # do the do
-         log.info('Deploying %s', charm)
-         await self.app_facade.Deploy([app])
-         # ensure the app is in the model for future operations
-         await self.model._wait_for_new('application', application)
          return application
  
      async def addUnit(self, application, to):
@@@ -1733,7 -1780,7 +1777,7 @@@ class CharmStore(object)
      """
      def __init__(self, loop):
          self.loop = loop
-         self._cs = charmstore.CharmStore()
+         self._cs = theblues.charmstore.CharmStore(timeout=5)
  
      def __getattr__(self, name):
          """
          else:
              async def coro(*args, **kwargs):
                  method = partial(attr, *args, **kwargs)
-                 return await self.loop.run_in_executor(None, method)
+                 for attempt in range(1, 4):
+                     try:
+                         return await self.loop.run_in_executor(None, method)
+                     except theblues.errors.ServerError:
+                         if attempt == 3:
+                             raise
+                         await asyncio.sleep(1, loop=self.loop)
              setattr(self, name, coro)
              wrapper = coro
          return wrapper