import argparse
import builtins
-from collections import defaultdict
import functools
-from glob import glob
import json
import keyword
-from pathlib import Path
import pprint
import re
import textwrap
-from typing import Sequence, Mapping, TypeVar, Any, Union
import typing
+from collections import defaultdict
+from glob import glob
+from pathlib import Path
+from typing import Any, Mapping, Sequence, TypeVar, Union
from . import codegen
# Classes and helper functions that we'll write to _client.py
LOOKUP_FACADE = '''
def lookup_facade(name, version):
- """
- Given a facade name and version, attempt to pull that facade out
- of the correct client<version>.py file.
+ """
+ Given a facade name and version, attempt to pull that facade out
+ of the correct client<version>.py file.
- """
- try:
- facade = getattr(CLIENTS[str(version)], name)
- except KeyError:
- raise ImportError("No facades found for version {}".format(version))
- except AttributeError:
- raise ImportError(
- "No facade with name '{}' in version {}".format(name, version))
- return facade
+ """
+ try:
+ facade = getattr(CLIENTS[str(version)], name)
+ except KeyError:
+ raise ImportError("No facades found for version {}".format(version))
+ except AttributeError:
+ raise ImportError(
+ "No facade with name '{}' in version {}".format(name, version))
+ return facade
'''
return self[refname]
+
_types = TypeRegistry()
_registry = KindRegistry()
CLASSES = {}
for kind in sorted((k for k in _types if not isinstance(k, str)),
key=lambda x: str(x)):
name = _types[kind]
- if name in capture and not name in NAUGHTY_CLASSES:
+ if name in capture and name not in NAUGHTY_CLASSES:
continue
args = Args(kind)
# Write Factory class for _client.py
pprint.pformat(args.SchemaToPyMapping(), width=999),
", " if args else "",
args.as_kwargs(),
- textwrap.indent(args.get_doc(), INDENT * 2))
- ]
- assignments = args._get_arg_str(False, False)
+ textwrap.indent(args.get_doc(), INDENT * 2))]
if not args:
source.append("{}pass".format(INDENT * 2))
arg_type = arg[1]
arg_type_name = strcast(arg_type)
if arg_type in basic_types:
- source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ source.append("{}self.{} = {}".format(INDENT * 2,
+ arg_name,
+ arg_name))
elif issubclass(arg_type, typing.Sequence):
value_type = (
arg_type_name.__parameters__[0]
else None
)
if type(value_type) is typing.TypeVar:
- source.append("{}self.{} = [{}.from_json(o) for o in {} or []]".format(
- INDENT * 2, arg_name, strcast(value_type), arg_name))
+ source.append(
+ "{}self.{} = [{}.from_json(o) "
+ "for o in {} or []]".format(INDENT * 2,
+ arg_name,
+ strcast(value_type),
+ arg_name))
else:
- source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ source.append("{}self.{} = {}".format(INDENT * 2,
+ arg_name,
+ arg_name))
elif issubclass(arg_type, typing.Mapping):
value_type = (
arg_type_name.__parameters__[1]
else None
)
if type(value_type) is typing.TypeVar:
- source.append("{}self.{} = {{k: {}.from_json(v) for k, v in ({} or dict()).items()}}".format(
- INDENT * 2, arg_name, strcast(value_type), arg_name))
+ source.append(
+ "{}self.{} = {{k: {}.from_json(v) "
+ "for k, v in ({} or dict()).items()}}".format(
+ INDENT * 2,
+ arg_name,
+ strcast(value_type),
+ arg_name))
else:
- source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ source.append("{}self.{} = {}".format(INDENT * 2,
+ arg_name,
+ arg_name))
elif type(arg_type) is typing.TypeVar:
- source.append("{}self.{} = {}.from_json({}) if {} else None".format(
- INDENT * 2, arg_name, arg_type_name, arg_name, arg_name))
+ source.append("{}self.{} = {}.from_json({}) "
+ "if {} else None".format(INDENT * 2,
+ arg_name,
+ arg_type_name,
+ arg_name,
+ arg_name))
else:
- source.append("{}self.{} = {}".format(INDENT * 2, arg_name, arg_name))
+ source.append("{}self.{} = {}".format(INDENT * 2,
+ arg_name,
+ arg_name))
source = "\n".join(source)
capture.clear(name)
'''
# map input types to rpc msg
_params = dict()
- msg = dict(type='{cls.name}', request='{name}', version={cls.version}, params=_params)
+ msg = dict(type='{cls.name}',
+ request='{name}',
+ version={cls.version},
+ params=_params)
{assignments}
reply = {await}self.rpc(msg)
return reply
return d
def to_json(self):
- return json.dumps(self.serialize())
+ return json.dumps(self.serialize(), cls=TypeEncoder, sort_keys=True)
class Schema(dict):
if not defs:
return
for d, data in defs.items():
- if d in _registry and not d in NAUGHTY_CLASSES:
+ if d in _registry and d not in NAUGHTY_CLASSES:
continue
node = self.deref(data, d)
kind = node.get("type")
return captures
+
def setup():
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--schema", default="juju/client/schemas*")
options = parser.parse_args()
return options
+
def main():
options = setup()
write_definitions(captures, options, last_version)
write_client(captures, options)
+
if __name__ == '__main__':
main()