New N2VC interface + updated libjuju
This commit introduces the Python3 N2VC module, which acts as a standard
interface to the VCA.
The goal of this is to provide a common way for modules to interface
with the VCA.
- Updated libjuju from 0.6.1 to 0.7.3
Signed-off-by: Adam Israel <adam.israel@canonical.com>
Change-Id: Ide70fb5ae5797eb6486de24653dc09a23f9c009e
diff --git a/modules/libjuju/juju/application.py b/modules/libjuju/juju/application.py
index 620e9c9..555bb3d 100644
--- a/modules/libjuju/juju/application.py
+++ b/modules/libjuju/juju/application.py
@@ -52,6 +52,24 @@
]
@property
+ def relations(self):
+ return [rel for rel in self.model.relations if rel.matches(self.name)]
+
+ def related_applications(self, endpoint_name=None):
+ apps = {}
+ for rel in self.relations:
+ if rel.is_peer:
+ local_ep, remote_ep = rel.endpoints[0]
+ else:
+ def is_us(ep):
+ return ep.application.name == self.name
+ local_ep, remote_ep = sorted(rel.endpoints, key=is_us)
+ if endpoint_name is not None and endpoint_name != local_ep.name:
+ continue
+ apps[remote_ep.application.name] = remote_ep.application
+ return apps
+
+ @property
def status(self):
"""Get the application status, as set by the charm's leader.
diff --git a/modules/libjuju/juju/client/_client.py b/modules/libjuju/juju/client/_client.py
index 2ef0ffd..d959a56 100644
--- a/modules/libjuju/juju/client/_client.py
+++ b/modules/libjuju/juju/client/_client.py
@@ -1,10 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client._definitions import * # noqa
-
from juju.client import _client1, _client2, _client3, _client4, _client5
-
+from juju.client._definitions import * # noqa
CLIENTS = {
"1": _client1,
@@ -43,7 +41,13 @@
@param connection: initialized Connection object.
"""
- version = connection.facades[cls.__name__[:-6]]
+ facade_name = cls.__name__
+ if not facade_name.endswith('Facade'):
+ raise TypeError('Unexpected class name: {}'.format(facade_name))
+ facade_name = facade_name[:-len('Facade')]
+ version = connection.facades.get(facade_name)
+ if version is None:
+ raise Exception('No facade {} in facades {}'.format(facade_name, connection.facades))
c = lookup_facade(cls.__name__, version)
c = c()
diff --git a/modules/libjuju/juju/client/_client1.py b/modules/libjuju/juju/client/_client1.py
index 3774056..e161973 100644
--- a/modules/libjuju/juju/client/_client1.py
+++ b/modules/libjuju/juju/client/_client1.py
@@ -1,8 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
from juju.client._definitions import *
+from juju.client.facade import ReturnMapping, Type
class AgentToolsFacade(Type):
@@ -6641,5 +6641,3 @@
_params['include-disabled'] = include_disabled
reply = await self.rpc(msg)
return reply
-
-
diff --git a/modules/libjuju/juju/client/_client2.py b/modules/libjuju/juju/client/_client2.py
index 283e803..6f92a86 100644
--- a/modules/libjuju/juju/client/_client2.py
+++ b/modules/libjuju/juju/client/_client2.py
@@ -1,8 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
from juju.client._definitions import *
+from juju.client.facade import ReturnMapping, Type
class ActionFacade(Type):
@@ -4223,5 +4223,3 @@
reply = await self.rpc(msg)
return reply
-
-
diff --git a/modules/libjuju/juju/client/_client3.py b/modules/libjuju/juju/client/_client3.py
index 3f9ef55..b5f4b9d 100644
--- a/modules/libjuju/juju/client/_client3.py
+++ b/modules/libjuju/juju/client/_client3.py
@@ -1,8 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
from juju.client._definitions import *
+from juju.client.facade import ReturnMapping, Type
class ApplicationFacade(Type):
@@ -5216,5 +5216,3 @@
_params['entities'] = entities
reply = await self.rpc(msg)
return reply
-
-
diff --git a/modules/libjuju/juju/client/_client4.py b/modules/libjuju/juju/client/_client4.py
index 68ee3f9..9c47561 100644
--- a/modules/libjuju/juju/client/_client4.py
+++ b/modules/libjuju/juju/client/_client4.py
@@ -1,8 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
from juju.client._definitions import *
+from juju.client.facade import ReturnMapping, Type
class ApplicationFacade(Type):
@@ -2667,5 +2667,3 @@
_params['entities'] = entities
reply = await self.rpc(msg)
return reply
-
-
diff --git a/modules/libjuju/juju/client/_client5.py b/modules/libjuju/juju/client/_client5.py
index 22805ed..f0f1282 100644
--- a/modules/libjuju/juju/client/_client5.py
+++ b/modules/libjuju/juju/client/_client5.py
@@ -1,8 +1,8 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
from juju.client._definitions import *
+from juju.client.facade import ReturnMapping, Type
class ApplicationFacade(Type):
@@ -2650,5 +2650,3 @@
_params['entities'] = entities
reply = await self.rpc(msg)
return reply
-
-
diff --git a/modules/libjuju/juju/client/_definitions.py b/modules/libjuju/juju/client/_definitions.py
index 198784d..fde035f 100644
--- a/modules/libjuju/juju/client/_definitions.py
+++ b/modules/libjuju/juju/client/_definitions.py
@@ -1,7 +1,7 @@
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
-from juju.client.facade import Type, ReturnMapping
+from juju.client.facade import ReturnMapping, Type
class APIHostPortsResult(Type):
@@ -8018,5 +8018,3 @@
results : typing.Sequence<+T_co>[~ZoneResult]<~ZoneResult>
'''
self.results = [ZoneResult.from_json(o) for o in results or []]
-
-
diff --git a/modules/libjuju/juju/client/client.py b/modules/libjuju/juju/client/client.py
index 2f3e49d..2721d07 100644
--- a/modules/libjuju/juju/client/client.py
+++ b/modules/libjuju/juju/client/client.py
@@ -1,8 +1,7 @@
'''Replace auto-generated classes with our own, where necessary.
'''
-from . import _client, _definitions, overrides
-
+from . import _client, _definitions, overrides # isort:skip
for o in overrides.__all__:
if "Facade" not in o:
@@ -31,4 +30,4 @@
if not a.startswith('_'):
setattr(c_type, a, getattr(o_type, a))
-from ._client import * # noqa
+from ._client import * # noqa, isort:skip
diff --git a/modules/libjuju/juju/client/connection.py b/modules/libjuju/juju/client/connection.py
index c09468c..bdd1c3f 100644
--- a/modules/libjuju/juju/client/connection.py
+++ b/modules/libjuju/juju/client/connection.py
@@ -1,28 +1,21 @@
+import asyncio
import base64
-import io
import json
import logging
-import os
-import random
-import shlex
import ssl
-import string
-import subprocess
+import urllib.request
import weakref
-import websockets
from concurrent.futures import CancelledError
from http.client import HTTPSConnection
-from pathlib import Path
-import asyncio
-import yaml
-
-from juju import tag, utils
+import macaroonbakery.httpbakery as httpbakery
+import macaroonbakery.bakery as bakery
+import websockets
+from juju import errors, tag, utils
from juju.client import client
-from juju.errors import JujuError, JujuAPIError, JujuConnectionError
from juju.utils import IdQueue
-log = logging.getLogger("websocket")
+log = logging.getLogger('juju.client.connection')
class Monitor:
@@ -30,7 +23,7 @@
Monitor helper class for our Connection class.
Contains a reference to an instantiated Connection, along with a
- reference to the Connection.receiver Future. Upon inspecttion of
+ reference to the Connection.receiver Future. Upon inspection of
these objects, this class determines whether the connection is in
an 'error', 'connected' or 'disconnected' state.
@@ -48,10 +41,6 @@
self.connection = weakref.ref(connection)
self.reconnecting = asyncio.Lock(loop=connection.loop)
self.close_called = asyncio.Event(loop=connection.loop)
- self.receiver_stopped = asyncio.Event(loop=connection.loop)
- self.pinger_stopped = asyncio.Event(loop=connection.loop)
- self.receiver_stopped.set()
- self.pinger_stopped.set()
@property
def status(self):
@@ -81,7 +70,8 @@
return self.DISCONNECTING
# connection closed uncleanly (we didn't call connection.close)
- if self.receiver_stopped.is_set() or not connection.ws.open:
+ stopped = connection._receiver_task.stopped.is_set()
+ if stopped or not connection.ws.open:
return self.ERROR
# everything is fine!
@@ -96,47 +86,91 @@
client = await Connection.connect(
api_endpoint, model_uuid, username, password, cacert)
- # Connect using a controller/model name
- client = await Connection.connect_model('local.local:default')
-
- # Connect to the currently active model
- client = await Connection.connect_current()
-
Note: Any connection method or constructor can accept an optional `loop`
argument to override the default event loop from `asyncio.get_event_loop`.
"""
- DEFAULT_FRAME_SIZE = 'default_frame_size'
MAX_FRAME_SIZE = 2**22
"Maximum size for a single frame. Defaults to 4MB."
- def __init__(
- self, endpoint, uuid, username, password, cacert=None,
- macaroons=None, loop=None, max_frame_size=DEFAULT_FRAME_SIZE):
- self.endpoint = endpoint
- self._endpoint = endpoint
+ @classmethod
+ async def connect(
+ cls,
+ endpoint=None,
+ uuid=None,
+ username=None,
+ password=None,
+ cacert=None,
+ bakery_client=None,
+ loop=None,
+ max_frame_size=None,
+ ):
+ """Connect to the websocket.
+
+ If uuid is None, the connection will be to the controller. Otherwise it
+ will be to the model.
+ :param str endpoint The hostname:port of the controller to connect to.
+ :param str uuid The model UUID to connect to (None for a
+ controller-only connection).
+ :param str username The username for controller-local users (or None
+ to use macaroon-based login.)
+ :param str password The password for controller-local users.
+ :param str cacert The CA certificate of the controller (PEM formatted).
+ :param httpbakery.Client bakery_client The macaroon bakery client to
+ to use when performing macaroon-based login. Macaroon tokens
+ acquired when logging will be saved to bakery_client.cookies.
+ If this is None, a default bakery_client will be used.
+ :param loop asyncio.BaseEventLoop The event loop to use for async
+ operations.
+ :param max_frame_size The maximum websocket frame size to allow.
+ """
+ self = cls()
+ if endpoint is None:
+ raise ValueError('no endpoint provided')
self.uuid = uuid
- if macaroons:
- self.macaroons = macaroons
- self.username = ''
- self.password = ''
- else:
- self.macaroons = []
- self.username = username
- self.password = password
- self.cacert = cacert
- self._cacert = cacert
+ if bakery_client is None:
+ bakery_client = httpbakery.Client()
+ self.bakery_client = bakery_client
+ if username and '@' in username and not username.endswith('@local'):
+ # We're trying to log in as an external user - we need to use
+ # macaroon authentication with no username or password.
+ if password is not None:
+ raise errors.JujuAuthError('cannot log in as external '
+ 'user with a password')
+ username = None
+ self.usertag = tag.user(username)
+ self.password = password
self.loop = loop or asyncio.get_event_loop()
self.__request_id__ = 0
+
+ # The following instance variables are initialized by the
+ # _connect_with_redirect method, but create them here
+ # as a reminder that they will exist.
self.addr = None
self.ws = None
+ self.endpoint = None
+ self.cacert = None
+ self.info = None
+
+ # Create that _Task objects but don't start the tasks yet.
+ self._pinger_task = _Task(self._pinger, self.loop)
+ self._receiver_task = _Task(self._receiver, self.loop)
+
self.facades = {}
self.messages = IdQueue(loop=self.loop)
self.monitor = Monitor(connection=self)
- if max_frame_size is self.DEFAULT_FRAME_SIZE:
+ if max_frame_size is None:
max_frame_size = self.MAX_FRAME_SIZE
self.max_frame_size = max_frame_size
+ await self._connect_with_redirect([(endpoint, cacert)])
+ return self
+
+ @property
+ def username(self):
+ if not self.usertag:
+ return None
+ return self.usertag[len('user-'):]
@property
def is_open(self):
@@ -146,39 +180,34 @@
return ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
- async def open(self):
+ async def _open(self, endpoint, cacert):
if self.uuid:
- url = "wss://{}/model/{}/api".format(self.endpoint, self.uuid)
+ url = "wss://{}/model/{}/api".format(endpoint, self.uuid)
else:
- url = "wss://{}/api".format(self.endpoint)
+ url = "wss://{}/api".format(endpoint)
- kw = dict()
- kw['ssl'] = self._get_ssl(self.cacert)
- kw['loop'] = self.loop
- kw['max_size'] = self.max_frame_size
- self.addr = url
- self.ws = await websockets.connect(url, **kw)
- self.loop.create_task(self.receiver())
- self.monitor.receiver_stopped.clear()
- log.info("Driver connected to juju %s", url)
- self.monitor.close_called.clear()
- return self
+ return (await websockets.connect(
+ url,
+ ssl=self._get_ssl(cacert),
+ loop=self.loop,
+ max_size=self.max_frame_size,
+ ), url, endpoint, cacert)
async def close(self):
if not self.ws:
return
self.monitor.close_called.set()
- await self.monitor.pinger_stopped.wait()
- await self.monitor.receiver_stopped.wait()
+ await self._pinger_task.stopped.wait()
+ await self._receiver_task.stopped.wait()
await self.ws.close()
self.ws = None
- async def recv(self, request_id):
+ async def _recv(self, request_id):
if not self.is_open:
raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
return await self.messages.get(request_id)
- async def receiver(self):
+ async def _receiver(self):
try:
while self.is_open:
result = await utils.run_with_interrupt(
@@ -205,10 +234,8 @@
# make pending listeners aware of the error
await self.messages.put_all(e)
raise
- finally:
- self.monitor.receiver_stopped.set()
- async def pinger(self):
+ async def _pinger(self):
'''
A Controller can time us out if we are silent for too long. This
is especially true in JaaS, which has a fairly strict timeout.
@@ -232,11 +259,21 @@
loop=self.loop)
if self.monitor.close_called.is_set():
break
- finally:
- self.monitor.pinger_stopped.set()
- return
+ except websockets.exceptions.ConnectionClosed:
+ # The connection has closed - we can't do anything
+ # more until the connection is restarted.
+ log.debug('ping failed because of closed connection')
+ pass
async def rpc(self, msg, encoder=None):
+ '''Make an RPC to the API. The message is encoded as JSON
+ using the given encoder if any.
+ :param msg: Parameters for the call (will be encoded as JSON).
+ :param encoder: Encoder to be used when encoding the message.
+ :return: The result of the call.
+ :raises JujuAPIError: When there's an error returned.
+ :raises JujuError:
+ '''
self.__request_id__ += 1
msg['request-id'] = self.__request_id__
if'params' not in msg:
@@ -244,7 +281,12 @@
if "version" not in msg:
msg['version'] = self.facades[msg['type']]
outgoing = json.dumps(msg, indent=2, cls=encoder)
+ log.debug('connection {} -> {}'.format(id(self), outgoing))
for attempt in range(3):
+ if self.monitor.status == Monitor.DISCONNECTED:
+ # closed cleanly; shouldn't try to reconnect
+ raise websockets.exceptions.ConnectionClosed(
+ 0, 'websocket closed')
try:
await self.ws.send(outgoing)
break
@@ -257,14 +299,19 @@
# be cancelled when the pinger is cancelled by the reconnect,
# and we don't want the reconnect to be aborted halfway through
await asyncio.wait([self.reconnect()], loop=self.loop)
- result = await self.recv(msg['request-id'])
+ if self.monitor.status != Monitor.CONNECTED:
+ # reconnect failed; abort and shutdown
+ log.error('RPC: Automatic reconnect failed')
+ raise
+ result = await self._recv(msg['request-id'])
+ log.debug('connection {} <- {}'.format(id(self), result))
if not result:
return result
if 'error' in result:
# API Error Response
- raise JujuAPIError(result)
+ raise errors.JujuAPIError(result)
if 'response' not in result:
# This may never happen
@@ -272,30 +319,34 @@
if 'results' in result['response']:
# Check for errors in a result list.
- errors = []
+ # TODO This loses the results that might have succeeded.
+ # Perhaps JujuError should return all the results including
+ # errors, or perhaps a keyword parameter to the rpc method
+ # could be added to trigger this behaviour.
+ err_results = []
for res in result['response']['results']:
if res.get('error', {}).get('message'):
- errors.append(res['error']['message'])
- if errors:
- raise JujuError(errors)
+ err_results.append(res['error']['message'])
+ if err_results:
+ raise errors.JujuError(err_results)
elif result['response'].get('error', {}).get('message'):
- raise JujuError(result['response']['error']['message'])
+ raise errors.JujuError(result['response']['error']['message'])
return result
- def http_headers(self):
+ def _http_headers(self):
"""Return dictionary of http headers necessary for making an http
connection to the endpoint of this Connection.
:return: Dictionary of headers
"""
- if not self.username:
+ if not self.usertag:
return {}
creds = u'{}:{}'.format(
- tag.user(self.username),
+ self.usertag,
self.password or ''
)
token = base64.b64encode(creds.encode())
@@ -328,70 +379,46 @@
"/model/{}".format(self.uuid)
if self.uuid else ""
)
- return conn, self.http_headers(), path
+ return conn, self._http_headers(), path
async def clone(self):
"""Return a new Connection, connected to the same websocket endpoint
as this one.
"""
- return await Connection.connect(
- self.endpoint,
- self.uuid,
- self.username,
- self.password,
- self.cacert,
- self.macaroons,
- self.loop,
- self.max_frame_size,
- )
+ return await Connection.connect(**self.connect_params())
+
+ def connect_params(self):
+ """Return a tuple of parameters suitable for passing to
+ Connection.connect that can be used to make a new connection
+ to the same controller (and model if specified. The first
+ element in the returned tuple holds the endpoint argument;
+ the other holds a dict of the keyword args.
+ """
+ return {
+ 'endpoint': self.endpoint,
+ 'uuid': self.uuid,
+ 'username': self.username,
+ 'password': self.password,
+ 'cacert': self.cacert,
+ 'bakery_client': self.bakery_client,
+ 'loop': self.loop,
+ 'max_frame_size': self.max_frame_size,
+ }
async def controller(self):
"""Return a Connection to the controller at self.endpoint
-
"""
return await Connection.connect(
self.endpoint,
- None,
- self.username,
- self.password,
- self.cacert,
- self.macaroons,
- self.loop,
+ username=self.username,
+ password=self.password,
+ cacert=self.cacert,
+ bakery_client=self.bakery_client,
+ loop=self.loop,
+ max_frame_size=self.max_frame_size,
)
- async def _try_endpoint(self, endpoint, cacert):
- success = False
- result = None
- new_endpoints = []
-
- self.endpoint = endpoint
- self.cacert = cacert
- await self.open()
- try:
- result = await self.login()
- if 'discharge-required-error' in result['response']:
- log.info('Macaroon discharge required, disconnecting')
- else:
- # successful login!
- log.info('Authenticated')
- success = True
- except JujuAPIError as e:
- if e.error_code != 'redirection required':
- raise
- log.info('Controller requested redirect')
- redirect_info = await self.redirect_info()
- redir_cacert = redirect_info['ca-cert']
- new_endpoints = [
- ("{value}:{port}".format(**s), redir_cacert)
- for servers in redirect_info['servers']
- for s in servers if s["scope"] == 'public'
- ]
- finally:
- if not success:
- await self.close()
- return success, result, new_endpoints
-
async def reconnect(self):
""" Force a reconnection.
"""
@@ -400,256 +427,149 @@
return
async with monitor.reconnecting:
await self.close()
- await self._connect()
+ await self._connect_with_login([(self.endpoint, self.cacert)])
- async def _connect(self):
- endpoints = [(self._endpoint, self._cacert)]
- while endpoints:
- _endpoint, _cacert = endpoints.pop(0)
- success, result, new_endpoints = await self._try_endpoint(
- _endpoint, _cacert)
- if success:
+ async def _connect(self, endpoints):
+ if len(endpoints) == 0:
+ raise errors.JujuConnectionError('no endpoints to connect to')
+
+ async def _try_endpoint(endpoint, cacert, delay):
+ if delay:
+ await asyncio.sleep(delay)
+ return await self._open(endpoint, cacert)
+
+ # Try all endpoints in parallel, with slight increasing delay (+100ms
+ # for each subsequent endpoint); the delay allows us to prefer the
+ # earlier endpoints over the latter. Use first successful connection.
+ tasks = [self.loop.create_task(_try_endpoint(endpoint, cacert,
+ 0.1 * i))
+ for i, (endpoint, cacert) in enumerate(endpoints)]
+ for task in asyncio.as_completed(tasks, loop=self.loop):
+ try:
+ result = await task
break
- endpoints.extend(new_endpoints)
+ except ConnectionError:
+ continue # ignore; try another endpoint
else:
- # ran out of endpoints without a successful login
- raise JujuConnectionError("Couldn't authenticate to {}".format(
- self._endpoint))
+ raise errors.JujuConnectionError(
+ 'Unable to connect to any endpoint: {}'.format(', '.join([
+ endpoint for endpoint, cacert in endpoints])))
+ for task in tasks:
+ task.cancel()
+ self.ws = result[0]
+ self.addr = result[1]
+ self.endpoint = result[2]
+ self.cacert = result[3]
+ self._receiver_task.start()
+ log.info("Driver connected to juju %s", self.addr)
+ self.monitor.close_called.clear()
- response = result['response']
- self.info = response.copy()
- self.build_facades(response.get('facades', {}))
- self.loop.create_task(self.pinger())
- self.monitor.pinger_stopped.clear()
-
- @classmethod
- async def connect(
- cls, endpoint, uuid, username, password, cacert=None,
- macaroons=None, loop=None, max_frame_size=None):
+ async def _connect_with_login(self, endpoints):
"""Connect to the websocket.
If uuid is None, the connection will be to the controller. Otherwise it
will be to the model.
-
+ :return: The response field of login response JSON object.
"""
- client = cls(endpoint, uuid, username, password, cacert, macaroons,
- loop, max_frame_size)
- await client._connect()
- return client
+ success = False
+ try:
+ await self._connect(endpoints)
+ # It's possible that we may get several discharge-required errors,
+ # corresponding to different levels of authentication, so retry
+ # a few times.
+ for i in range(0, 2):
+ result = (await self.login())['response']
+ macaroonJSON = result.get('discharge-required')
+ if macaroonJSON is None:
+ self.info = result
+ success = True
+ return result
+ macaroon = bakery.Macaroon.from_dict(macaroonJSON)
+ self.bakery_client.handle_error(
+ httpbakery.Error(
+ code=httpbakery.ERR_DISCHARGE_REQUIRED,
+ message=result.get('discharge-required-error'),
+ version=macaroon.version,
+ info=httpbakery.ErrorInfo(
+ macaroon=macaroon,
+ macaroon_path=result.get('macaroon-path'),
+ ),
+ ),
+ # note: remove the port number.
+ 'https://' + self.endpoint + '/',
+ )
+ raise errors.JujuAuthError('failed to authenticate '
+ 'after several attempts')
+ finally:
+ if not success:
+ await self.close()
- @classmethod
- async def connect_current(cls, loop=None, max_frame_size=None):
- """Connect to the currently active model.
+ async def _connect_with_redirect(self, endpoints):
+ try:
+ login_result = await self._connect_with_login(endpoints)
+ except errors.JujuRedirectException as e:
+ login_result = await self._connect_with_login(e.endpoints)
+ self._build_facades(login_result.get('facades', {}))
+ self._pinger_task.start()
- """
- jujudata = JujuData()
-
- controller_name = jujudata.current_controller()
- if not controller_name:
- raise JujuConnectionError('No current controller')
-
- model_name = jujudata.current_model()
-
- return await cls.connect_model(
- '{}:{}'.format(controller_name, model_name), loop, max_frame_size)
-
- @classmethod
- async def connect_current_controller(cls, loop=None, max_frame_size=None):
- """Connect to the currently active controller.
-
- """
- jujudata = JujuData()
- controller_name = jujudata.current_controller()
- if not controller_name:
- raise JujuConnectionError('No current controller')
-
- return await cls.connect_controller(controller_name, loop,
- max_frame_size)
-
- @classmethod
- async def connect_controller(cls, controller_name, loop=None,
- max_frame_size=None):
- """Connect to a controller by name.
-
- """
- jujudata = JujuData()
- controller = jujudata.controllers()[controller_name]
- endpoint = controller['api-endpoints'][0]
- cacert = controller.get('ca-cert')
- accounts = jujudata.accounts()[controller_name]
- username = accounts['user']
- password = accounts.get('password')
- macaroons = get_macaroons(controller_name) if not password else None
-
- return await cls.connect(
- endpoint, None, username, password, cacert, macaroons, loop,
- max_frame_size)
-
- @classmethod
- async def connect_model(cls, model, loop=None, max_frame_size=None):
- """Connect to a model by name.
-
- :param str model: [<controller>:]<model>
-
- """
- jujudata = JujuData()
-
- if ':' in model:
- # explicit controller given
- controller_name, model_name = model.split(':')
- else:
- # use the current controller if one isn't explicitly given
- controller_name = jujudata.current_controller()
- model_name = model
-
- accounts = jujudata.accounts()[controller_name]
- username = accounts['user']
- # model name must include a user prefix, so add it if it doesn't
- if '/' not in model_name:
- model_name = '{}/{}'.format(username, model_name)
-
- controller = jujudata.controllers()[controller_name]
- endpoint = controller['api-endpoints'][0]
- cacert = controller.get('ca-cert')
- password = accounts.get('password')
- models = jujudata.models()[controller_name]
- model_uuid = models['models'][model_name]['uuid']
- macaroons = get_macaroons(controller_name) if not password else None
-
- return await cls.connect(
- endpoint, model_uuid, username, password, cacert, macaroons, loop,
- max_frame_size)
-
- def build_facades(self, facades):
+ def _build_facades(self, facades):
self.facades.clear()
for facade in facades:
self.facades[facade['name']] = facade['versions'][-1]
async def login(self):
- username = self.username
- if username and not username.startswith('user-'):
- username = 'user-{}'.format(username)
+ params = {}
+ if self.password:
+ params['auth-tag'] = self.usertag
+ params['credentials'] = self.password
+ else:
+ macaroons = _macaroons_for_domain(self.bakery_client.cookies,
+ self.endpoint)
+ params['macaroons'] = [[bakery.macaroon_to_dict(m) for m in ms]
+ for ms in macaroons]
- result = await self.rpc({
- "type": "Admin",
- "request": "Login",
- "version": 3,
- "params": {
- "auth-tag": username,
- "credentials": self.password,
- "nonce": "".join(random.sample(string.printable, 12)),
- "macaroons": self.macaroons
- }})
- return result
-
- async def redirect_info(self):
try:
- result = await self.rpc({
+ return await self.rpc({
+ "type": "Admin",
+ "request": "Login",
+ "version": 3,
+ "params": params,
+ })
+ except errors.JujuAPIError as e:
+ if e.error_code != 'redirection required':
+ raise
+ log.info('Controller requested redirect')
+ # Fetch additional redirection information now so that
+ # we can safely close the connection after login
+ # fails.
+ redirect_info = (await self.rpc({
"type": "Admin",
"request": "RedirectInfo",
"version": 3,
- })
- except JujuAPIError as e:
- if e.message == 'not redirected':
- return None
- raise
- return result['response']
+ }))['response']
+ raise errors.JujuRedirectException(redirect_info) from e
-class JujuData:
- def __init__(self):
- self.path = os.environ.get('JUJU_DATA') or '~/.local/share/juju'
- self.path = os.path.abspath(os.path.expanduser(self.path))
+class _Task:
+ def __init__(self, task, loop):
+ self.stopped = asyncio.Event(loop=loop)
+ self.stopped.set()
+ self.task = task
+ self.loop = loop
- def current_controller(self):
- cmd = shlex.split('juju list-controllers --format yaml')
- output = subprocess.check_output(cmd)
- output = yaml.safe_load(output)
- return output.get('current-controller', '')
-
- def current_model(self, controller_name=None):
- if not controller_name:
- controller_name = self.current_controller()
- models = self.models()[controller_name]
- if 'current-model' not in models:
- raise JujuError('No current model')
- return models['current-model']
-
- def controllers(self):
- return self._load_yaml('controllers.yaml', 'controllers')
-
- def models(self):
- return self._load_yaml('models.yaml', 'controllers')
-
- def accounts(self):
- return self._load_yaml('accounts.yaml', 'controllers')
-
- def credentials(self):
- return self._load_yaml('credentials.yaml', 'credentials')
-
- def load_credential(self, cloud, name=None):
- """Load a local credential.
-
- :param str cloud: Name of cloud to load credentials from.
- :param str name: Name of credential. If None, the default credential
- will be used, if available.
- :returns: A CloudCredential instance, or None.
- """
- try:
- cloud = tag.untag('cloud-', cloud)
- creds_data = self.credentials()[cloud]
- if not name:
- default_credential = creds_data.pop('default-credential', None)
- default_region = creds_data.pop('default-region', None) # noqa
- if default_credential:
- name = creds_data['default-credential']
- elif len(creds_data) == 1:
- name = list(creds_data)[0]
- else:
- return None, None
- cred_data = creds_data[name]
- auth_type = cred_data.pop('auth-type')
- return name, client.CloudCredential(
- auth_type=auth_type,
- attrs=cred_data,
- )
- except (KeyError, FileNotFoundError):
- return None, None
-
- def _load_yaml(self, filename, key):
- filepath = os.path.join(self.path, filename)
- with io.open(filepath, 'rt') as f:
- return yaml.safe_load(f)[key]
-
-
-def get_macaroons(controller_name=None):
- """Decode and return macaroons from default ~/.go-cookies
-
- """
- cookie_files = []
- if controller_name:
- cookie_files.append('~/.local/share/juju/cookies/{}.json'.format(
- controller_name))
- cookie_files.append('~/.go-cookies')
- for cookie_file in cookie_files:
- cookie_file = Path(cookie_file).expanduser()
- if cookie_file.exists():
+ def start(self):
+ async def run():
try:
- cookies = json.loads(cookie_file.read_text())
- break
- except (OSError, ValueError):
- log.warn("Couldn't load macaroons from %s", cookie_file)
- return []
- else:
- log.warn("Couldn't load macaroons from %s", ' or '.join(cookie_files))
- return []
+ return await(self.task())
+ finally:
+ self.stopped.set()
+ self.stopped.clear()
+ self.loop.create_task(run())
- base64_macaroons = [
- c['Value'] for c in cookies
- if c['Name'].startswith('macaroon-') and c['Value']
- ]
- return [
- json.loads(base64.b64decode(value).decode('utf-8'))
- for value in base64_macaroons
- ]
+def _macaroons_for_domain(cookies, domain):
+ '''Return any macaroons from the given cookie jar that
+ apply to the given domain name.'''
+ req = urllib.request.Request('https://' + domain + '/')
+ cookies.add_cookie_header(req)
+ return httpbakery.extract_macaroons(req)
diff --git a/modules/libjuju/juju/client/connector.py b/modules/libjuju/juju/client/connector.py
new file mode 100644
index 0000000..64fbe44
--- /dev/null
+++ b/modules/libjuju/juju/client/connector.py
@@ -0,0 +1,147 @@
+import asyncio
+import logging
+import copy
+
+import macaroonbakery.httpbakery as httpbakery
+from juju.client.connection import Connection
+from juju.client.jujudata import FileJujuData
+from juju.errors import JujuConnectionError, JujuError
+
+log = logging.getLogger('connector')
+
+
+class NoConnectionException(Exception):
+ '''Raised by Connector when the connection method is called
+ and there is no current connection.'''
+ pass
+
+
+class Connector:
+ '''This class abstracts out a reconnectable client that can connect
+ to controllers and models found in the Juju data files.
+ '''
+ def __init__(
+ self,
+ loop=None,
+ max_frame_size=None,
+ bakery_client=None,
+ jujudata=None,
+ ):
+ '''Initialize a connector that will use the given parameters
+ by default when making a new connection'''
+ self.max_frame_size = max_frame_size
+ self.loop = loop or asyncio.get_event_loop()
+ self.bakery_client = bakery_client
+ self._connection = None
+ self.controller_name = None
+ self.model_name = None
+ self.jujudata = jujudata or FileJujuData()
+
+ def is_connected(self):
+ '''Report whether there is a currently connected controller or not'''
+ return self._connection is not None
+
+ def connection(self):
+ '''Return the current connection; raises an exception if there
+ is no current connection.'''
+ if not self.is_connected():
+ raise NoConnectionException('not connected')
+ return self._connection
+
+ async def connect(self, **kwargs):
+ """Connect to an arbitrary Juju model.
+
+ kwargs are passed through to Connection.connect()
+ """
+ kwargs.setdefault('loop', self.loop)
+ kwargs.setdefault('max_frame_size', self.max_frame_size)
+ kwargs.setdefault('bakery_client', self.bakery_client)
+ self._connection = await Connection.connect(**kwargs)
+
+ async def disconnect(self):
+ """Shut down the watcher task and close websockets.
+ """
+ if self._connection:
+ log.debug('Closing model connection')
+ await self._connection.close()
+ self._connection = None
+
+ async def connect_controller(self, controller_name=None):
+ """Connect to a controller by name. If the name is empty, it
+ connect to the current controller.
+ """
+ if not controller_name:
+ controller_name = self.jujudata.current_controller()
+ if not controller_name:
+ raise JujuConnectionError('No current controller')
+
+ controller = self.jujudata.controllers()[controller_name]
+ # TODO change Connection so we can pass all the endpoints
+ # instead of just the first.
+ endpoint = controller['api-endpoints'][0]
+ accounts = self.jujudata.accounts().get(controller_name, {})
+
+ await self.connect(
+ endpoint=endpoint,
+ uuid=None,
+ username=accounts.get('user'),
+ password=accounts.get('password'),
+ cacert=controller.get('ca-cert'),
+ bakery_client=self.bakery_client_for_controller(controller_name),
+ )
+ self.controller_name = controller_name
+
+ async def connect_model(self, model_name=None):
+ """Connect to a model by name. If either controller or model
+ parts of the name are empty, the current controller and/or model
+ will be used.
+
+ :param str model: <controller>:<model>
+ """
+
+ try:
+ controller_name, model_name = self.jujudata.parse_model(model_name)
+ controller = self.jujudata.controllers().get(controller_name)
+ except JujuError as e:
+ raise JujuConnectionError(e.message) from e
+ if controller is None:
+ raise JujuConnectionError('Controller {} not found'.format(
+ controller_name))
+ # TODO change Connection so we can pass all the endpoints
+ # instead of just the first one.
+ endpoint = controller['api-endpoints'][0]
+ account = self.jujudata.accounts().get(controller_name, {})
+ models = self.jujudata.models().get(controller_name, {}).get('models',
+ {})
+ if model_name not in models:
+ raise JujuConnectionError('Model not found: {}'.format(model_name))
+
+ # TODO if there's no record for the required model name, connect
+ # to the controller to find out the model's uuid, then connect
+ # to that. This will let connect_model work with models that
+ # haven't necessarily synced with the local juju data,
+ # and also remove the need for base.CleanModel to
+ # subclass JujuData.
+ await self.connect(
+ endpoint=endpoint,
+ uuid=models[model_name]['uuid'],
+ username=account.get('user'),
+ password=account.get('password'),
+ cacert=controller.get('ca-cert'),
+ bakery_client=self.bakery_client_for_controller(controller_name),
+ )
+ self.controller_name = controller_name
+ self.model_name = controller_name + ':' + model_name
+
+ def bakery_client_for_controller(self, controller_name):
+ '''Make a copy of the bakery client with a the appropriate controller's
+ cookiejar in it.
+ '''
+ bakery_client = self.bakery_client
+ if bakery_client:
+ bakery_client = copy.copy(bakery_client)
+ else:
+ bakery_client = httpbakery.Client()
+ bakery_client.cookies = self.jujudata.cookies_for_controller(
+ controller_name)
+ return bakery_client
diff --git a/modules/libjuju/juju/client/facade.py b/modules/libjuju/juju/client/facade.py
index c015c5f..1c7baa0 100644
--- a/modules/libjuju/juju/client/facade.py
+++ b/modules/libjuju/juju/client/facade.py
@@ -1,16 +1,16 @@
import argparse
import builtins
-from collections import defaultdict
import functools
-from glob import glob
import json
import keyword
-from pathlib import Path
import pprint
import re
import textwrap
-from typing import Sequence, Mapping, TypeVar, Any, Union
import typing
+from collections import defaultdict
+from glob import glob
+from pathlib import Path
+from typing import Any, Mapping, Sequence, TypeVar, Union
from . import codegen
diff --git a/modules/libjuju/juju/client/gocookies.py b/modules/libjuju/juju/client/gocookies.py
new file mode 100644
index 0000000..a8a0df8
--- /dev/null
+++ b/modules/libjuju/juju/client/gocookies.py
@@ -0,0 +1,102 @@
+import datetime
+import http.cookiejar as cookiejar
+import json
+import time
+
+import pyrfc3339
+
+
+class GoCookieJar(cookiejar.FileCookieJar):
+ '''A CookieJar implementation that reads and writes cookies
+ to the cookiejar format as understood by the Go package
+ github.com/juju/persistent-cookiejar.'''
+ def _really_load(self, f, filename, ignore_discard, ignore_expires):
+ '''Implement the _really_load method called by FileCookieJar
+ to implement the actual cookie loading'''
+ data = json.load(f) or []
+ now = time.time()
+ for cookie in map(_new_py_cookie, data):
+ if not ignore_expires and cookie.is_expired(now):
+ continue
+ self.set_cookie(cookie)
+
+ def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ '''Implement the FileCookieJar abstract method.'''
+ if filename is None:
+ if self.filename is not None:
+ filename = self.filename
+ else:
+ raise ValueError(cookiejar.MISSING_FILENAME_TEXT)
+
+ # TODO: obtain file lock, read contents of file, and merge with
+ # current content.
+ go_cookies = []
+ now = time.time()
+ for cookie in self:
+ if not ignore_discard and cookie.discard:
+ continue
+ if not ignore_expires and cookie.is_expired(now):
+ continue
+ go_cookies.append(_new_go_cookie(cookie))
+ with open(filename, "w") as f:
+ f.write(json.dumps(go_cookies))
+
+
+def _new_py_cookie(go_cookie):
+ '''Convert a Go-style JSON-unmarshaled cookie into a Python cookie'''
+ expires = None
+ if go_cookie.get('Expires') is not None:
+ t = pyrfc3339.parse(go_cookie['Expires'])
+ expires = t.timestamp()
+ return cookiejar.Cookie(
+ version=0,
+ name=go_cookie['Name'],
+ value=go_cookie['Value'],
+ port=None,
+ port_specified=False,
+ # Unfortunately Python cookies don't record the original
+ # host that the cookie came from, so we'll just use Domain
+ # for that purpose, and record that the domain was specified,
+ # even though it probably was not. This means that
+ # we won't correctly record the CanonicalHost entry
+ # when writing the cookie file after reading it.
+ domain=go_cookie['Domain'],
+ domain_specified=not go_cookie['HostOnly'],
+ domain_initial_dot=False,
+ path=go_cookie['Path'],
+ path_specified=True,
+ secure=go_cookie['Secure'],
+ expires=expires,
+ discard=False,
+ comment=None,
+ comment_url=None,
+ rest=None,
+ rfc2109=False,
+ )
+
+
+def _new_go_cookie(py_cookie):
+ '''Convert a python cookie to the JSON-marshalable Go-style cookie form.'''
+ # TODO (perhaps):
+ # HttpOnly
+ # Creation
+ # LastAccess
+ # Updated
+ # not done properly: CanonicalHost.
+ go_cookie = {
+ 'Name': py_cookie.name,
+ 'Value': py_cookie.value,
+ 'Domain': py_cookie.domain,
+ 'HostOnly': not py_cookie.domain_specified,
+ 'Persistent': not py_cookie.discard,
+ 'Secure': py_cookie.secure,
+ 'CanonicalHost': py_cookie.domain,
+ }
+ if py_cookie.path_specified:
+ go_cookie['Path'] = py_cookie.path
+ if py_cookie.expires is not None:
+ unix_time = datetime.datetime.fromtimestamp(py_cookie.expires)
+ # Note: fromtimestamp bizarrely produces a time without
+ # a time zone, so we need to use accept_naive.
+ go_cookie['Expires'] = pyrfc3339.generate(unix_time, accept_naive=True)
+ return go_cookie
diff --git a/modules/libjuju/juju/client/jujudata.py b/modules/libjuju/juju/client/jujudata.py
new file mode 100644
index 0000000..8b844c2
--- /dev/null
+++ b/modules/libjuju/juju/client/jujudata.py
@@ -0,0 +1,219 @@
+import abc
+import io
+import os
+import pathlib
+
+import juju.client.client as jujuclient
+import yaml
+from juju import tag
+from juju.client.gocookies import GoCookieJar
+from juju.errors import JujuError
+
+
+class NoModelException(Exception):
+ pass
+
+
+class JujuData:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def current_controller(self):
+ '''Return the current controller name'''
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def controllers(self):
+ '''Return all the currently known controllers as a dict
+ mapping controller name to a dict containing the
+ following string keys:
+ uuid: The UUID of the controller
+ api-endpoints: A list of host:port addresses for the controller.
+ ca-cert: the PEM-encoded CA cert of the controller (optional)
+
+ This is compatible with the "controllers" entry in the YAML-unmarshaled data
+ stored in ~/.local/share/juju/controllers.yaml.
+ '''
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def models(self):
+ '''Return all the currently known models as a dict
+ containing a key for each known controller,
+ each holding a dict value containing an optional "current-model"
+ key (the name of the current model for that controller,
+ if there is one), and a dict mapping fully-qualified
+ model names to a dict containing a "uuid" key with the
+ key for that model.
+ This is compatible with the YAML-unmarshaled data
+ stored in ~/.local/share/juju/models.yaml.
+ '''
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def accounts(self):
+ '''Return the currently known accounts, as a dict
+ containing a key for each known controller, with
+ each value holding a dict with the following keys:
+
+ user: The username to use when logging into the controller (str)
+ password: The password to use when logging into the controller (str, optional)
+ '''
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cookies_for_controller(self, controller_name):
+ '''Return the cookie jar to use when connecting to the
+ controller with the given name.
+ :return http.cookiejar.CookieJar
+ '''
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def current_model(self, controller_name=None, model_only=False):
+ '''Return the current model, qualified by its controller name.
+ If controller_name is specified, the current model for
+ that controller will be returned.
+ If model_only is true, only the model name, not qualified by
+ its controller name, will be returned.
+ '''
+ raise NotImplementedError()
+
+ def parse_model(self, model):
+ """Split the given model_name into controller and model parts.
+ If the controller part is empty, the current controller will be used.
+ If the model part is empty, the current model will be used for
+ the controller.
+ The returned model name will always be qualified with a username.
+ :param model str: The model name to parse.
+ :return (str, str): The controller and model names.
+ """
+ # TODO if model is empty, use $JUJU_MODEL environment variable.
+ if model and ':' in model:
+ # explicit controller given
+ controller_name, model_name = model.split(':')
+ else:
+ # use the current controller if one isn't explicitly given
+ controller_name = self.current_controller()
+ model_name = model
+ if not controller_name:
+ controller_name = self.current_controller()
+ if not model_name:
+ model_name = self.current_model(controller_name, model_only=True)
+ if not model_name:
+ raise NoModelException('no current model')
+
+ if '/' not in model_name:
+ # model name doesn't include a user prefix, so add one
+ # by using the current user for the controller.
+ accounts = self.accounts().get(controller_name)
+ if accounts is None:
+ raise JujuError('No account found for controller {} '.format(controller_name))
+ username = accounts.get('user')
+ if username is None:
+ raise JujuError('No username found for controller {}'.format(controller_name))
+ model_name = username + "/" + model_name
+
+ return controller_name, model_name
+
+
+class FileJujuData(JujuData):
+ '''Provide access to the Juju client configuration files.
+ Any configuration file is read once and then cached.'''
+ def __init__(self):
+ self.path = os.environ.get('JUJU_DATA') or '~/.local/share/juju'
+ self.path = os.path.abspath(os.path.expanduser(self.path))
+ # _loaded keeps track of the loaded YAML from
+ # the Juju data files so we don't need to load the same
+ # file many times.
+ self._loaded = {}
+
+ def refresh(self):
+ '''Forget the cache of configuration file data'''
+ self._loaded = {}
+
+ def current_controller(self):
+ '''Return the current controller name'''
+ return self._load_yaml('controllers.yaml', 'current-controller')
+
+ def current_model(self, controller_name=None, model_only=False):
+ '''Return the current model, qualified by its controller name.
+ If controller_name is specified, the current model for
+ that controller will be returned.
+
+ If model_only is true, only the model name, not qualified by
+ its controller name, will be returned.
+ '''
+ # TODO respect JUJU_MODEL environment variable.
+ if not controller_name:
+ controller_name = self.current_controller()
+ if not controller_name:
+ raise JujuError('No current controller')
+ models = self.models()[controller_name]
+ if 'current-model' not in models:
+ return None
+ if model_only:
+ return models['current-model']
+ return controller_name + ':' + models['current-model']
+
+ def load_credential(self, cloud, name=None):
+ """Load a local credential.
+
+ :param str cloud: Name of cloud to load credentials from.
+ :param str name: Name of credential. If None, the default credential
+ will be used, if available.
+ :return: A CloudCredential instance, or None.
+ """
+ try:
+ cloud = tag.untag('cloud-', cloud)
+ creds_data = self.credentials()[cloud]
+ if not name:
+ default_credential = creds_data.pop('default-credential', None)
+ default_region = creds_data.pop('default-region', None) # noqa
+ if default_credential:
+ name = creds_data['default-credential']
+ elif len(creds_data) == 1:
+ name = list(creds_data)[0]
+ else:
+ return None, None
+ cred_data = creds_data[name]
+ auth_type = cred_data.pop('auth-type')
+ return name, jujuclient.CloudCredential(
+ auth_type=auth_type,
+ attrs=cred_data,
+ )
+ except (KeyError, FileNotFoundError):
+ return None, None
+
+ def controllers(self):
+ return self._load_yaml('controllers.yaml', 'controllers')
+
+ def models(self):
+ return self._load_yaml('models.yaml', 'controllers')
+
+ def accounts(self):
+ return self._load_yaml('accounts.yaml', 'controllers')
+
+ def credentials(self):
+ return self._load_yaml('credentials.yaml', 'credentials')
+
+ def _load_yaml(self, filename, key):
+ if filename in self._loaded:
+ # Data already exists in the cache.
+ return self._loaded[filename].get(key)
+ # TODO use the file lock like Juju does.
+ filepath = os.path.join(self.path, filename)
+ with io.open(filepath, 'rt') as f:
+ data = yaml.safe_load(f)
+ self._loaded[filename] = data
+ return data.get(key)
+
+ def cookies_for_controller(self, controller_name):
+ f = pathlib.Path(self.path) / 'cookies' / (controller_name + '.json')
+ if not f.exists():
+ f = pathlib.Path('~/.go-cookies').expanduser()
+ # TODO if neither cookie file exists, where should
+ # we create the cookies?
+ jar = GoCookieJar(str(f))
+ jar.load()
+ return jar
diff --git a/modules/libjuju/juju/client/overrides.py b/modules/libjuju/juju/client/overrides.py
index 5e98e56..8b29de7 100644
--- a/modules/libjuju/juju/client/overrides.py
+++ b/modules/libjuju/juju/client/overrides.py
@@ -1,10 +1,8 @@
-from collections import namedtuple
import re
+from collections import namedtuple
+from . import _client, _definitions
from .facade import ReturnMapping, Type, TypeEncoder
-from .import _client
-from .import _definitions
-
__all__ = [
'Delta',
diff --git a/modules/libjuju/juju/controller.py b/modules/libjuju/juju/controller.py
index 55ea55e..957ab85 100644
--- a/modules/libjuju/juju/controller.py
+++ b/modules/libjuju/juju/controller.py
@@ -1,69 +1,101 @@
import asyncio
import logging
-from . import errors
-from . import tag
-from . import utils
-from .client import client
-from .client import connection
-from .model import Model
+from . import errors, tag, utils
+from .client import client, connector
from .user import User
log = logging.getLogger(__name__)
-class Controller(object):
- def __init__(self, loop=None,
- max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
+class Controller:
+ def __init__(
+ self,
+ loop=None,
+ max_frame_size=None,
+ bakery_client=None,
+ jujudata=None,
+ ):
"""Instantiate a new Controller.
One of the connect_* methods will need to be called before this
object can be used for anything interesting.
+ If jujudata is None, jujudata.FileJujuData will be used.
+
:param loop: an asyncio event loop
-
+ :param max_frame_size: See
+ `juju.client.connection.Connection.MAX_FRAME_SIZE`
+ :param bakery_client httpbakery.Client: The bakery client to use
+ for macaroon authorization.
+ :param jujudata JujuData: The source for current controller information.
"""
- self.loop = loop or asyncio.get_event_loop()
- self.max_frame_size = None
- self.connection = None
- self.controller_name = None
+ self._connector = connector.Connector(
+ loop=loop,
+ max_frame_size=max_frame_size,
+ bakery_client=bakery_client,
+ jujudata=jujudata,
+ )
- async def connect(
- self, endpoint, username, password, cacert=None, macaroons=None):
- """Connect to an arbitrary Juju controller.
+ async def __aenter__(self):
+ await self.connect()
+ return self
+ async def __aexit__(self, exc_type, exc, tb):
+ await self.disconnect()
+
+ @property
+ def loop(self):
+ return self._connector.loop
+
+ async def connect(self, controller_name=None, **kwargs):
+ """Connect to a Juju controller.
+
+ If any arguments are specified other than controller_name,
+ then controller_name must be None and an explicit
+ connection will be made using Connection.connect
+ using those parameters (the 'uuid' parameter must
+ be absent or None).
+
+ Otherwise, if controller_name is None, connect to the
+ current controller.
+
+ Otherwise, controller_name must specify the name
+ of a known controller.
"""
- self.connection = await connection.Connection.connect(
- endpoint, None, username, password, cacert, macaroons,
- max_frame_size=self.max_frame_size)
+ await self.disconnect()
+ if not kwargs:
+ await self._connector.connect_controller(controller_name)
+ else:
+ if controller_name is not None:
+ raise ValueError('controller name may not be specified with other connect parameters')
+ if kwargs.get('uuid') is not None:
+ # A UUID implies a model connection, not a controller connection.
+ raise ValueError('model UUID specified when connecting to controller')
+ await self._connector.connect(**kwargs)
- async def connect_current(self):
- """Connect to the current Juju controller.
+ async def _connect_direct(self, **kwargs):
+ await self.disconnect()
+ await self._connector.connect(**kwargs)
- """
- jujudata = connection.JujuData()
- controller_name = jujudata.current_controller()
- if not controller_name:
- raise errors.JujuConnectionError('No current controller')
- return await self.connect_controller(controller_name)
+ def is_connected(self):
+ """Reports whether the Controller is currently connected."""
+ return self._connector.is_connected()
- async def connect_controller(self, controller_name):
- """Connect to a Juju controller by name.
+ def connection(self):
+ """Return the current Connection object. It raises an exception
+ if the Controller is disconnected"""
+ return self._connector.connection()
- """
- self.connection = (
- await connection.Connection.connect_controller(
- controller_name, max_frame_size=self.max_frame_size))
- self.controller_name = controller_name
+ @property
+ def controller_name(self):
+ return self._connector.controller_name
async def disconnect(self):
"""Shut down the watcher task and close websockets.
"""
- if self.connection and self.connection.is_open:
- log.debug('Closing controller connection')
- await self.connection.close()
- self.connection = None
+ await self._connector.disconnect()
async def add_credential(self, name=None, credential=None, cloud=None,
owner=None):
@@ -84,20 +116,19 @@
cloud = await self.get_cloud()
if not owner:
- owner = self.connection.info['user-info']['identity']
+ owner = self.connection().info['user-info']['identity']
if credential and not name:
raise errors.JujuError('Name must be provided for credential')
if not credential:
- name, credential = connection.JujuData().load_credential(cloud,
- name)
+ name, credential = self._connector.jujudata.load_credential(cloud, name)
if credential is None:
- raise errors.JujuError('Unable to find credential: '
- '{}'.format(name))
+ raise errors.JujuError(
+ 'Unable to find credential: {}'.format(name))
log.debug('Uploading credential %s', name)
- cloud_facade = client.CloudFacade.from_connection(self.connection)
+ cloud_facade = client.CloudFacade.from_connection(self.connection())
await cloud_facade.UpdateCredentials([
client.UpdateCloudCredential(
tag=tag.credential(cloud, tag.untag('user-', owner), name),
@@ -121,12 +152,12 @@
the current user.
:param dict config: Model configuration.
:param str region: Region in which to create the model.
-
+ :return Model: A connection to the newly created model.
"""
model_facade = client.ModelManagerFacade.from_connection(
- self.connection)
+ self.connection())
- owner = owner or self.connection.info['user-info']['identity']
+ owner = owner or self.connection().info['user-info']['identity']
cloud_name = cloud_name or await self.get_cloud()
try:
@@ -153,7 +184,7 @@
if not config or 'authorized-keys' not in config:
config = config or {}
config['authorized-keys'] = await utils.read_ssh_key(
- loop=self.loop)
+ loop=self._connector.loop)
model_info = await model_facade.CreateModel(
tag.cloud(cloud_name),
@@ -163,17 +194,11 @@
owner,
region
)
-
- model = Model()
- await model.connect(
- self.connection.endpoint,
- model_info.uuid,
- self.connection.username,
- self.connection.password,
- self.connection.cacert,
- self.connection.macaroons,
- loop=self.loop,
- )
+ from juju.model import Model
+ model = Model(jujudata=self._connector.jujudata)
+ kwargs = self.connection().connect_params()
+ kwargs['uuid'] = model_info.uuid
+ await model._connect_direct(**kwargs)
return model
@@ -183,12 +208,12 @@
:param str \*models: Names or UUIDs of models to destroy
"""
- uuids = await self._model_uuids()
+ uuids = await self.model_uuids()
models = [uuids[model] if model in uuids else model
for model in models]
model_facade = client.ModelManagerFacade.from_connection(
- self.connection)
+ self.connection())
log.debug(
'Destroying model%s %s',
@@ -212,7 +237,7 @@
"""
if not display_name:
display_name = username
- user_facade = client.UserManagerFacade.from_connection(self.connection)
+ user_facade = client.UserManagerFacade.from_connection(self.connection())
users = [client.AddUser(display_name=display_name,
username=username,
password=password)]
@@ -223,7 +248,7 @@
"""Remove a user from this controller.
"""
client_facade = client.UserManagerFacade.from_connection(
- self.connection)
+ self.connection())
user = tag.user(username)
await client_facade.RemoveUser([client.Entity(user)])
@@ -234,7 +259,7 @@
:param str password: New password
"""
- user_facade = client.UserManagerFacade.from_connection(self.connection)
+ user_facade = client.UserManagerFacade.from_connection(self.connection())
entity = client.EntityPassword(password, tag.user(username))
return await user_facade.SetPassword([entity])
@@ -246,7 +271,7 @@
"""
controller_facade = client.ControllerFacade.from_connection(
- self.connection)
+ self.connection())
return await controller_facade.DestroyController(destroy_all_models)
async def disable_user(self, username):
@@ -255,7 +280,7 @@
:param str username: Username
"""
- user_facade = client.UserManagerFacade.from_connection(self.connection)
+ user_facade = client.UserManagerFacade.from_connection(self.connection())
entity = client.Entity(tag.user(username))
return await user_facade.DisableUser([entity])
@@ -263,7 +288,7 @@
"""Re-enable a previously disabled user.
"""
- user_facade = client.UserManagerFacade.from_connection(self.connection)
+ user_facade = client.UserManagerFacade.from_connection(self.connection())
entity = client.Entity(tag.user(username))
return await user_facade.EnableUser([entity])
@@ -278,17 +303,35 @@
"""
Get the name of the cloud that this controller lives on.
"""
- cloud_facade = client.CloudFacade.from_connection(self.connection)
+ cloud_facade = client.CloudFacade.from_connection(self.connection())
result = await cloud_facade.Clouds()
cloud = list(result.clouds.keys())[0] # only lives on one cloud
return tag.untag('cloud-', cloud)
- async def _model_uuids(self, all_=False, username=None):
+ async def get_models(self, all_=False, username=None):
+ """
+ .. deprecated:: 0.7.0
+ Use :meth:`.list_models` instead.
+ """
controller_facade = client.ControllerFacade.from_connection(
self.connection)
for attempt in (1, 2, 3):
try:
+ return await controller_facade.AllModels()
+ except errors.JujuAPIError as e:
+ # retry concurrency error until resolved in Juju
+ # see: https://bugs.launchpad.net/juju/+bug/1721786
+ if 'has been removed' not in e.message or attempt == 3:
+ raise
+
+ async def model_uuids(self):
+ """Return a mapping of model names to UUIDs.
+ """
+ controller_facade = client.ControllerFacade.from_connection(
+ self.connection())
+ for attempt in (1, 2, 3):
+ try:
response = await controller_facade.AllModels()
return {um.model.name: um.model.uuid
for um in response.user_models}
@@ -297,17 +340,14 @@
# see: https://bugs.launchpad.net/juju/+bug/1721786
if 'has been removed' not in e.message or attempt == 3:
raise
- await asyncio.sleep(attempt, loop=self.loop)
+ await asyncio.sleep(attempt, loop=self._connector.loop)
- async def list_models(self, all_=False, username=None):
+ async def list_models(self):
"""Return list of names of the available models on this controller.
- :param bool all_: List all models, regardless of user accessibilty
- (admin use only)
- :param str username: User for which to list models (admin use only)
-
+ Equivalent to ``sorted((await self.model_uuids()).keys())``
"""
- uuids = await self._model_uuids(all_, username)
+ uuids = await self.model_uuids()
return sorted(uuids.keys())
def get_payloads(self, *patterns):
@@ -347,24 +387,19 @@
"""Get a model by name or UUID.
:param str model: Model name or UUID
-
+ :returns Model: Connected Model instance.
"""
- uuids = await self._model_uuids()
+ uuids = await self.model_uuids()
if model in uuids:
- name_or_uuid = uuids[model]
+ uuid = uuids[model]
else:
- name_or_uuid = model
+ uuid = model
+ from juju.model import Model
model = Model()
- await model.connect(
- self.connection.endpoint,
- name_or_uuid,
- self.connection.username,
- self.connection.password,
- self.connection.cacert,
- self.connection.macaroons,
- loop=self.loop,
- )
+ kwargs = self.connection().connect_params()
+ kwargs['uuid'] = uuid
+ await model._connect_direct(**kwargs)
return model
async def get_user(self, username):
@@ -374,7 +409,7 @@
:returns: A :class:`~juju.user.User` instance
"""
client_facade = client.UserManagerFacade.from_connection(
- self.connection)
+ self.connection())
user = tag.user(username)
args = [client.Entity(user)]
try:
@@ -396,32 +431,77 @@
:returns: A list of :class:`~juju.user.User` instances
"""
client_facade = client.UserManagerFacade.from_connection(
- self.connection)
+ self.connection())
response = await client_facade.UserInfo(None, include_disabled)
return [User(self, r.result) for r in response.results]
async def grant(self, username, acl='login'):
- """Set access level of the given user on the controller
-
+ """Grant access level of the given user on the controller.
+ Note that if the user already has higher permissions than the
+ provided ACL, this will do nothing (see revoke for a way to
+ remove permissions).
:param str username: Username
:param str acl: Access control ('login', 'add-model' or 'superuser')
-
+ :returns: True if new access was granted, False if user already had
+ requested access or greater. Raises JujuError if failed.
"""
controller_facade = client.ControllerFacade.from_connection(
- self.connection)
+ self.connection())
user = tag.user(username)
- await self.revoke(username)
changes = client.ModifyControllerAccess(acl, 'grant', user)
- return await controller_facade.ModifyControllerAccess([changes])
+ try:
+ await controller_facade.ModifyControllerAccess([changes])
+ return True
+ except errors.JujuError as e:
+ if 'user already has' in str(e):
+ return False
+ else:
+ raise
- async def revoke(self, username):
- """Removes all access from a controller
+ async def revoke(self, username, acl='login'):
+ """Removes some or all access of a user to from a controller
+ If 'login' access is revoked, the user will no longer have any
+ permissions on the controller. Revoking a higher privilege from
+ a user without that privilege will have no effect.
:param str username: username
-
+ :param str acl: Access to remove ('login', 'add-model' or 'superuser')
"""
controller_facade = client.ControllerFacade.from_connection(
- self.connection)
+ self.connection())
user = tag.user(username)
changes = client.ModifyControllerAccess('login', 'revoke', user)
return await controller_facade.ModifyControllerAccess([changes])
+
+ async def grant_model(self, username, model_uuid, acl='read'):
+ """Grant a user access to a model. Note that if the user
+ already has higher permissions than the provided ACL,
+ this will do nothing (see revoke_model for a way to remove permissions).
+
+ :param str username: Username
+ :param str model_uuid: The UUID of the model to change.
+ :param str acl: Access control ('read, 'write' or 'admin')
+ """
+ model_facade = client.ModelManagerFacade.from_connection(
+ self.connection())
+ user = tag.user(username)
+ model = tag.model(model_uuid)
+ changes = client.ModifyModelAccess(acl, 'grant', model, user)
+ return await model_facade.ModifyModelAccess([changes])
+
+ async def revoke_model(self, username, model_uuid, acl='read'):
+ """Revoke some or all of a user's access to a model.
+ If 'read' access is revoked, the user will no longer have any
+ permissions on the model. Revoking a higher privilege from
+ a user without that privilege will have no effect.
+
+ :param str username: Username to revoke
+ :param str model_uuid: The UUID of the model to change.
+ :param str acl: Access control ('read, 'write' or 'admin')
+ """
+ model_facade = client.ModelManagerFacade.from_connection(
+ self.connection())
+ user = tag.user(username)
+ model = tag.model(self.info.uuid)
+ changes = client.ModifyModelAccess(acl, 'revoke', model, user)
+ return await model_facade.ModifyModelAccess([changes])
diff --git a/modules/libjuju/juju/errors.py b/modules/libjuju/juju/errors.py
index ecd1c0d..da11cdb 100644
--- a/modules/libjuju/juju/errors.py
+++ b/modules/libjuju/juju/errors.py
@@ -25,3 +25,25 @@
class JujuConnectionError(ConnectionError, JujuError):
pass
+
+
+class JujuAuthError(JujuConnectionError):
+ pass
+
+
+class JujuRedirectException(Exception):
+ """Exception indicating that a redirection was requested"""
+ def __init__(self, redirect_info):
+ self.redirect_info = redirect_info
+
+ @property
+ def ca_cert(self):
+ return self.redirect_info['ca-cert']
+
+ @property
+ def endpoints(self):
+ return [
+ ('{value}:{port}'.format(**s), self.ca_cert)
+ for servers in self.redirect_info['servers']
+ for s in servers if s['scope'] == 'public'
+ ]
diff --git a/modules/libjuju/juju/machine.py b/modules/libjuju/juju/machine.py
index 23b41c6..bd3d030 100644
--- a/modules/libjuju/juju/machine.py
+++ b/modules/libjuju/juju/machine.py
@@ -2,7 +2,7 @@
import logging
import os
-from dateutil.parser import parse as parse_date
+import pyrfc3339
from . import model, utils
from .client import client
@@ -66,8 +66,8 @@
change_log.append(('agent-version', '', agent_version))
# only update (other) delta fields if status data is newer
- status_since = parse_date(machine['instance-status']['since'])
- delta_since = parse_date(delta.data['instance-status']['since'])
+ status_since = pyrfc3339.parse(machine['instance-status']['since'])
+ delta_since = pyrfc3339.parse(delta.data['instance-status']['since'])
if status_since > delta_since:
for status_key in ('status', 'info', 'since'):
delta_key = key_map[status_key]
@@ -169,6 +169,8 @@
'scp',
'-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'),
'-o', 'StrictHostKeyChecking=no',
+ '-q',
+ '-B',
source, destination
]
cmd += scp_opts.split()
@@ -211,7 +213,7 @@
"""Get the time when the `agent_status` was last updated.
"""
- return parse_date(self.safe_data['agent-status']['since'])
+ return pyrfc3339.parse(self.safe_data['agent-status']['since'])
@property
def agent_version(self):
@@ -244,7 +246,7 @@
"""Get the time when the `status` was last updated.
"""
- return parse_date(self.safe_data['instance-status']['since'])
+ return pyrfc3339.parse(self.safe_data['instance-status']['since'])
@property
def dns_name(self):
@@ -260,3 +262,10 @@
if addresses:
return addresses[0]['value']
return None
+
+ @property
+ def series(self):
+ """Returns the series of the current machine
+
+ """
+ return self.safe_data['series']
diff --git a/modules/libjuju/juju/model.py b/modules/libjuju/juju/model.py
index fc8d5e9..ac22599 100644
--- a/modules/libjuju/juju/model.py
+++ b/modules/libjuju/juju/model.py
@@ -14,26 +14,25 @@
from functools import partial
from pathlib import Path
-import websockets
-import yaml
import theblues.charmstore
import theblues.errors
+import websockets
+import yaml
from . import tag, utils
-from .client import client
-from .client import connection
+from .client import client, connector
from .client.client import ConfigValue
-from .constraints import parse as parse_constraints, normalize_key
-from .delta import get_entity_delta
-from .delta import get_entity_class
+from .constraints import parse as parse_constraints
+from .constraints import normalize_key
+from .delta import get_entity_class, get_entity_delta
+from .errors import JujuAPIError, JujuError
from .exceptions import DeadEntityException
-from .errors import JujuError, JujuAPIError
from .placement import parse as parse_placement
log = logging.getLogger(__name__)
-class _Observer(object):
+class _Observer:
"""Wrapper around an observer callable.
This wrapper allows filter criteria to be associated with the
@@ -77,7 +76,7 @@
return True
-class ModelObserver(object):
+class ModelObserver:
"""
Base class for creating observers that react to changes in a model.
"""
@@ -100,7 +99,7 @@
pass
-class ModelState(object):
+class ModelState:
"""Holds the state of the model, including the delta history of all
entities in the model.
@@ -144,6 +143,14 @@
"""
return self._live_entity_map('unit')
+ @property
+ def relations(self):
+ """Return a map of relation-id:Relation for all relations currently in
+ the model.
+
+ """
+ return self._live_entity_map('relation')
+
def entity_history(self, entity_type, entity_id):
"""Return the history deque for an entity.
@@ -209,7 +216,7 @@
connected=connected)
-class ModelEntity(object):
+class ModelEntity:
"""An object in the Model tree"""
def __init__(self, entity_id, model, history_index=-1, connected=True):
@@ -228,7 +235,7 @@
self.model = model
self._history_index = history_index
self.connected = connected
- self.connection = model.connection
+ self.connection = model.connection()
def __repr__(self):
return '<{} entity_id="{}">'.format(type(self).__name__,
@@ -380,90 +387,148 @@
return self.model.state.get_entity(self.entity_type, self.entity_id)
-class Model(object):
+class Model:
"""
The main API for interacting with a Juju model.
"""
- def __init__(self, loop=None,
- max_frame_size=connection.Connection.DEFAULT_FRAME_SIZE):
- """Instantiate a new connected Model.
+ def __init__(
+ self,
+ loop=None,
+ max_frame_size=None,
+ bakery_client=None,
+ jujudata=None,
+ ):
+ """Instantiate a new Model.
+
+ The connect method will need to be called before this
+ object can be used for anything interesting.
+
+ If jujudata is None, jujudata.FileJujuData will be used.
:param loop: an asyncio event loop
:param max_frame_size: See
`juju.client.connection.Connection.MAX_FRAME_SIZE`
-
+ :param bakery_client httpbakery.Client: The bakery client to use
+ for macaroon authorization.
+ :param jujudata JujuData: The source for current controller information.
"""
- self.loop = loop or asyncio.get_event_loop()
- self.max_frame_size = max_frame_size
- self.connection = None
- self.observers = weakref.WeakValueDictionary()
+ self._connector = connector.Connector(
+ loop=loop,
+ max_frame_size=max_frame_size,
+ bakery_client=bakery_client,
+ jujudata=jujudata,
+ )
+ self._observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
- self.info = None
- self._watch_stopping = asyncio.Event(loop=self.loop)
- self._watch_stopped = asyncio.Event(loop=self.loop)
- self._watch_received = asyncio.Event(loop=self.loop)
- self._charmstore = CharmStore(self.loop)
+ self._info = None
+ self._watch_stopping = asyncio.Event(loop=self._connector.loop)
+ self._watch_stopped = asyncio.Event(loop=self._connector.loop)
+ self._watch_received = asyncio.Event(loop=self._connector.loop)
+ self._watch_stopped.set()
+ self._charmstore = CharmStore(self._connector.loop)
+
+ def is_connected(self):
+ """Reports whether the Model is currently connected."""
+ return self._connector.is_connected()
+
+ @property
+ def loop(self):
+ return self._connector.loop
+
+ def connection(self):
+ """Return the current Connection object. It raises an exception
+ if the Model is disconnected"""
+ return self._connector.connection()
+
+ async def get_controller(self):
+ """Return a Controller instance for the currently connected model.
+ :return Controller:
+ """
+ from juju.controller import Controller
+ controller = Controller(jujudata=self._connector.jujudata)
+ kwargs = self.connection().connect_params()
+ kwargs.pop('uuid')
+ await controller._connect_direct(**kwargs)
+ return controller
async def __aenter__(self):
- await self.connect_current()
+ await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.disconnect()
- if exc_type is not None:
- return False
+ async def connect(self, model_name=None, **kwargs):
+ """Connect to a juju model.
- async def connect(self, *args, **kw):
- """Connect to an arbitrary Juju model.
+ If any arguments are specified other than model_name, then
+ model_name must be None and an explicit connection will be made
+ using Connection.connect using those parameters (the 'uuid'
+ parameter must be specified).
- args and kw are passed through to Connection.connect()
+ Otherwise, if model_name is None, connect to the current model.
- """
- if 'loop' not in kw:
- kw['loop'] = self.loop
- if 'max_frame_size' not in kw:
- kw['max_frame_size'] = self.max_frame_size
- self.connection = await connection.Connection.connect(*args, **kw)
- await self._after_connect()
-
- async def connect_current(self):
- """Connect to the current Juju model.
-
- """
- self.connection = await connection.Connection.connect_current(
- self.loop, max_frame_size=self.max_frame_size)
- await self._after_connect()
-
- async def connect_model(self, model_name):
- """Connect to a specific Juju model by name.
+ Otherwise, model_name must specify the name of a known
+ model.
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(
- model_name, self.loop, self.max_frame_size)
+ await self.disconnect()
+ if not kwargs:
+ await self._connector.connect_model(model_name)
+ else:
+ if kwargs.get('uuid') is None:
+ raise ValueError('no UUID specified when connecting to model')
+ await self._connector.connect(**kwargs)
+ await self._after_connect()
+
+ async def connect_model(self, model_name):
+ """
+ .. deprecated:: 0.6.2
+ Use connect(model_name=model_name) instead.
+ """
+ return await self.connect(model_name=model_name)
+
+ async def connect_current(self):
+ """
+ .. deprecated:: 0.6.2
+ Use connect instead.
+ """
+ return await self.connect()
+
+ async def _connect_direct(self, **kwargs):
+ await self.disconnect()
+ await self._connector.connect(**kwargs)
await self._after_connect()
async def _after_connect(self):
- """Run initialization steps after connecting to websocket.
-
- """
self._watch()
+
+ # Wait for the first packet of data from the AllWatcher,
+ # which contains all information on the model.
+ # TODO this means that we can't do anything until
+ # we've received all the model data, which might be
+ # a whole load of unneeded data if all the client wants
+ # to do is make one RPC call.
await self._watch_received.wait()
+
await self.get_info()
async def disconnect(self):
"""Shut down the watcher task and close websockets.
"""
- if self.connection and self.connection.is_open:
+ if not self._watch_stopped.is_set():
log.debug('Stopping watcher task')
self._watch_stopping.set()
await self._watch_stopped.wait()
+ self._watch_stopping.clear()
+
+ if self.is_connected():
log.debug('Closing model connection')
- await self.connection.close()
- self.connection = None
+ await self._connector.disconnect()
+ self.info = None
async def add_local_charm_dir(self, charm_dir, series):
"""Upload a local charm to the model.
@@ -480,7 +545,7 @@
with fh:
func = partial(
self.add_local_charm, fh, series, os.stat(fh.name).st_size)
- charm_url = await self.loop.run_in_executor(None, func)
+ charm_url = await self._connector.loop.run_in_executor(None, func)
log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
return charm_url
@@ -505,7 +570,7 @@
instead.
"""
- conn, headers, path_prefix = self.connection.https_connection()
+ conn, headers, path_prefix = self.connection().https_connection()
path = "%s/charms?series=%s" % (path_prefix, series)
headers['Content-Type'] = 'application/zip'
if size:
@@ -549,13 +614,20 @@
async def block_until(self, *conditions, timeout=None, wait_period=0.5):
"""Return only after all conditions are true.
+ Raises `websockets.ConnectionClosed` if disconnected.
"""
- async def _block():
- while not all(c() for c in conditions):
- if not (self.connection and self.connection.is_open):
- raise websockets.ConnectionClosed(1006, 'no reason')
- await asyncio.sleep(wait_period, loop=self.loop)
- await asyncio.wait_for(_block(), timeout, loop=self.loop)
+ def _disconnected():
+ return not (self.is_connected() and self.connection().is_open)
+
+ def done():
+ return _disconnected() or all(c() for c in conditions)
+
+ await utils.block_until(done,
+ timeout=timeout,
+ wait_period=wait_period,
+ loop=self.loop)
+ if _disconnected():
+ raise websockets.ConnectionClosed(1006, 'no reason')
@property
def applications(self):
@@ -581,6 +653,13 @@
"""
return self.state.units
+ @property
+ def relations(self):
+ """Return a list of all Relations currently in the model.
+
+ """
+ return list(self.state.relations.values())
+
async def get_info(self):
"""Return a client.ModelInfo object for this Model.
@@ -594,7 +673,7 @@
explicit call to this method.
"""
- facade = client.ClientFacade.from_connection(self.connection)
+ facade = client.ClientFacade.from_connection(self.connection())
self.info = await facade.ModelInfo()
log.debug('Got ModelInfo: %s', vars(self.info))
@@ -639,7 +718,7 @@
"""
observer = _Observer(
callable_, entity_type, action, entity_id, predicate)
- self.observers[observer] = callable_
+ self._observers[observer] = callable_
def _watch(self):
"""Start an asynchronous watch against this model.
@@ -650,13 +729,13 @@
async def _all_watcher():
try:
allwatcher = client.AllWatcherFacade.from_connection(
- self.connection)
+ self.connection())
while not self._watch_stopping.is_set():
try:
results = await utils.run_with_interrupt(
allwatcher.Next(),
self._watch_stopping,
- self.loop)
+ self._connector.loop)
except JujuAPIError as e:
if 'watcher was stopped' not in str(e):
raise
@@ -673,19 +752,27 @@
del allwatcher.Id
continue
except websockets.ConnectionClosed:
- monitor = self.connection.monitor
+ monitor = self.connection().monitor
if monitor.status == monitor.ERROR:
# closed unexpectedly, try to reopen
log.warning(
'Watcher: connection closed, reopening')
- await self.connection.reconnect()
+ await self.connection().reconnect()
+ if monitor.status != monitor.CONNECTED:
+ # reconnect failed; abort and shutdown
+ log.error('Watcher: automatic reconnect '
+ 'failed; stopping watcher')
+ break
del allwatcher.Id
continue
else:
# closed on request, go ahead and shutdown
break
if self._watch_stopping.is_set():
- await allwatcher.Stop()
+ try:
+ await allwatcher.Stop()
+ except websockets.ConnectionClosed:
+ pass # can't stop on a closed conn
break
for delta in results.deltas:
delta = get_entity_delta(delta)
@@ -704,7 +791,7 @@
self._watch_received.clear()
self._watch_stopping.clear()
self._watch_stopped.clear()
- self.loop.create_task(_all_watcher())
+ self._connector.loop.create_task(_all_watcher())
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state
@@ -724,10 +811,10 @@
'Model changed: %s %s %s',
delta.entity, delta.type, delta.get_id())
- for o in self.observers:
+ for o in self._observers:
if o.cares_about(delta):
asyncio.ensure_future(o(delta, old_obj, new_obj, self),
- loop=self.loop)
+ loop=self._connector.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
@@ -744,7 +831,7 @@
has a 'completed' status. See the _Observer class for details.
"""
- q = asyncio.Queue(loop=self.loop)
+ q = asyncio.Queue(loop=self._connector.loop)
async def callback(delta, old, new, model):
await q.put(delta.get_id())
@@ -755,24 +842,19 @@
# 'remove' action
return self.state._live_entity_map(entity_type).get(entity_id)
- async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
+ async def _wait_for_new(self, entity_type, entity_id):
"""Wait for a new object to appear in the Model and return it.
- Waits for an object of type ``entity_type`` with id ``entity_id``.
- If ``entity_id`` is ``None``, it will wait for the first new entity
- of the correct type.
-
- This coroutine blocks until the new object appears in the model.
+ Waits for an object of type ``entity_type`` with id ``entity_id``
+ to appear in the model. This is similar to watching for the
+ object using ``block_until``, but uses the watcher rather than
+ polling.
"""
# if the entity is already in the model, just return it
if entity_id in self.state._live_entity_map(entity_type):
return self.state._live_entity_map(entity_type)[entity_id]
- # if we know the entity_id, we can trigger on any action that puts
- # the enitty into the model; otherwise, we have to watch for the
- # next "add" action on that entity_type
- action = 'add' if entity_id is None else None
- return await self._wait(entity_type, entity_id, action, predicate)
+ return await self._wait(entity_type, entity_id, None)
async def wait_for_action(self, action_id):
"""Given an action, wait for it to complete."""
@@ -785,7 +867,7 @@
def predicate(delta):
return delta.data['status'] in ('completed', 'failed')
- return await self._wait('action', action_id, 'change', predicate)
+ return await self._wait('action', action_id, None, predicate)
async def add_machine(
self, spec=None, constraints=None, disks=None, series=None):
@@ -865,7 +947,7 @@
params.series = series
# Submit the request.
- client_facade = client.ClientFacade.from_connection(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection())
results = await client_facade.AddMachines([params])
error = results.machines[0].error
if error:
@@ -881,29 +963,33 @@
:param str relation2: '<application>[:<relation_name>]'
"""
- app_facade = client.ApplicationFacade.from_connection(self.connection)
+ app_facade = client.ApplicationFacade.from_connection(self.connection())
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
+ def _find_relation(*specs):
+ for rel in self.relations:
+ if rel.matches(*specs):
+ return rel
+ return None
+
try:
result = await app_facade.AddRelation([relation1, relation2])
except JujuAPIError as e:
if 'relation already exists' not in e.message:
raise
- log.debug(
- 'Relation %s <-> %s already exists', relation1, relation2)
- # TODO: if relation already exists we should return the
- # Relation ModelEntity here
- return None
+ rel = _find_relation(relation1, relation2)
+ if rel:
+ return rel
+ raise JujuError('Relation {} {} exists but not in model'.format(
+ relation1, relation2))
- def predicate(delta):
- endpoints = {}
- for endpoint in delta.data['endpoints']:
- endpoints[endpoint['application-name']] = endpoint['relation']
- return endpoints == result.endpoints
+ specs = ['{}:{}'.format(app, data['name'])
+ for app, data in result.endpoints.items()]
- return await self._wait_for_new('relation', None, predicate)
+ await self.block_until(lambda: _find_relation(*specs) is not None)
+ return _find_relation(*specs)
def add_space(self, name, *cidrs):
"""Add a new network space.
@@ -924,7 +1010,7 @@
:param str key: The public ssh key
"""
- key_facade = client.KeyManagerFacade.from_connection(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection())
return await key_facade.AddKeys([key], user)
add_ssh_keys = add_ssh_key
@@ -1072,9 +1158,14 @@
for k, v in storage.items()
}
+ entity_path = Path(entity_url.replace('local:', ''))
+ bundle_path = entity_path / 'bundle.yaml'
+ metadata_path = entity_path / 'metadata.yaml'
+
is_local = (
entity_url.startswith('local:') or
- os.path.isdir(entity_url)
+ entity_path.is_dir() or
+ entity_path.is_file()
)
if is_local:
entity_id = entity_url.replace('local:', '')
@@ -1082,10 +1173,11 @@
entity = await self.charmstore.entity(entity_url, channel=channel)
entity_id = entity['Id']
- client_facade = client.ClientFacade.from_connection(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection())
is_bundle = ((is_local and
- (Path(entity_id) / 'bundle.yaml').exists()) or
+ (entity_id.endswith('.yaml') and entity_path.exists()) or
+ bundle_path.exists()) or
(not is_local and 'bundle/' in entity_id))
if is_bundle:
@@ -1100,9 +1192,9 @@
await asyncio.gather(*[
asyncio.ensure_future(
self._wait_for_new('application', app_name),
- loop=self.loop)
+ loop=self._connector.loop)
for app_name in pending_apps
- ], loop=self.loop)
+ ], loop=self._connector.loop)
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
@@ -1118,6 +1210,9 @@
entity_id,
entity)
else:
+ if not application_name:
+ metadata = yaml.load(metadata_path.read_text())
+ application_name = metadata['name']
# We have a local charm dir that needs to be uploaded
charm_dir = os.path.abspath(
os.path.expanduser(entity_id))
@@ -1163,7 +1258,7 @@
return None
resources_facade = client.ResourcesFacade.from_connection(
- self.connection)
+ self.connection())
response = await resources_facade.AddPendingResources(
tag.application(application),
entity_url,
@@ -1186,7 +1281,7 @@
default_flow_style=False)
app_facade = client.ApplicationFacade.from_connection(
- self.connection)
+ self.connection())
app = client.ApplicationDeploy(
charm_url=charm_url,
@@ -1201,7 +1296,6 @@
storage=storage,
placement=placement
)
-
result = await app_facade.Deploy([app])
errors = [r.error.message for r in result.results if r.error]
if errors:
@@ -1218,7 +1312,7 @@
"""Destroy units by name.
"""
- app_facade = client.ApplicationFacade.from_connection(self.connection)
+ app_facade = client.ApplicationFacade.from_connection(self.connection())
log.debug(
'Destroying unit%s %s',
@@ -1263,7 +1357,7 @@
which have `source` and `value` attributes.
"""
config_facade = client.ModelConfigFacade.from_connection(
- self.connection
+ self.connection()
)
result = await config_facade.ModelGet()
config = result.config
@@ -1277,22 +1371,6 @@
"""
raise NotImplementedError()
- async def grant(self, username, acl='read'):
- """Grant a user access to this model.
-
- :param str username: Username
- :param str acl: Access control ('read' or 'write')
-
- """
- controller_conn = await self.connection.controller()
- model_facade = client.ModelManagerFacade.from_connection(
- controller_conn)
- user = tag.user(username)
- model = tag.model(self.info.uuid)
- changes = client.ModifyModelAccess(acl, 'grant', model, user)
- await self.revoke(username)
- return await model_facade.ModifyModelAccess([changes])
-
def import_ssh_key(self, identity):
"""Add a public SSH key from a trusted indentity source to this model.
@@ -1326,7 +1404,7 @@
else it's fingerprint
"""
- key_facade = client.KeyManagerFacade.from_connection(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection())
entity = {'tag': tag.model(self.info.uuid)}
entities = client.Entities([entity])
return await key_facade.ListKeys(entities, raw_ssh)
@@ -1399,7 +1477,7 @@
:param str user: Juju user to which the key is registered
"""
- key_facade = client.KeyManagerFacade.from_connection(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection())
key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
key = hashlib.md5(key).hexdigest()
key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
@@ -1427,20 +1505,6 @@
"""
raise NotImplementedError()
- async def revoke(self, username):
- """Revoke a user's access to this model.
-
- :param str username: Username to revoke
-
- """
- controller_conn = await self.connection.controller()
- model_facade = client.ModelManagerFacade.from_connection(
- controller_conn)
- user = tag.user(username)
- model = tag.model(self.info.uuid)
- changes = client.ModifyModelAccess('read', 'revoke', model, user)
- return await model_facade.ModifyModelAccess([changes])
-
def run(self, command, timeout=None):
"""Run command on all machines in this model.
@@ -1457,7 +1521,7 @@
`ConfigValue` instances, as returned by `get_config`.
"""
config_facade = client.ModelConfigFacade.from_connection(
- self.connection
+ self.connection()
)
for key, value in config.items():
if isinstance(value, ConfigValue):
@@ -1506,7 +1570,7 @@
:param bool utc: Display time as UTC in RFC3339 format
"""
- client_facade = client.ClientFacade.from_connection(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection())
return await client_facade.FullStatus(filters)
def sync_tools(
@@ -1587,7 +1651,7 @@
', '.join(tags) if tags else "all units")
metrics_facade = client.MetricsDebugFacade.from_connection(
- self.connection)
+ self.connection())
entities = [client.Entity(tag) for tag in tags]
metrics_result = await metrics_facade.GetMetrics(entities)
@@ -1623,7 +1687,7 @@
return series[0] if series else None
-class BundleHandler(object):
+class BundleHandler:
"""
Handle bundles by using the API to translate bundle YAML into a plan of
steps and then dispatching each of those using the API.
@@ -1638,11 +1702,11 @@
app_units = self._units_by_app.setdefault(unit.application, [])
app_units.append(unit_name)
self.client_facade = client.ClientFacade.from_connection(
- model.connection)
+ model.connection())
self.app_facade = client.ApplicationFacade.from_connection(
- model.connection)
+ model.connection())
self.ann_facade = client.AnnotationsFacade.from_connection(
- model.connection)
+ model.connection())
async def _handle_local_charms(self, bundle):
"""Search for references to local charms (i.e. filesystem paths)
@@ -1694,8 +1758,11 @@
return bundle
async def fetch_plan(self, entity_id):
- is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
- if is_local:
+ is_local = not entity_id.startswith('cs:')
+
+ if is_local and os.path.isfile(entity_id):
+ bundle_yaml = Path(entity_id).read_text()
+ elif is_local and os.path.isdir(entity_id):
bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
else:
bundle_yaml = await self.charmstore.files(entity_id,
@@ -1920,7 +1987,7 @@
return await entity.set_annotations(annotations)
-class CharmStore(object):
+class CharmStore:
"""
Async wrapper around theblues.charmstore.CharmStore
"""
@@ -1952,7 +2019,7 @@
return wrapper
-class CharmArchiveGenerator(object):
+class CharmArchiveGenerator:
"""
Create a Zip archive of a local charm directory for upload to a controller.
diff --git a/modules/libjuju/juju/relation.py b/modules/libjuju/juju/relation.py
index ef8946a..d2f2053 100644
--- a/modules/libjuju/juju/relation.py
+++ b/modules/libjuju/juju/relation.py
@@ -5,7 +5,119 @@
log = logging.getLogger(__name__)
+class Endpoint:
+ def __init__(self, model, data):
+ self.model = model
+ self.data = data
+
+ def __repr__(self):
+ return '<Endpoint {}:{}>'.format(self.application.name, self.name)
+
+ @property
+ def application(self):
+ return self.model.applications[self.data['application-name']]
+
+ @property
+ def name(self):
+ return self.data['relation']['name']
+
+ @property
+ def interface(self):
+ return self.data['relation']['interface']
+
+ @property
+ def role(self):
+ return self.data['relation']['role']
+
+ @property
+ def scope(self):
+ return self.data['relation']['scope']
+
+
class Relation(model.ModelEntity):
+ def __repr__(self):
+ return '<Relation id={} {}>'.format(self.entity_id, self.key)
+
+ @property
+ def endpoints(self):
+ return [Endpoint(self.model, data)
+ for data in self.safe_data['endpoints']]
+
+ @property
+ def provides(self):
+ """
+ The endpoint on the provides side of this relation, or None.
+ """
+ for endpoint in self.endpoints:
+ if endpoint.role == 'provider':
+ return endpoint
+ return None
+
+ @property
+ def requires(self):
+ """
+ The endpoint on the requires side of this relation, or None.
+ """
+ for endpoint in self.endpoints:
+ if endpoint.role == 'requirer':
+ return endpoint
+ return None
+
+ @property
+ def peers(self):
+ """
+ The peers endpoint of this relation, or None.
+ """
+ for endpoint in self.endpoints:
+ if endpoint.role == 'peer':
+ return endpoint
+ return None
+
+ @property
+ def is_subordinate(self):
+ return any(ep.scope == 'container' for ep in self.endpoints)
+
+ @property
+ def is_peer(self):
+ return any(ep.role == 'peer' for ep in self.endpoints)
+
+ def matches(self, *specs):
+ """
+ Check if this relation matches relationship specs.
+
+ Relation specs are strings that would be given to Juju to establish a
+ relation, and should be in the form ``<application>[:<endpoint_name>]``
+ where the ``:<endpoint_name>`` suffix is optional. If the suffix is
+ omitted, this relation will match on any endpoint as long as the given
+ application is involved.
+
+ In other words, this relation will match a spec if that spec could have
+ created this relation.
+
+ :return: True if all specs match.
+ """
+ for spec in specs:
+ if ':' in spec:
+ app_name, endpoint_name = spec.split(':')
+ else:
+ app_name, endpoint_name = spec, None
+ for endpoint in self.endpoints:
+ if app_name == endpoint.application.name and \
+ endpoint_name in (endpoint.name, None):
+ # found a match for this spec, so move to next one
+ break
+ else:
+ # no match for this spec
+ return False
+ return True
+
+ @property
+ def applications(self):
+ """
+ All applications involved in this relation.
+ """
+ return [ep.application for ep in self.endpoints]
+
async def destroy(self):
raise NotImplementedError()
# TODO: destroy a relation
diff --git a/modules/libjuju/juju/tag.py b/modules/libjuju/juju/tag.py
index 2514229..319e8f8 100644
--- a/modules/libjuju/juju/tag.py
+++ b/modules/libjuju/juju/tag.py
@@ -1,3 +1,8 @@
+# TODO: Tags should be a proper class, so that we can distinguish whether
+# something is already a tag or not. For example, 'user-foo' is a valid
+# username, but is ambiguous with the already-tagged username 'foo'.
+
+
def _prefix(prefix, s):
if s and not s.startswith(prefix):
return '{}{}'.format(prefix, s)
diff --git a/modules/libjuju/juju/unit.py b/modules/libjuju/juju/unit.py
index fc597bf..ce33b08 100644
--- a/modules/libjuju/juju/unit.py
+++ b/modules/libjuju/juju/unit.py
@@ -1,6 +1,6 @@
import logging
-from dateutil.parser import parse as parse_date
+import pyrfc3339
from . import model
from .client import client
@@ -21,7 +21,7 @@
"""Get the time when the `agent_status` was last updated.
"""
- return parse_date(self.safe_data['agent-status']['since'])
+ return pyrfc3339.parse(self.safe_data['agent-status']['since'])
@property
def agent_status_message(self):
@@ -42,7 +42,7 @@
"""Get the time when the `workload_status` was last updated.
"""
- return parse_date(self.safe_data['workload-status']['since'])
+ return pyrfc3339.parse(self.safe_data['workload-status']['since'])
@property
def workload_status_message(self):
diff --git a/modules/libjuju/juju/user.py b/modules/libjuju/juju/user.py
new file mode 100644
index 0000000..01710d7
--- /dev/null
+++ b/modules/libjuju/juju/user.py
@@ -0,0 +1,81 @@
+import logging
+
+import pyrfc3339
+
+from . import tag
+
+log = logging.getLogger(__name__)
+
+
+class User(object):
+ def __init__(self, controller, user_info):
+ self.controller = controller
+ self._user_info = user_info
+
+ @property
+ def tag(self):
+ return tag.user(self.username)
+
+ @property
+ def username(self):
+ return self._user_info.username
+
+ @property
+ def display_name(self):
+ return self._user_info.display_name
+
+ @property
+ def last_connection(self):
+ return pyrfc3339.parse(self._user_info.last_connection)
+
+ @property
+ def access(self):
+ return self._user_info.access
+
+ @property
+ def date_created(self):
+ return self._user_info.date_created
+
+ @property
+ def enabled(self):
+ return not self._user_info.disabled
+
+ @property
+ def disabled(self):
+ return self._user_info.disabled
+
+ @property
+ def created_by(self):
+ return self._user_info.created_by
+
+ async def set_password(self, password):
+ """Update this user's password.
+ """
+ await self.controller.change_user_password(self.username, password)
+ self._user_info.password = password
+
+ async def grant(self, acl='login'):
+ """Set access level of this user on the controller.
+
+ :param str acl: Access control ('login', 'add-model', or 'superuser')
+ """
+ if await self.controller.grant(self.username, acl):
+ self._user_info.access = acl
+
+ async def revoke(self):
+ """Removes all access rights for this user from the controller.
+ """
+ await self.controller.revoke(self.username)
+ self._user_info.access = ''
+
+ async def disable(self):
+ """Disable this user.
+ """
+ await self.controller.disable_user(self.username)
+ self._user_info.disabled = True
+
+ async def enable(self):
+ """Re-enable this user.
+ """
+ await self.controller.enable_user(self.username)
+ self._user_info.disabled = False
diff --git a/modules/libjuju/juju/utils.py b/modules/libjuju/juju/utils.py
index 1d9bc1c..3565fd6 100644
--- a/modules/libjuju/juju/utils.py
+++ b/modules/libjuju/juju/utils.py
@@ -46,6 +46,7 @@
can be passed on to a model.
'''
+ loop = loop or asyncio.get_event_loop()
return await loop.run_in_executor(None, _read_ssh_key)
@@ -71,6 +72,16 @@
await queue.put(value)
+async def block_until(*conditions, timeout=None, wait_period=0.5, loop=None):
+ """Return only after all conditions are true.
+
+ """
+ async def _block():
+ while not all(c() for c in conditions):
+ await asyncio.sleep(wait_period, loop=loop)
+ await asyncio.wait_for(_block(), timeout, loop=loop)
+
+
async def run_with_interrupt(task, event, loop=None):
"""
Awaits a task while allowing it to be interrupted by an `asyncio.Event`.
@@ -91,6 +102,10 @@
return_when=asyncio.FIRST_COMPLETED)
for f in pending:
f.cancel()
+ exception = [f.exception() for f in done
+ if f is not event_task and f.exception()]
+ if exception:
+ raise exception[0]
result = [f.result() for f in done if f is not event_task]
if result:
return result[0]