blob: c959e01649964a9a5eb955e2fc0829ebabfcc90f [file] [log] [blame]
import argparse
import builtins
from collections import defaultdict
import functools
from glob import glob
import json
import keyword
from pathlib import Path
import pprint
import re
import textwrap
from typing import Sequence, Mapping, TypeVar, Any, Union
import typing
from . import codegen
_marker = object()
JUJU_VERSION = re.compile('[0-9]+\.[0-9-]+[\.\-][0-9a-z]+(\.[0-9]+)?')
# Workaround for https://bugs.launchpad.net/juju/+bug/1683906
NAUGHTY_CLASSES = ['ClientFacade', 'Client', 'FullStatus', 'ModelStatusInfo',
'ModelInfo']
# Map basic types to Python's typing with a callable
SCHEMA_TO_PYTHON = {
'string': str,
'integer': int,
'float': float,
'number': float,
'boolean': bool,
'object': Any,
}
# Friendly warning message to stick at the top of generated files.
HEADER = """\
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.
"""
# Classes and helper functions that we'll write to _client.py
LOOKUP_FACADE = '''
def lookup_facade(name, version):
"""
Given a facade name and version, attempt to pull that facade out
of the correct client<version>.py file.
"""
try:
facade = getattr(CLIENTS[str(version)], name)
except KeyError:
raise ImportError("No facades found for version {}".format(version))
except AttributeError:
raise ImportError(
"No facade with name '{}' in version {}".format(name, version))
return facade
'''
TYPE_FACTORY = '''
class TypeFactory:
@classmethod
def from_connection(cls, connection):
"""
Given a connected Connection object, return an initialized and
connected instance of an API Interface matching the name of
this class.
@param connection: initialized Connection object.
"""
version = connection.facades[cls.__name__[:-6]]
c = lookup_facade(cls.__name__, version)
c = c()
c.connect(connection)
return c
'''
CLIENT_TABLE = '''
CLIENTS = {{
{clients}
}}
'''
class KindRegistry(dict):
def register(self, name, version, obj):
self[name] = {version: {
"object": obj,
}}
def lookup(self, name, version=None):
"""If version is omitted, max version is used"""
versions = self.get(name)
if not versions:
return None
if version:
return versions[version]
return versions[max(versions)]
def getObj(self, name, version=None):
result = self.lookup(name, version)
if result:
obj = result["object"]
return obj
return None
class TypeRegistry(dict):
def get(self, name):
# Two way mapping
refname = Schema.referenceName(name)
if refname not in self:
result = TypeVar(refname)
self[refname] = result
self[result] = refname
return self[refname]
_types = TypeRegistry()
_registry = KindRegistry()
CLASSES = {}
factories = codegen.Capture()
def booler(v):
if isinstance(v, str):
if v == "false":
return False
return bool(v)
def getRefType(ref):
return _types.get(ref)
def refType(obj):
return getRefType(obj["$ref"])
def objType(obj):
kind = obj.get('type')
if not kind:
raise ValueError("%s has no type" % obj)
result = SCHEMA_TO_PYTHON.get(kind)
if not result:
raise ValueError("%s has type %s" % (obj, kind))
return result
basic_types = [str, bool, int, float]
def name_to_py(name):
result = name.replace("-", "_")
result = result.lower()
if keyword.iskeyword(result) or result in dir(builtins):
result += "_"
return result
def strcast(kind, keep_builtins=False):
if issubclass(kind, typing.GenericMeta):
return str(kind)[1:]
if str(kind).startswith('~'):
return str(kind)[1:]
if (kind in basic_types or
type(kind) in basic_types) and keep_builtins is False:
return kind.__name__
return kind
class Args(list):
def __init__(self, defs):
self.defs = defs
if defs:
rtypes = _registry.getObj(_types[defs])
if len(rtypes) == 1:
if not self.do_explode(rtypes[0][1]):
for name, rtype in rtypes:
self.append((name, rtype))
else:
for name, rtype in rtypes:
self.append((name, rtype))
def do_explode(self, kind):
if kind in basic_types or type(kind) is typing.TypeVar:
return False
if not issubclass(kind, (typing.Sequence,
typing.Mapping)):
self.clear()
self.extend(Args(kind))
return True
return False
def PyToSchemaMapping(self):
m = {}
for n, rt in self:
m[name_to_py(n)] = n
return m
def SchemaToPyMapping(self):
m = {}
for n, tr in self:
m[n] = name_to_py(n)
return m
def _format(self, name, rtype, typed=True):
if typed:
return "{} : {}".format(
name_to_py(name),
strcast(rtype)
)
else:
return name_to_py(name)
def _get_arg_str(self, typed=False, joined=", "):
if self:
parts = []
for item in self:
parts.append(self._format(item[0], item[1], typed))
if joined:
return joined.join(parts)
return parts
return ''
def as_kwargs(self):
if self:
parts = []
for item in self:
parts.append('{}=None'.format(name_to_py(item[0])))
return ', '.join(parts)
return ''
def typed(self):
return self._get_arg_str(True)
def __str__(self):
return self._get_arg_str(False)
def get_doc(self):
return self._get_arg_str(True, "\n")
def buildTypes(schema, capture):
INDENT = " "
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
if name in capture and not name in NAUGHTY_CLASSES:
continue
args = Args(kind)
# Write Factory class for _client.py
make_factory(name)
# Write actual class
source = ["""
class {}(Type):
_toSchema = {}
_toPy = {}
def __init__(self{}{}, **unknown_fields):
'''
{}
'''""".format(
name,
# pprint these to get stable ordering across regens
pprint.pformat(args.PyToSchemaMapping(), width=999),
pprint.pformat(args.SchemaToPyMapping(), width=999),
", " if args else "",
args.as_kwargs(),
textwrap.indent(args.get_doc(), INDENT * 2))
]
assignments = args._get_arg_str(False, False)
if not args:
source.append("{}pass".format(INDENT * 2))
else:
for arg in args:
arg_name = name_to_py(arg[0])
arg_type = arg[1]
arg_type_name = strcast(arg_type)
if arg_type in basic_types:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Sequence):
value_type = (
arg_type_name.__parameters__[0]
if len(arg_type_name.__parameters__)
else None
)
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Mapping):
value_type = (
arg_type_name.__parameters__[1]
if len(arg_type_name.__parameters__) > 1
else None
)
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif type(arg_type) is typing.TypeVar:
source.append("{}self.{} = {}.from_json({}) if {} else None".format(
INDENT * 2, arg_name, arg_type_name, arg_name, arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
source = "\n".join(source)
capture.clear(name)
capture[name].write(source)
capture[name].write("\n\n")
co = compile(source, __name__, "exec")
ns = _getns()
exec(co, ns)
cls = ns[name]
CLASSES[name] = cls
def retspec(defs):
# return specs
# only return 1, so if there is more than one type
# we need to include a union
# In truth there is only 1 return
# Error or the expected Type
if not defs:
return None
if defs in basic_types:
return strcast(defs, False)
rtypes = _registry.getObj(_types[defs])
if not rtypes:
return None
if len(rtypes) > 1:
return Union[tuple([strcast(r[1], True) for r in rtypes])]
return strcast(rtypes[0][1], False)
def return_type(defs):
if not defs:
return None
rtypes = _registry.getObj(_types[defs])
if not rtypes:
return None
if len(rtypes) > 1:
for n, t in rtypes:
if n == "Error":
continue
return t
return rtypes[0][1]
def type_anno_func(func, defs, is_result=False):
annos = {}
if not defs:
return func
rtypes = _registry.getObj(_types[defs])
if is_result:
kn = "return"
if not rtypes:
annos[kn] = None
elif len(rtypes) > 1:
annos[kn] = Union[tuple([r[1] for r in rtypes])]
else:
annos[kn] = rtypes[0][1]
else:
for name, rtype in rtypes:
name = name_to_py(name)
annos[name] = rtype
func.__annotations__.update(annos)
return func
def ReturnMapping(cls):
# Annotate the method with a return Type
# so the value can be cast
def decorator(f):
@functools.wraps(f)
async def wrapper(*args, **kwargs):
nonlocal cls
reply = await f(*args, **kwargs)
if cls is None:
return reply
if 'error' in reply:
cls = CLASSES['Error']
if issubclass(cls, typing.Sequence):
result = []
item_cls = cls.__parameters__[0]
for item in reply:
result.append(item_cls.from_json(item))
"""
if 'error' in item:
cls = CLASSES['Error']
else:
cls = item_cls
result.append(cls.from_json(item))
"""
else:
result = cls.from_json(reply['response'])
return result
return wrapper
return decorator
def makeFunc(cls, name, params, result, async=True):
INDENT = " "
args = Args(params)
assignments = []
toschema = args.PyToSchemaMapping()
for arg in args._get_arg_str(False, False):
assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
toschema[arg],
arg))
assignments = "\n".join(assignments)
res = retspec(result)
source = """
@ReturnMapping({rettype})
{async}def {name}(self{argsep}{args}):
'''
{docstring}
Returns -> {res}
'''
# map input types to rpc msg
_params = dict()
msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
{assignments}
reply = {await}self.rpc(msg)
return reply
"""
fsource = source.format(async="async " if async else "",
name=name,
argsep=", " if args else "",
args=args,
res=res,
rettype=result.__name__ if result else None,
docstring=textwrap.indent(args.get_doc(), INDENT),
cls=cls,
assignments=assignments,
await="await " if async else "")
ns = _getns()
exec(fsource, ns)
func = ns[name]
return func, fsource
def buildMethods(cls, capture):
properties = cls.schema['properties']
for methodname in sorted(properties):
method, source = _buildMethod(cls, methodname)
setattr(cls, methodname, method)
capture["{}Facade".format(cls.__name__)].write(source, depth=1)
def _buildMethod(cls, name):
params = None
result = None
method = cls.schema['properties'][name]
if 'properties' in method:
prop = method['properties']
spec = prop.get('Params')
if spec:
params = _types.get(spec['$ref'])
spec = prop.get('Result')
if spec:
if '$ref' in spec:
result = _types.get(spec['$ref'])
else:
result = SCHEMA_TO_PYTHON[spec['type']]
return makeFunc(cls, name, params, result)
def buildFacade(schema):
cls = type(schema.name, (Type,), dict(name=schema.name,
version=schema.version,
schema=schema))
source = """
class {name}Facade(Type):
name = '{name}'
version = {version}
schema = {schema}
""".format(name=schema.name,
version=schema.version,
schema=textwrap.indent(pprint.pformat(schema), " "))
return cls, source
class TypeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Type):
return obj.serialize()
return json.JSONEncoder.default(self, obj)
class Type:
def connect(self, connection):
self.connection = connection
async def rpc(self, msg):
result = await self.connection.rpc(msg, encoder=TypeEncoder)
return result
@classmethod
def from_json(cls, data):
if isinstance(data, cls):
return data
if isinstance(data, str):
try:
data = json.loads(data)
except json.JSONDecodeError:
raise
d = {}
for k, v in (data or {}).items():
d[cls._toPy.get(k, k)] = v
try:
return cls(**d)
except TypeError:
raise
def serialize(self):
d = {}
for attr, tgt in self._toSchema.items():
d[tgt] = getattr(self, attr)
return d
def to_json(self):
return json.dumps(self.serialize())
class Schema(dict):
def __init__(self, schema):
self.name = schema['Name']
self.version = schema['Version']
self.update(schema['Schema'])
@classmethod
def referenceName(cls, ref):
if ref.startswith("#/definitions/"):
ref = ref.rsplit("/", 1)[-1]
return ref
def resolveDefinition(self, ref):
return self['definitions'][self.referenceName(ref)]
def deref(self, prop, name):
if not isinstance(prop, dict):
raise TypeError(prop)
if "$ref" not in prop:
return prop
target = self.resolveDefinition(prop["$ref"])
return target
def buildDefinitions(self):
# here we are building the types out
# anything in definitions is a type
# but these may contain references themselves
# so we dfs to the bottom and build upwards
# when a types is already in the registry
defs = self.get('definitions')
if not defs:
return
for d, data in defs.items():
if d in _registry and not d in NAUGHTY_CLASSES:
continue
node = self.deref(data, d)
kind = node.get("type")
if kind == "object":
result = self.buildObject(node, d)
elif kind == "array":
pass
_registry.register(d, self.version, result)
# XXX: This makes sure that the type gets added to the global
# _types dict even if no other type in the schema has a ref
# to it.
getRefType(d)
def buildObject(self, node, name=None, d=0):
# we don't need to build types recursively here
# they are all in definitions already
# we only want to include the type reference
# which we can derive from the name
struct = []
add = struct.append
props = node.get("properties")
pprops = node.get("patternProperties")
if props:
# Sort these so the __init__ arg list for each Type remains
# consistently ordered across regens of client.py
for p in sorted(props):
prop = props[p]
if "$ref" in prop:
add((p, refType(prop)))
else:
kind = prop['type']
if kind == "array":
add((p, self.buildArray(prop, d + 1)))
elif kind == "object":
struct.extend(self.buildObject(prop, p, d + 1))
else:
add((p, objType(prop)))
if pprops:
if ".*" not in pprops:
raise ValueError(
"Cannot handle actual pattern in patternProperties %s" %
pprops)
pprop = pprops[".*"]
if "$ref" in pprop:
add((name, Mapping[str, refType(pprop)]))
return struct
ppkind = pprop["type"]
if ppkind == "array":
add((name, self.buildArray(pprop, d + 1)))
else:
add((name, Mapping[str, SCHEMA_TO_PYTHON[ppkind]]))
if not struct and node.get('additionalProperties', False):
add((name, Mapping[str, SCHEMA_TO_PYTHON['object']]))
return struct
def buildArray(self, obj, d=0):
# return a sequence from an array in the schema
if "$ref" in obj:
return Sequence[refType(obj)]
else:
kind = obj.get("type")
if kind and kind == "array":
items = obj['items']
return self.buildArray(items, d + 1)
else:
return Sequence[objType(obj)]
def _getns():
ns = {'Type': Type,
'typing': typing,
'ReturnMapping': ReturnMapping
}
# Copy our types into the globals of the method
for facade in _registry:
ns[facade] = _registry.getObj(facade)
return ns
def make_factory(name):
if name in factories:
del factories[name]
factories[name].write("class {}(TypeFactory):\n pass\n\n".format(name))
def write_facades(captures, options):
"""
Write the Facades to the appropriate _client<version>.py
"""
for version in sorted(captures.keys()):
filename = "{}/_client{}.py".format(options.output_dir, version)
with open(filename, "w") as f:
f.write(HEADER)
f.write("from juju.client.facade import Type, ReturnMapping\n")
f.write("from juju.client._definitions import *\n\n")
for key in sorted(
[k for k in captures[version].keys() if "Facade" in k]):
print(captures[version][key], file=f)
# Return the last (most recent) version for use in other routines.
return version
def write_definitions(captures, options, version):
"""
Write auxillary (non versioned) classes to
_definitions.py The auxillary classes currently get
written redudantly into each capture object, so we can look in
one of them -- we just use the last one from the loop above.
"""
with open("{}/_definitions.py".format(options.output_dir), "w") as f:
f.write(HEADER)
f.write("from juju.client.facade import Type, ReturnMapping\n\n")
for key in sorted(
[k for k in captures[version].keys() if "Facade" not in k]):
print(captures[version][key], file=f)
def write_client(captures, options):
"""
Write the TypeFactory classes to _client.py, along with some
imports and tables so that we can look up versioned Facades.
"""
with open("{}/_client.py".format(options.output_dir), "w") as f:
f.write(HEADER)
f.write("from juju.client._definitions import *\n\n")
clients = ", ".join("_client{}".format(v) for v in captures)
f.write("from juju.client import " + clients + "\n\n")
f.write(CLIENT_TABLE.format(clients=",\n ".join(
['"{}": _client{}'.format(v, v) for v in captures])))
f.write(LOOKUP_FACADE)
f.write(TYPE_FACTORY)
for key in sorted([k for k in factories.keys() if "Facade" in k]):
print(factories[key], file=f)
def generate_facades(options):
captures = defaultdict(codegen.Capture)
schemas = {}
for p in sorted(glob(options.schema)):
if 'latest' in p:
juju_version = 'latest'
else:
try:
juju_version = re.search(JUJU_VERSION, p).group()
except AttributeError:
print("Cannot extract a juju version from {}".format(p))
print("Schemas must include a juju version in the filename")
raise SystemExit(1)
new_schemas = json.loads(Path(p).read_text("utf-8"))
schemas[juju_version] = [Schema(s) for s in new_schemas]
# Build all of the auxillary (unversioned) classes
# TODO: get rid of some of the excess trips through loops in the
# called functions.
for juju_version in sorted(schemas.keys()):
for schema in schemas[juju_version]:
schema.buildDefinitions()
buildTypes(schema, captures[schema.version])
# Build the Facade classes
for juju_version in sorted(schemas.keys()):
for schema in schemas[juju_version]:
cls, source = buildFacade(schema)
cls_name = "{}Facade".format(schema.name)
captures[schema.version].clear(cls_name)
# Make the factory class for _client.py
make_factory(cls_name)
# Make the actual class
captures[schema.version][cls_name].write(source)
# Build the methods for each Facade class.
buildMethods(cls, captures[schema.version])
# Mark this Facade class as being done for this version --
# helps mitigate some excessive looping.
CLASSES[schema.name] = cls
return captures
def setup():
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--schema", default="juju/client/schemas*")
parser.add_argument("-o", "--output_dir", default="juju/client")
options = parser.parse_args()
return options
def main():
options = setup()
# Generate some text blobs
captures = generate_facades(options)
# ... and write them out
last_version = write_facades(captures, options)
write_definitions(captures, options, last_version)
write_client(captures, options)
if __name__ == '__main__':
main()