self.append((name, rtype))
def do_explode(self, kind):
- if kind in basic_types:
+ if kind in basic_types or type(kind) is typing.TypeVar:
return False
if not issubclass(kind, (typing.Sequence,
typing.Mapping)):
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
- args = Args(kind)
if name in classes:
continue
+ args = Args(kind)
source = ["""
class {}(Type):
_toSchema = {}
if arg_type in basic_types:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Sequence):
- value_type = arg_type_name.__parameters__[0]
+ value_type = (
+ arg_type_name.__parameters__[0]
+ if len(arg_type_name.__parameters__)
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif issubclass(arg_type, typing.Mapping):
- value_type = arg_type_name.__parameters__[1]
+ value_type = (
+ arg_type_name.__parameters__[1]
+ if len(arg_type_name.__parameters__) > 1
+ else None
+ )
if type(value_type) is typing.TypeVar:
source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
INDENT * 2, arg_name, strcast(value_type), arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
elif type(arg_type) is typing.TypeVar:
- source.append("{}self.{} = {}.from_json({})".format(
- INDENT * 2, arg_name, arg_type_name, arg_name))
+ source.append("{}self.{} = {}.from_json({}) if {} else None".format(
+ INDENT * 2, arg_name, arg_type_name, arg_name, arg_name))
else:
source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
reply = await f(*args, **kwargs)
if cls is None:
return reply
- if 'Error' in reply:
+ if 'error' in reply:
cls = classes['Error']
if issubclass(cls, typing.Sequence):
result = []
item_cls = cls.__parameters__[0]
for item in reply:
result.append(item_cls.from_json(item))
+ """
+ if 'error' in item:
+ cls = classes['Error']
+ else:
+ cls = item_cls
+ result.append(cls.from_json(item))
+ """
else:
- result = cls.from_json(reply['Response'])
+ result = cls.from_json(reply['response'])
return result
return wrapper
assignments = []
toschema = args.PyToSchemaMapping()
for arg in args._get_arg_str(False, False):
- assignments.append("{}params[\'{}\'] = {}".format(INDENT,
- toschema[arg],
- arg))
+ assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
+ toschema[arg],
+ arg))
assignments = "\n".join(assignments)
res = retspec(result)
source = """
Returns -> {res}
'''
# map input types to rpc msg
- params = dict()
- msg = dict(Type='{cls.name}', Request='{name}', Version={cls.version}, Params=params)
+ _params = dict()
+ msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
{assignments}
reply = {await}self.rpc(msg)
return reply
version=schema.version,
schema=schema))
source = """
-class {name}(Type):
+class {name}Facade(Type):
name = '{name}'
version = {version}
schema = {schema}
for k, v in (data or {}).items():
d[cls._toPy.get(k, k)] = v
- return cls(**d)
+ try:
+ return cls(**d)
+ except TypeError:
+ raise
def serialize(self):
d = {}
add((name, self.buildArray(pprop, d + 1)))
else:
add((name, Mapping[str, SCHEMA_TO_PYTHON[ppkind]]))
- #print("{}{}".format(d * " ", struct))
+
+ if not struct and node.get('additionalProperties', False):
+ add((name, Mapping[str, SCHEMA_TO_PYTHON['object']]))
+
return struct
def buildArray(self, obj, d=0):
global classes
schemas = json.loads(Path(options.schema).read_text("utf-8"))
capture = codegen.CodeWriter()
- capture.write("""
-from juju.client.facade import Type, ReturnMapping
- """)
+ capture.write(textwrap.dedent("""\
+ # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
+ # Changes will be overwritten/lost when the file is regenerated.
+
+ from juju.client.facade import Type, ReturnMapping
+
+ """))
schemas = [Schema(s) for s in schemas]
for schema in schemas: