X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fclient%2Fconnection.py;h=3ee8f16c914bc7052f4583f41b02de6b4470a7b6;hb=ac5b9fc0eaf87c5126949307352d1d56781ef3ef;hp=3011a8adfb2ae307e23ea2dd45cedcbb00990eda;hpb=896db9ff2ab6afeb9756948c771bddae942e2723;p=osm%2FN2VC.git diff --git a/juju/client/connection.py b/juju/client/connection.py index 3011a8a..3ee8f16 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -15,7 +15,8 @@ import asyncio import yaml from juju import tag -from juju.errors import JujuAPIError, JujuConnectionError +from juju.errors import JujuError, JujuAPIError, JujuConnectionError +from juju.utils import IdQueue log = logging.getLogger("websocket") @@ -34,6 +35,8 @@ class Connection: # Connect to the currently active model client = await Connection.connect_current() + Note: Any connection method or constructor can accept an optional `loop` + argument to override the default event loop from `asyncio.get_event_loop`. """ def __init__( self, endpoint, uuid, username, password, cacert=None, @@ -50,6 +53,7 @@ class Connection: self.addr = None self.ws = None self.facades = {} + self.messages = IdQueue(loop=self.loop) @property def is_open(self): @@ -72,17 +76,32 @@ class Connection: kw['loop'] = self.loop self.addr = url self.ws = await websockets.connect(url, **kw) + self.loop.create_task(self.receiver()) log.info("Driver connected to juju %s", url) return self async def close(self): await self.ws.close() - async def recv(self): - result = await self.ws.recv() - if result is not None: - result = json.loads(result) - return result + async def recv(self, request_id): + if not self.is_open: + raise websockets.exceptions.ConnectionClosed(0, 'websocket closed') + return await self.messages.get(request_id) + + async def receiver(self): + while self.is_open: + try: + result = await self.ws.recv() + if result is not None: + result = json.loads(result) + await self.messages.put(result['request-id'], result) + except Exception as e: + await self.messages.put_all(e) + if isinstance(e, websockets.ConnectionClosed): + # ConnectionClosed is not really exceptional for us, + # but it may be for any pending message listeners + return + raise async def rpc(self, msg, encoder=None): self.__request_id__ += 1 @@ -93,9 +112,31 @@ class Connection: msg['version'] = self.facades[msg['type']] outgoing = json.dumps(msg, indent=2, cls=encoder) await self.ws.send(outgoing) - result = await self.recv() - if result and 'error' in result: + result = await self.recv(msg['request-id']) + + if not result: + return result + + if 'error' in result: + # API Error Response raise JujuAPIError(result) + + if 'response' not in result: + # This may never happen + return result + + if 'results' in result['response']: + # Check for errors in a result list. + errors = [] + for res in result['response']['results']: + if res.get('error', {}).get('message'): + errors.append(res['error']['message']) + if errors: + raise JujuError(errors) + + elif result['response'].get('error', {}).get('message'): + raise JujuError(result['response']['error']['message']) + return result def http_headers(self): @@ -221,8 +262,7 @@ class Connection: """ jujudata = JujuData() controller_name = jujudata.current_controller() - models = jujudata.models()[controller_name] - model_name = models['current-model'] + model_name = jujudata.current_model() return await cls.connect_model( '{}:{}'.format(controller_name, model_name), loop) @@ -260,20 +300,30 @@ class Connection: async def connect_model(cls, model, loop=None): """Connect to a model by name. - :param str model: : + :param str model: [:] """ - controller_name, model_name = model.split(':') - jujudata = JujuData() + + if ':' in model: + # explicit controller given + controller_name, model_name = model.split(':') + else: + # use the current controller if one isn't explicitly given + controller_name = jujudata.current_controller() + model_name = model + + accounts = jujudata.accounts()[controller_name] + username = accounts['user'] + # model name must include a user prefix, so add it if it doesn't + if '/' not in model_name: + model_name = '{}/{}'.format(username, model_name) + controller = jujudata.controllers()[controller_name] endpoint = controller['api-endpoints'][0] cacert = controller.get('ca-cert') - accounts = jujudata.accounts()[controller_name] - username = accounts['user'] password = accounts.get('password') models = jujudata.models()[controller_name] - model_name = '{}/{}'.format(username, model_name) model_uuid = models['models'][model_name]['uuid'] macaroons = get_macaroons() if not password else None @@ -333,6 +383,14 @@ class JujuData: output = yaml.safe_load(output) return output.get('current-controller', '') + def current_model(self, controller_name=None): + if not controller_name: + controller_name = self.current_controller() + models = self.models()[controller_name] + if 'current-model' not in models: + raise JujuError('No current model') + return models['current-model'] + def controllers(self): return self._load_yaml('controllers.yaml', 'controllers')