import argparse
import builtins
+from collections import defaultdict
import functools
+from glob import glob
import json
import keyword
from pathlib import Path
import pprint
+import re
import textwrap
from typing import Sequence, Mapping, TypeVar, Any, Union
import typing
_marker = object()
+JUJU_VERSION = re.compile('[0-9]+\.[0-9-]+[\.\-][0-9a-z]+(\.[0-9]+)?')
+# Workaround for https://bugs.launchpad.net/juju/+bug/1683906
+NAUGHTY_CLASSES = ['ClientFacade', 'Client', 'FullStatus', 'ModelStatusInfo',
+ 'ModelInfo']
+
+
# Map basic types to Python's typing with a callable
SCHEMA_TO_PYTHON = {
'string': str,
}
+# Friendly warning message to stick at the top of generated files.
+HEADER = """\
+# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
+# Changes will be overwritten/lost when the file is regenerated.
+
+"""
+
+
+# Classes and helper functions that we'll write to _client.py
+LOOKUP_FACADE = '''
+def lookup_facade(name, version):
+ """
+ Given a facade name and version, attempt to pull that facade out
+ of the correct client<version>.py file.
+
+ """
+ try:
+ facade = getattr(CLIENTS[str(version)], name)
+ except KeyError:
+ raise ImportError("No facades found for version {}".format(version))
+ except AttributeError:
+ raise ImportError(
+ "No facade with name '{}' in version {}".format(name, version))
+ return facade
+
+
+'''
+
+TYPE_FACTORY = '''
+class TypeFactory:
+ @classmethod
+ def from_connection(cls, connection):
+ """
+ Given a connected Connection object, return an initialized and
+ connected instance of an API Interface matching the name of
+ this class.
+
+ @param connection: initialized Connection object.
+
+ """
+ version = connection.facades[cls.__name__[:-6]]
+
+ c = lookup_facade(cls.__name__, version)
+ c = c()
+ c.connect(connection)
+
+ return c
+
+
+'''
+
+CLIENT_TABLE = '''
+CLIENTS = {{
+ {clients}
+}}
+
+
+'''
+
+
class KindRegistry(dict):
def register(self, name, version, obj):
self[name] = {version: {
_types = TypeRegistry()
_registry = KindRegistry()
-classes = {}
+CLASSES = {}
+factories = codegen.Capture()
def booler(v):
self.append((name, rtype))
def do_explode(self, kind):
- if kind in basic_types:
+ if kind in basic_types or type(kind) is typing.TypeVar:
return False
if not issubclass(kind, (typing.Sequence,
typing.Mapping)):
def buildTypes(schema, capture):
- global classes
INDENT = " "
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
- args = Args(kind)
- if name in classes:
+ if name in capture and not name in NAUGHTY_CLASSES:
continue
+ args = Args(kind)
+ # Write Factory class for _client.py
+ make_factory(name)
+ # Write actual class
source = ["""
class {}(Type):
_toSchema = {}
_toPy = {}
- def __init__(self{}{}):
+ def __init__(self{}{}, **unknown_fields):
'''
{}
'''""".format(
name,
- args.PyToSchemaMapping(),
- args.SchemaToPyMapping(),
+ # pprint these to get stable ordering across regens
+ pprint.pformat(args.PyToSchemaMapping(), width=999),
+ pprint.pformat(args.SchemaToPyMapping(), width=999),
", " if args else "",
args.as_kwargs(),
textwrap.indent(args.get_doc(), INDENT * 2))
if arg_type in basic_types:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Sequence):
- value_type = arg_type_name.__parameters__[0]
+ value_type = (
+ arg_type_name.__parameters__[0]
+ if len(arg_type_name.__parameters__)
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Mapping):
- value_type = arg_type_name.__parameters__[1]
+ value_type = (
+ arg_type_name.__parameters__[1]
+ if len(arg_type_name.__parameters__) > 1
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
source = "\n".join(source)
- capture.write(source)
- capture.write("\n\n")
+ capture.clear(name)
+ capture[name].write(source)
+ capture[name].write("\n\n")
co = compile(source, __name__, "exec")
ns = _getns()
exec(co, ns)
cls = ns[name]
- classes[name] = cls
+ CLASSES[name] = cls
def retspec(defs):
# Error or the expected Type
if not defs:
return None
+ if defs in basic_types:
+ return strcast(defs, False)
rtypes = _registry.getObj(_types[defs])
if not rtypes:
return None
reply = await f(*args, **kwargs)
if cls is None:
return reply
- if 'Error' in reply:
- cls = classes['Error']
+ if 'error' in reply:
+ cls = CLASSES['Error']
if issubclass(cls, typing.Sequence):
result = []
item_cls = cls.__parameters__[0]
for item in reply:
result.append(item_cls.from_json(item))
+ """
+ if 'error' in item:
+ cls = CLASSES['Error']
+ else:
+ cls = item_cls
+ result.append(cls.from_json(item))
+ """
else:
- result = cls.from_json(reply['Response'])
+ result = cls.from_json(reply['response'])
return result
return wrapper
assignments = []
toschema = args.PyToSchemaMapping()
for arg in args._get_arg_str(False, False):
- assignments.append("{}params[\'{}\'] = {}".format(INDENT,
- toschema[arg],
- arg))
+ assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
+ toschema[arg],
+ arg))
assignments = "\n".join(assignments)
res = retspec(result)
source = """
Returns -> {res}
'''
# map input types to rpc msg
- params = dict()
- msg = dict(Type='{cls.name}', Request='{name}', Version={cls.version}, Params=params)
+ _params = dict()
+ msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
{assignments}
reply = {await}self.rpc(msg)
return reply
for methodname in sorted(properties):
method, source = _buildMethod(cls, methodname)
setattr(cls, methodname, method)
- capture.write(source, depth=1)
+ capture["{}Facade".format(cls.__name__)].write(source, depth=1)
def _buildMethod(cls, name):
params = _types.get(spec['$ref'])
spec = prop.get('Result')
if spec:
- result = _types.get(spec['$ref'])
+ if '$ref' in spec:
+ result = _types.get(spec['$ref'])
+ else:
+ result = SCHEMA_TO_PYTHON[spec['type']]
return makeFunc(cls, name, params, result)
@classmethod
def from_json(cls, data):
+ if isinstance(data, cls):
+ return data
if isinstance(data, str):
- data = json.loads(data)
+ try:
+ data = json.loads(data)
+ except json.JSONDecodeError:
+ raise
d = {}
for k, v in (data or {}).items():
d[cls._toPy.get(k, k)] = v
try:
return cls(**d)
except TypeError:
- print(cls)
raise
def serialize(self):
if not defs:
return
for d, data in defs.items():
- if d in _registry:
+ if d in _registry and not d in NAUGHTY_CLASSES:
continue
node = self.deref(data, d)
kind = node.get("type")
return ns
-
-def generate_facacdes(options):
- global classes
- schemas = json.loads(Path(options.schema).read_text("utf-8"))
- capture = codegen.CodeWriter()
- capture.write(textwrap.dedent("""\
- # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
- # Changes will be overwritten/lost when the file is regenerated.
-
- from juju.client.facade import Type, ReturnMapping
-
- """))
- schemas = [Schema(s) for s in schemas]
-
- for schema in schemas:
- schema.buildDefinitions()
- buildTypes(schema, capture)
-
- for schema in schemas:
- # TODO generate class now with a metaclass that takes the schema
- # the generated class has the right name and it in turn uses
- # the metaclass to populate cls
- cls, source = buildFacade(schema)
- capture.write(source)
- buildMethods(cls, capture)
- classes[schema.name] = cls
-
- return capture
+def make_factory(name):
+ if name in factories:
+ del factories[name]
+ factories[name].write("class {}(TypeFactory):\n pass\n\n".format(name))
+
+
+def write_facades(captures, options):
+ """
+ Write the Facades to the appropriate _client<version>.py
+
+ """
+ for version in sorted(captures.keys()):
+ filename = "{}/_client{}.py".format(options.output_dir, version)
+ with open(filename, "w") as f:
+ f.write(HEADER)
+ f.write("from juju.client.facade import Type, ReturnMapping\n")
+ f.write("from juju.client._definitions import *\n\n")
+ for key in sorted(
+ [k for k in captures[version].keys() if "Facade" in k]):
+ print(captures[version][key], file=f)
+
+ # Return the last (most recent) version for use in other routines.
+ return version
+
+
+def write_definitions(captures, options, version):
+ """
+ Write auxillary (non versioned) classes to
+ _definitions.py The auxillary classes currently get
+ written redudantly into each capture object, so we can look in
+ one of them -- we just use the last one from the loop above.
+
+ """
+ with open("{}/_definitions.py".format(options.output_dir), "w") as f:
+ f.write(HEADER)
+ f.write("from juju.client.facade import Type, ReturnMapping\n\n")
+ for key in sorted(
+ [k for k in captures[version].keys() if "Facade" not in k]):
+ print(captures[version][key], file=f)
+
+
+def write_client(captures, options):
+ """
+ Write the TypeFactory classes to _client.py, along with some
+ imports and tables so that we can look up versioned Facades.
+
+ """
+ with open("{}/_client.py".format(options.output_dir), "w") as f:
+ f.write(HEADER)
+ f.write("from juju.client._definitions import *\n\n")
+ clients = ", ".join("_client{}".format(v) for v in captures)
+ f.write("from juju.client import " + clients + "\n\n")
+ f.write(CLIENT_TABLE.format(clients=",\n ".join(
+ ['"{}": _client{}'.format(v, v) for v in captures])))
+ f.write(LOOKUP_FACADE)
+ f.write(TYPE_FACTORY)
+ for key in sorted([k for k in factories.keys() if "Facade" in k]):
+ print(factories[key], file=f)
+
+
+def generate_facades(options):
+ captures = defaultdict(codegen.Capture)
+ schemas = {}
+ for p in sorted(glob(options.schema)):
+ if 'latest' in p:
+ juju_version = 'latest'
+ else:
+ try:
+ juju_version = re.search(JUJU_VERSION, p).group()
+ except AttributeError:
+ print("Cannot extract a juju version from {}".format(p))
+ print("Schemas must include a juju version in the filename")
+ raise SystemExit(1)
+
+ new_schemas = json.loads(Path(p).read_text("utf-8"))
+ schemas[juju_version] = [Schema(s) for s in new_schemas]
+
+ # Build all of the auxillary (unversioned) classes
+ # TODO: get rid of some of the excess trips through loops in the
+ # called functions.
+ for juju_version in sorted(schemas.keys()):
+ for schema in schemas[juju_version]:
+ schema.buildDefinitions()
+ buildTypes(schema, captures[schema.version])
+
+ # Build the Facade classes
+ for juju_version in sorted(schemas.keys()):
+ for schema in schemas[juju_version]:
+ cls, source = buildFacade(schema)
+ cls_name = "{}Facade".format(schema.name)
+
+ captures[schema.version].clear(cls_name)
+ # Make the factory class for _client.py
+ make_factory(cls_name)
+ # Make the actual class
+ captures[schema.version][cls_name].write(source)
+ # Build the methods for each Facade class.
+ buildMethods(cls, captures[schema.version])
+ # Mark this Facade class as being done for this version --
+ # helps mitigate some excessive looping.
+ CLASSES[schema.name] = cls
+
+ return captures
def setup():
parser = argparse.ArgumentParser()
- parser.add_argument("-s", "--schema", default="schemas.json")
- parser.add_argument("-o", "--output", default="client.py")
+ parser.add_argument("-s", "--schema", default="juju/client/schemas*")
+ parser.add_argument("-o", "--output_dir", default="juju/client")
options = parser.parse_args()
return options
def main():
options = setup()
- capture = generate_facacdes(options)
- with open(options.output, "w") as fp:
- print(capture, file=fp)
+ # Generate some text blobs
+ captures = generate_facades(options)
+ # ... and write them out
+ last_version = write_facades(captures, options)
+ write_definitions(captures, options, last_version)
+ write_client(captures, options)
if __name__ == '__main__':
main()