self.append((name, rtype))
def do_explode(self, kind):
- if kind in basic_types:
+ if kind in basic_types or type(kind) is typing.TypeVar:
return False
if not issubclass(kind, (typing.Sequence,
typing.Mapping)):
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
- args = Args(kind)
if name in classes:
continue
+ args = Args(kind)
source = ["""
class {}(Type):
_toSchema = {}
{}
'''""".format(
name,
- args.PyToSchemaMapping(),
- args.SchemaToPyMapping(),
+ # pprint these to get stable ordering across regens
+ pprint.pformat(args.PyToSchemaMapping(), width=999),
+ pprint.pformat(args.SchemaToPyMapping(), width=999),
", " if args else "",
args.as_kwargs(),
textwrap.indent(args.get_doc(), INDENT * 2))
if arg_type in basic_types:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Sequence):
- value_type = arg_type_name.__parameters__[0]
+ value_type = (
+ arg_type_name.__parameters__[0]
+ if len(arg_type_name.__parameters__)
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Mapping):
- value_type = arg_type_name.__parameters__[1]
+ value_type = (
+ arg_type_name.__parameters__[1]
+ if len(arg_type_name.__parameters__) > 1
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
item_cls = cls.__parameters__[0]
for item in reply:
result.append(item_cls.from_json(item))
+ """
+ if 'error' in item:
+ cls = classes['Error']
+ else:
+ cls = item_cls
+ result.append(cls.from_json(item))
+ """
else:
result = cls.from_json(reply['response'])
assignments = []
toschema = args.PyToSchemaMapping()
for arg in args._get_arg_str(False, False):
- assignments.append("{}params[\'{}\'] = {}".format(INDENT,
- toschema[arg],
- arg))
+ assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
+ toschema[arg],
+ arg))
assignments = "\n".join(assignments)
res = retspec(result)
source = """
Returns -> {res}
'''
# map input types to rpc msg
- params = dict()
- msg = dict(Type='{cls.name}', Request='{name}', Version={cls.version}, Params=params)
+ _params = dict()
+ msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
{assignments}
reply = {await}self.rpc(msg)
return reply
try:
return cls(**d)
except TypeError:
- print(cls)
raise
def serialize(self):