from pathlib import Path
import pprint
import textwrap
-from typing import Sequence, Mapping, TypeVar, Any, Union, Optional
+from typing import Sequence, Mapping, TypeVar, Any, Union
import typing
from . import codegen
_registry = KindRegistry()
classes = {}
+
def booler(v):
if isinstance(v, str):
if v == "false":
def strcast(kind, keep_builtins=False):
if issubclass(kind, typing.GenericMeta):
return str(kind)[1:]
+ if str(kind).startswith('~'):
+ return str(kind)[1:]
if (kind in basic_types or
type(kind) in basic_types) and keep_builtins is False:
return kind.__name__
class Args(list):
def __init__(self, defs):
self.defs = defs
- #self.append("self")
if defs:
rtypes = _registry.getObj(_types[defs])
if len(rtypes) == 1:
self.append((name, rtype))
def do_explode(self, kind):
- if kind in basic_types:
+ if kind in basic_types or type(kind) is typing.TypeVar:
return False
if not issubclass(kind, (typing.Sequence,
typing.Mapping)):
return parts
return ''
+ def as_kwargs(self):
+ if self:
+ parts = []
+ for item in self:
+ parts.append('{}=None'.format(name_to_py(item[0])))
+ return ', '.join(parts)
+ return ''
+
def typed(self):
return self._get_arg_str(True)
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
- args = Args(kind)
if name in classes:
continue
+ args = Args(kind)
source = ["""
class {}(Type):
_toSchema = {}
def __init__(self{}{}):
'''
{}
- '''""".format(name,
- args.PyToSchemaMapping(),
- args.SchemaToPyMapping(),
- ", " if args else "",
- args,
- textwrap.indent(args.get_doc(), INDENT *2))
- #pprint.pformat(schema['definitions'][name]))
+ '''""".format(
+ name,
+ # pprint these to get stable ordering across regens
+ pprint.pformat(args.PyToSchemaMapping(), width=999),
+ pprint.pformat(args.SchemaToPyMapping(), width=999),
+ ", " if args else "",
+ args.as_kwargs(),
+ textwrap.indent(args.get_doc(), INDENT * 2))
]
assignments = args._get_arg_str(False, False)
- for assign in assignments:
- source.append("{}self.{} = {}".format(INDENT * 2, assign, assign))
- if not assignments:
- source.append("{}pass".format(INDENT *2))
+
+ if not args:
+ source.append("{}pass".format(INDENT * 2))
+ else:
+ for arg in args:
+ arg_name = name_to_py(arg[0])
+ arg_type = arg[1]
+ arg_type_name = strcast(arg_type)
+ if arg_type in basic_types:
+ source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ elif issubclass(arg_type, typing.Sequence):
+ value_type = (
+ arg_type_name.__parameters__[0]
+ if len(arg_type_name.__parameters__)
+ else None
+ )
+ if type(value_type) is typing.TypeVar:
+ source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
+ INDENT * 2, arg_name, strcast(value_type), arg_name))
+ else:
+ source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ elif issubclass(arg_type, typing.Mapping):
+ value_type = (
+ arg_type_name.__parameters__[1]
+ if len(arg_type_name.__parameters__) > 1
+ else None
+ )
+ if type(value_type) is typing.TypeVar:
+ source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
+ INDENT * 2, arg_name, strcast(value_type), arg_name))
+ else:
+ source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ elif type(arg_type) is typing.TypeVar:
+ source.append("{}self.{} = {}.from_json({}) if {} else None".format(
+ INDENT * 2, arg_name, arg_type_name, arg_name, arg_name))
+ else:
+ source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+
source = "\n".join(source)
capture.write(source)
capture.write("\n\n")
# so the value can be cast
def decorator(f):
@functools.wraps(f)
- def wrapper(*args, **kwargs):
- reply = f(*args, **kwargs)
- if cls is None or reply:
+ async def wrapper(*args, **kwargs):
+ nonlocal cls
+ reply = await f(*args, **kwargs)
+ if cls is None:
return reply
- if 'Error' in reply:
- cls = Error
+ if 'error' in reply:
+ cls = classes['Error']
if issubclass(cls, typing.Sequence):
result = []
+ item_cls = cls.__parameters__[0]
for item in reply:
+ result.append(item_cls.from_json(item))
+ """
+ if 'error' in item:
+ cls = classes['Error']
+ else:
+ cls = item_cls
result.append(cls.from_json(item))
+ """
else:
- result = cls.from_json(reply)
+ result = cls.from_json(reply['response'])
return result
return wrapper
assignments = []
toschema = args.PyToSchemaMapping()
for arg in args._get_arg_str(False, False):
- assignments.append("{}params[\'{}\'] = {}".format(INDENT,
- toschema[arg],
- arg))
+ assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
+ toschema[arg],
+ arg))
assignments = "\n".join(assignments)
res = retspec(result)
source = """
-#@ReturnMapping({rettype})
+@ReturnMapping({rettype})
{async}def {name}(self{argsep}{args}):
'''
{docstring}
Returns -> {res}
'''
# map input types to rpc msg
- params = dict()
- msg = dict(Type='{cls.name}', Request='{name}', Version={cls.version}, Params=params)
+ _params = dict()
+ msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
{assignments}
reply = {await}self.rpc(msg)
- return self._map(reply, {name})
+ return reply
"""
name=name,
argsep=", " if args else "",
args=args,
- #ressep= " -> " if res else "",
res=res,
rettype=result.__name__ if result else None,
docstring=textwrap.indent(args.get_doc(), INDENT),
version=schema.version,
schema=schema))
source = """
-class {name}(Type):
+class {name}Facade(Type):
name = '{name}'
version = {version}
schema = {schema}
return cls, source
+class TypeEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, Type):
+ return obj.serialize()
+ return json.JSONEncoder.default(self, obj)
+
+
class Type:
def connect(self, connection):
self.connection = connection
async def rpc(self, msg):
- result = await self.connection.rpc(msg)
+ result = await self.connection.rpc(msg, encoder=TypeEncoder)
return result
- def _map(self, reply, method):
- # Error, expected return or None
- if not reply:
- return None
-
- if 'Error' in reply:
- retcls = classes['Error']
- data = reply['Error']
- classes["Error"]
- elif 'Response' in reply:
- retcls = method.__return_type__
- data = reply['Response']
- return retcls.from_json(data)
-
@classmethod
def from_json(cls, data):
if isinstance(data, str):
data = json.loads(data)
- return cls(**data)
+ d = {}
+ for k, v in (data or {}).items():
+ d[cls._toPy.get(k, k)] = v
+
+ try:
+ return cls(**d)
+ except TypeError:
+ raise
def serialize(self):
d = {}
elif kind == "array":
pass
_registry.register(d, self.version, result)
+ # XXX: This makes sure that the type gets added to the global
+ # _types dict even if no other type in the schema has a ref
+ # to it.
+ getRefType(d)
def buildObject(self, node, name=None, d=0):
# we don't need to build types recursively here
props = node.get("properties")
pprops = node.get("patternProperties")
if props:
- for p, prop in props.items():
+ # Sort these so the __init__ arg list for each Type remains
+ # consistently ordered across regens of client.py
+ for p in sorted(props):
+ prop = props[p]
if "$ref" in prop:
add((p, refType(prop)))
else:
if pprops:
if ".*" not in pprops:
raise ValueError(
- "Cannot handle actual pattern in patterProperties %s" %
+ "Cannot handle actual pattern in patternProperties %s" %
pprops)
pprop = pprops[".*"]
if "$ref" in pprop:
add((name, self.buildArray(pprop, d + 1)))
else:
add((name, Mapping[str, SCHEMA_TO_PYTHON[ppkind]]))
- #print("{}{}".format(d * " ", struct))
+
+ if not struct and node.get('additionalProperties', False):
+ add((name, Mapping[str, SCHEMA_TO_PYTHON['object']]))
+
return struct
def buildArray(self, obj, d=0):
kind = obj.get("type")
if kind and kind == "array":
items = obj['items']
- return self.buildArray(items, d+1)
+ return self.buildArray(items, d + 1)
else:
return Sequence[objType(obj)]
global classes
schemas = json.loads(Path(options.schema).read_text("utf-8"))
capture = codegen.CodeWriter()
- capture.write("""
-from libjuju.facade import Type, ReturnMapping
- """)
+ capture.write(textwrap.dedent("""\
+ # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
+ # Changes will be overwritten/lost when the file is regenerated.
+
+ from juju.client.facade import Type, ReturnMapping
+
+ """))
schemas = [Schema(s) for s in schemas]
for schema in schemas: