- await self._connect()
-
- async def _connect(self):
- endpoints = [(self._endpoint, self._cacert)]
- while endpoints:
- _endpoint, _cacert = endpoints.pop(0)
- success, result, new_endpoints = await self._try_endpoint(
- _endpoint, _cacert)
- if success:
- break
- endpoints.extend(new_endpoints)
- else:
- # ran out of endpoints without a successful login
- raise Exception("Couldn't authenticate to {}".format(
- self._endpoint))
-
- response = result['response']
- self.info = response.copy()
- self.build_facades(response.get('facades', {}))
- self.loop.create_task(self.pinger())
- self.monitor.pinger_stopped.clear()
+ await self._connect_with_login([(self.endpoint, self.cacert)])
+
+ async def _connect(self, endpoints):
+ if len(endpoints) == 0:
+ raise errors.JujuConnectionError('no endpoints to connect to')
+
+ async def _try_endpoint(endpoint, cacert, delay):
+ if delay:
+ await asyncio.sleep(delay)
+ return await self._open(endpoint, cacert)
+
+ # Try all endpoints in parallel, with slight increasing delay (+100ms
+ # for each subsequent endpoint); the delay allows us to prefer the
+ # earlier endpoints over the latter. Use first successful connection.
+ tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
+ 0.1 * i))
+ for i, (endpoint, cacert) in enumerate(endpoints)]
+ for attempt in range(self._retries + 1):
+ for task in asyncio.as_completed(tasks, loop=self.loop):
+ try:
+ result = await task
+ break
+ except ConnectionError:
+ continue # ignore; try another endpoint
+ else:
+ _endpoints_str = ', '.join([endpoint
+ for endpoint, cacert in endpoints])
+ if attempt < self._retries:
+ log.debug('Retrying connection to endpoints: {}; '
+ 'attempt {} of {}'.format(_endpoints_str,
+ attempt + 1,
+ self._retries + 1))
+ await asyncio.sleep((attempt + 1) * self._retry_backoff)
+ continue
+ else:
+ raise errors.JujuConnectionError(
+ 'Unable to connect to any endpoint: '
+ '{}'.format(_endpoints_str))
+ # only executed if inner loop's else did not continue
+ # (i.e., inner loop did break due to successful connection)
+ break
+ for task in tasks:
+ task.cancel()
+ self.ws = result[0]
+ self.addr = result[1]
+ self.endpoint = result[2]
+ self.cacert = result[3]
+ self._receiver_task.start()
+ log.debug("Driver connected to juju %s", self.addr)
+ self.monitor.close_called.clear()