Commit 0d920a77 authored by Mark Beierl's avatar Mark Beierl
Browse files

Change srsLTE to Full Native Charm



Changes the charm from relying on a prepackaged image to
building srsue and srsenb from tagged source.
Updates the descriptor to use generic name for relation.
Signed-off-by: Mark Beierl's avatarbeierlm <mark.beierl@canonical.com>
parent 8bc30d96
Pipeline #71 passed with stage
in 1 minute and 22 seconds
Copyright (c) 2017-2020 Ingy döt Net
Copyright (c) 2006-2016 Kirill Simonov
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Metadata-Version: 2.1
Name: PyYAML
Version: 5.3.1
Summary: YAML parser and emitter for Python
Home-page: https://github.com/yaml/pyyaml
Author: Kirill Simonov
Author-email: xi@resolvent.net
License: MIT
Download-URL: https://pypi.org/project/PyYAML/
Platform: Any
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Cython
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Text Processing :: Markup
Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
YAML is a data serialization format designed for human readability
and interaction with scripting languages. PyYAML is a YAML parser
and emitter for Python.
PyYAML features a complete YAML 1.1 parser, Unicode support, pickle
support, capable extension API, and sensible error messages. PyYAML
supports standard YAML tags and provides Python-specific tags that
allow to represent an arbitrary Python object.
PyYAML is applicable for a broad range of tasks from complex
configuration files to object serialization and persistence.
PyYAML-5.3.1.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
PyYAML-5.3.1.dist-info/LICENSE,sha256=xAESRJ8lS5dTBFklJIMT6ScO-jbSJrItgtTMbEPFfyk,1101
PyYAML-5.3.1.dist-info/METADATA,sha256=xTsZFjd8T4M-5rC2M3BHgx_KTTpEPy5vFDIXrbzRXPQ,1758
PyYAML-5.3.1.dist-info/RECORD,,
PyYAML-5.3.1.dist-info/WHEEL,sha256=rjs0dbLt66v8mjOfCbR-uj5Usgl2OLAlmRdCEyNo0L4,104
PyYAML-5.3.1.dist-info/top_level.txt,sha256=rpj0IVMTisAjh_1vG3Ccf9v5jpCQwAz6cD1IVU5ZdhQ,11
yaml/__init__.py,sha256=XFUNbKTg4afAd0BETjGQ1mKQ97_g5jbE1C0WoKc74dc,13170
yaml/__pycache__/__init__.cpython-37.pyc,,
yaml/__pycache__/composer.cpython-37.pyc,,
yaml/__pycache__/constructor.cpython-37.pyc,,
yaml/__pycache__/cyaml.cpython-37.pyc,,
yaml/__pycache__/dumper.cpython-37.pyc,,
yaml/__pycache__/emitter.cpython-37.pyc,,
yaml/__pycache__/error.cpython-37.pyc,,
yaml/__pycache__/events.cpython-37.pyc,,
yaml/__pycache__/loader.cpython-37.pyc,,
yaml/__pycache__/nodes.cpython-37.pyc,,
yaml/__pycache__/parser.cpython-37.pyc,,
yaml/__pycache__/reader.cpython-37.pyc,,
yaml/__pycache__/representer.cpython-37.pyc,,
yaml/__pycache__/resolver.cpython-37.pyc,,
yaml/__pycache__/scanner.cpython-37.pyc,,
yaml/__pycache__/serializer.cpython-37.pyc,,
yaml/__pycache__/tokens.cpython-37.pyc,,
yaml/composer.py,sha256=_Ko30Wr6eDWUeUpauUGT3Lcg9QPBnOPVlTnIMRGJ9FM,4883
yaml/constructor.py,sha256=O3Uaf0_J_5GQBoeI9ZNhpJAhtdagr_X2HzDgGbZOMnw,28627
yaml/cyaml.py,sha256=LiMkvchNonfoy1F6ec9L2BiUz3r0bwF4hympASJX1Ic,3846
yaml/dumper.py,sha256=PLctZlYwZLp7XmeUdwRuv4nYOZ2UBnDIUy8-lKfLF-o,2837
yaml/emitter.py,sha256=jghtaU7eFwg31bG0B7RZea_29Adi9CKmXq_QjgQpCkQ,43006
yaml/error.py,sha256=Ah9z-toHJUbE9j-M8YpxgSRM5CgLCcwVzJgLLRF2Fxo,2533
yaml/events.py,sha256=50_TksgQiE4up-lKo_V-nBy-tAIxkIPQxY5qDhKCeHw,2445
yaml/loader.py,sha256=UVa-zIqmkFSCIYq_PgSGm4NSJttHY2Rf_zQ4_b1fHN0,2061
yaml/nodes.py,sha256=gPKNj8pKCdh2d4gr3gIYINnPOaOxGhJAUiYhGRnPE84,1440
yaml/parser.py,sha256=ilWp5vvgoHFGzvOZDItFoGjD6D42nhlZrZyjAwa0oJo,25495
yaml/reader.py,sha256=0dmzirOiDG4Xo41RnuQS7K9rkY3xjHiVasfDMNTqCNw,6794
yaml/representer.py,sha256=82UM3ZxUQKqsKAF4ltWOxCS6jGPIFtXpGs7mvqyv4Xs,14184
yaml/resolver.py,sha256=DJCjpQr8YQCEYYjKEYqTl0GrsZil2H4aFOI9b0Oe-U4,8970
yaml/scanner.py,sha256=KeQIKGNlSyPE8QDwionHxy9CgbqE5teJEz05FR9-nAg,51277
yaml/serializer.py,sha256=ChuFgmhU01hj4xgI8GaKv6vfM2Bujwa9i7d2FAHj7cA,4165
yaml/tokens.py,sha256=lTQIzSVw8Mg9wv459-TjiOQe6wVziqaRlqX2_89rp54,2573
Wheel-Version: 1.0
Generator: bdist_wheel (0.35.1)
Root-Is-Purelib: false
Tag: cp37-cp37m-linux_x86_64
#!/home/mark/git/GitHub/charmed-osm/srs-enb-ue/venv/bin/python3
# -*- coding: utf-8 -*-
import re
import sys
from netaddr.cli import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
# -*- coding: utf-8 -*-
"""Jinja is a template engine written in pure Python. It provides a
non-XML syntax that supports inline expressions and an optional
sandboxed environment.
"""
from markupsafe import escape
from markupsafe import Markup
from .bccache import BytecodeCache
from .bccache import FileSystemBytecodeCache
from .bccache import MemcachedBytecodeCache
from .environment import Environment
from .environment import Template
from .exceptions import TemplateAssertionError
from .exceptions import TemplateError
from .exceptions import TemplateNotFound
from .exceptions import TemplateRuntimeError
from .exceptions import TemplatesNotFound
from .exceptions import TemplateSyntaxError
from .exceptions import UndefinedError
from .filters import contextfilter
from .filters import environmentfilter
from .filters import evalcontextfilter
from .loaders import BaseLoader
from .loaders import ChoiceLoader
from .loaders import DictLoader
from .loaders import FileSystemLoader
from .loaders import FunctionLoader
from .loaders import ModuleLoader
from .loaders import PackageLoader
from .loaders import PrefixLoader
from .runtime import ChainableUndefined
from .runtime import DebugUndefined
from .runtime import make_logging_undefined
from .runtime import StrictUndefined
from .runtime import Undefined
from .utils import clear_caches
from .utils import contextfunction
from .utils import environmentfunction
from .utils import evalcontextfunction
from .utils import is_undefined
from .utils import select_autoescape
__version__ = "2.11.2"
# -*- coding: utf-8 -*-
# flake8: noqa
import marshal
import sys
PY2 = sys.version_info[0] == 2
PYPY = hasattr(sys, "pypy_translation_info")
_identity = lambda x: x
if not PY2:
unichr = chr
range_type = range
text_type = str
string_types = (str,)
integer_types = (int,)
iterkeys = lambda d: iter(d.keys())
itervalues = lambda d: iter(d.values())
iteritems = lambda d: iter(d.items())
import pickle
from io import BytesIO, StringIO
NativeStringIO = StringIO
def reraise(tp, value, tb=None):
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
ifilter = filter
imap = map
izip = zip
intern = sys.intern
implements_iterator = _identity
implements_to_string = _identity
encode_filename = _identity
marshal_dump = marshal.dump
marshal_load = marshal.load
else:
unichr = unichr
text_type = unicode
range_type = xrange
string_types = (str, unicode)
integer_types = (int, long)
iterkeys = lambda d: d.iterkeys()
itervalues = lambda d: d.itervalues()
iteritems = lambda d: d.iteritems()
import cPickle as pickle
from cStringIO import StringIO as BytesIO, StringIO
NativeStringIO = BytesIO
exec("def reraise(tp, value, tb=None):\n raise tp, value, tb")
from itertools import imap, izip, ifilter
intern = intern
def implements_iterator(cls):
cls.next = cls.__next__
del cls.__next__
return cls
def implements_to_string(cls):
cls.__unicode__ = cls.__str__
cls.__str__ = lambda x: x.__unicode__().encode("utf-8")
return cls
def encode_filename(filename):
if isinstance(filename, unicode):
return filename.encode("utf-8")
return filename
def marshal_dump(code, f):
if isinstance(f, file):
marshal.dump(code, f)
else:
f.write(marshal.dumps(code))
def marshal_load(f):
if isinstance(f, file):
return marshal.load(f)
return marshal.loads(f.read())
def with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""
# This requires a bit of explanation: the basic idea is to make a
# dummy metaclass for one level of class instantiation that replaces
# itself with the actual metaclass.
class metaclass(type):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, "temporary_class", (), {})
try:
from urllib.parse import quote_from_bytes as url_quote
except ImportError:
from urllib import quote as url_quote
try:
from collections import abc
except ImportError:
import collections as abc
try:
from os import fspath
except ImportError:
try:
from pathlib import PurePath
except ImportError:
PurePath = None
def fspath(path):
if hasattr(path, "__fspath__"):
return path.__fspath__()
# Python 3.5 doesn't have __fspath__ yet, use str.
if PurePath is not None and isinstance(path, PurePath):
return str(path)
return path
import re
# generated by scripts/generate_identifier_pattern.py
pattern = re.compile(
r"[\w·̀-ͯ·҃-֑҇-ׇֽֿׁׂׅׄؐ-ًؚ-ٰٟۖ-ۜ۟-۪ۤۧۨ-ܑۭܰ-݊ަ-ް߫-߳ࠖ-࠙ࠛ-ࠣࠥ-ࠧࠩ-࡙࠭-࡛ࣔ-ࣣ࣡-ःऺ-़ा-ॏ॑-ॗॢॣঁ-ঃ়া-ৄেৈো-্ৗৢৣਁ-ਃ਼ਾ-ੂੇੈੋ-੍ੑੰੱੵઁ-ઃ઼ા-ૅે-ૉો-્ૢૣଁ-ଃ଼ା-ୄେୈୋ-୍ୖୗୢୣஂா-ூெ-ைொ-்ௗఀ-ఃా-ౄె-ైొ-్ౕౖౢౣಁ-ಃ಼ಾ-ೄೆ-ೈೊ-್ೕೖೢೣഁ-ഃാ-ൄെ-ൈൊ-്ൗൢൣංඃ්ා-ුූෘ-ෟෲෳัิ-ฺ็-๎ັິ-ູົຼ່-ໍ༹༘༙༵༷༾༿ཱ-྄྆྇ྍ-ྗྙ-ྼ࿆ါ-ှၖ-ၙၞ-ၠၢ-ၤၧ-ၭၱ-ၴႂ-ႍႏႚ-ႝ፝-፟ᜒ-᜔ᜲ-᜴ᝒᝓᝲᝳ឴-៓៝᠋-᠍ᢅᢆᢩᤠ-ᤫᤰ-᤻ᨗ-ᨛᩕ-ᩞ᩠-᩿᩼᪰-᪽ᬀ-ᬄ᬴-᭄᭫-᭳ᮀ-ᮂᮡ-ᮭ᯦-᯳ᰤ-᰷᳐-᳔᳒-᳨᳭ᳲ-᳴᳸᳹᷀-᷵᷻-᷿‿⁀⁔⃐-⃥⃜⃡-⃰℘℮⳯-⵿⳱ⷠ-〪ⷿ-゙゚〯꙯ꙴ-꙽ꚞꚟ꛰꛱ꠂ꠆ꠋꠣ-ꠧꢀꢁꢴ-ꣅ꣠-꣱ꤦ-꤭ꥇ-꥓ꦀ-ꦃ꦳-꧀ꧥꨩ-ꨶꩃꩌꩍꩻ-ꩽꪰꪲ-ꪴꪷꪸꪾ꪿꫁ꫫ-ꫯꫵ꫶ꯣ-ꯪ꯬꯭ﬞ︀-️︠-︯︳︴﹍-﹏_𐇽𐋠𐍶-𐍺𐨁-𐨃𐨅𐨆𐨌-𐨏𐨸-𐨿𐨺𐫦𐫥𑀀-𑀂𑀸-𑁆𑁿-𑂂𑂰-𑂺𑄀-𑄂𑄧-𑅳𑄴𑆀-𑆂𑆳-𑇊𑇀-𑇌𑈬-𑈷𑈾𑋟-𑋪𑌀-𑌃𑌼𑌾-𑍄𑍇𑍈𑍋-𑍍𑍗𑍢𑍣𑍦-𑍬𑍰-𑍴𑐵-𑑆𑒰-𑓃𑖯-𑖵𑖸-𑗀𑗜𑗝𑘰-𑙀𑚫-𑚷𑜝-𑜫𑰯-𑰶𑰸-𑰿𑲒-𑲧𑲩-𑲶𖫰-𖫴𖬰-𖬶𖽑-𖽾𖾏-𖾒𛲝𛲞𝅥-𝅩𝅭-𝅲𝅻-𝆂𝆅-𝆋𝆪-𝆭𝉂-𝉄𝨀-𝨶𝨻-𝩬𝩵𝪄𝪛-𝪟𝪡-𝪯𞀀-𞀆𞀈-𞀘𞀛-𞀡𞀣𞀤𞀦-𞣐𞀪-𞣖𞥄-𞥊󠄀-󠇯]+" # noqa: B950
)
from functools import wraps
from . import filters
from .asyncsupport import auto_aiter
from .asyncsupport import auto_await
async def auto_to_seq(value):
seq = []
if hasattr(value, "__aiter__"):
async for item in value:
seq.append(item)
else:
for item in value:
seq.append(item)
return seq
async def async_select_or_reject(args, kwargs, modfunc, lookup_attr):
seq, func = filters.prepare_select_or_reject(args, kwargs, modfunc, lookup_attr)
if seq:
async for item in auto_aiter(seq):
if func(item):
yield item
def dualfilter(normal_filter, async_filter):
wrap_evalctx = False
if getattr(normal_filter, "environmentfilter", False) is True:
def is_async(args):
return args[0].is_async
wrap_evalctx = False
else:
has_evalctxfilter = getattr(normal_filter, "evalcontextfilter", False) is True
has_ctxfilter = getattr(normal_filter, "contextfilter", False) is True
wrap_evalctx = not has_evalctxfilter and not has_ctxfilter
def is_async(args):
return args[0].environment.is_async
@wraps(normal_filter)
def wrapper(*args, **kwargs):
b = is_async(args)
if wrap_evalctx:
args = args[1:]
if b:
return async_filter(*args, **kwargs)
return normal_filter(*args, **kwargs)
if wrap_evalctx:
wrapper.evalcontextfilter = True
wrapper.asyncfiltervariant = True
return wrapper
def asyncfiltervariant(original):
def decorator(f):
return dualfilter(original, f)
return decorator
@asyncfiltervariant(filters.do_first)
async def do_first(environment, seq):
try:
return await auto_aiter(seq).__anext__()
except StopAsyncIteration:
return environment.undefined("No first item, sequence was empty.")
@asyncfiltervariant(filters.do_groupby)
async def do_groupby(environment, value, attribute):
expr = filters.make_attrgetter(environment, attribute)
return [
filters._GroupTuple(key, await auto_to_seq(values))
for key, values in filters.groupby(
sorted(await auto_to_seq(value), key=expr), expr
)
]
@asyncfiltervariant(filters.do_join)
async def do_join(eval_ctx, value, d=u"", attribute=None):
return filters.do_join(eval_ctx, await auto_to_seq(value), d, attribute)
@asyncfiltervariant(filters.do_list)
async def do_list(value):
return await auto_to_seq(value)
@asyncfiltervariant(filters.do_reject)
async def do_reject(*args, **kwargs):
return async_select_or_reject(args, kwargs, lambda x: not x, False)
@asyncfiltervariant(filters.do_rejectattr)
async def do_rejectattr(*args, **kwargs):
return async_select_or_reject(args, kwargs, lambda x: not x, True)
@asyncfiltervariant(filters.do_select)
async def do_select(*args, **kwargs):
return async_select_or_reject(args, kwargs, lambda x: x, False)
@asyncfiltervariant(filters.do_selectattr)
async def do_selectattr(*args, **kwargs):
return async_select_or_reject(args, kwargs, lambda x: x, True)
@asyncfiltervariant(filters.do_map)
async def do_map(*args, **kwargs):
seq, func = filters.prepare_map(args, kwargs)
if seq:
async for item in auto_aiter(seq):
yield await auto_await(func(item))
@asyncfiltervariant(filters.do_sum)
async def do_sum(environment, iterable, attribute=None, start=0):
rv = start
if attribute is not None:
func = filters.make_attrgetter(environment, attribute)
else:
def func(x):
return x
async for item in auto_aiter(iterable):
rv += func(item)
return rv
@asyncfiltervariant(filters.do_slice)
async def do_slice(value, slices, fill_with=None):
return filters.do_slice(await auto_to_seq(value), slices, fill_with)
ASYNC_FILTERS = {
"first": do_first,
"groupby": do_groupby,
"join": do_join,
"list": do_list,
# we intentionally do not support do_last because that would be
# ridiculous
"reject": do_reject,
"rejectattr": do_rejectattr,
"map": do_map,
"select": do_select,
"selectattr": do_selectattr,
"sum": do_sum,
"slice": do_slice,
}
# -*- coding: utf-8 -*-
"""The code for async support. Importing this patches Jinja on supported
Python versions.
"""
import asyncio
import inspect
from functools import update_wrapper
from markupsafe import Markup
from .environment import TemplateModule
from .runtime import LoopContext
from .utils import concat
from .utils import internalcode
from .utils import missing
async def concat_async(async_gen):
rv = []
async def collect():
async for event in async_gen:
rv.append(event)
await collect()
return concat(rv)
async def generate_async(self, *args, **kwargs):
vars = dict(*args, **kwargs)
try:
async for event in self.root_render_func(self.new_context(vars)):
yield event
except Exception:
yield self.environment.handle_exception()
def wrap_generate_func(original_generate):
def _convert_generator(self, loop, args, kwargs):
async_gen = self.generate_async(*args, **kwargs)
try:
while 1:
yield loop.run_until_complete(async_gen.__anext__())
except StopAsyncIteration:
pass
def generate(self, *args, **kwargs):
if not self.environment.is_async:
return original_generate(self, *args, **kwargs)
return _convert_generator(self, asyncio.get_event_loop(), args, kwargs)
return update_wrapper(generate, original_generate)
async def render_async(self, *args, **kwargs):
if not self.environment.is_async:
raise RuntimeError("The environment was not created with async mode enabled.")
vars = dict(*args, **kwargs)
ctx = self.new_context(vars)
try:
return await concat_async(self.root_render_func(ctx))
except Exception:
return self.environment.handle_exception()
def wrap_render_func(original_render):
def render(self, *args, **kwargs):
if not self.environment.is_async:
return original_render(self, *args, **kwargs)
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.render_async(*args, **kwargs))
return update_wrapper(render, original_render)
def wrap_block_reference_call(original_call):
@internalcode
async def async_call(self):
rv = await concat_async(self._stack[self._depth](self._context))
if self._context.eval_ctx.autoescape:
rv = Markup(rv)
return rv
@internalcode
def __call__(self):
if not self._context.environment.is_async:
return original_call(self)
return async_call(self)
return update_wrapper(__call__, original_call)
def wrap_macro_invoke(original_invoke):
@internalcode
async def async_invoke(self, arguments, autoescape):
rv = await self._func(*arguments)
if autoescape:
rv = Markup(rv)
return rv
@internalcode
def _invoke(self, arguments, autoescape):
if not self._environment.is_async:
return original_invoke(self, arguments, autoescape)
return async_invoke(self, arguments, autoescape)
return update_wrapper(_invoke, original_invoke)
@internalcode
async def get_default_module_async(self):
if self._module is not None:
return self._module
self._module = rv = await self.make_module_async()
return rv
def wrap_default_module(original_default_module):
@internalcode
def _get_default_module(self):
if self.environment.is_async:
raise RuntimeError("Template module attribute is unavailable in async mode")
return original_default_module(self)
return _get_default_module
async def make_module_async(self, vars=None, shared=False, locals=None):
context = self.new_context(vars, shared, locals)
body_stream = []
async for item in self.root_render_func(context):
body_stream.append(item)
return TemplateModule(self, context, body_stream)
def patch_template():
from . import Template
Template.generate = wrap_generate_func(Template.generate)
Template.generate_async = update_wrapper(generate_async, Template.generate_async)
Template.render_async = update_wrapper(render_async, Template.render_async)
Template.render = wrap_render_func(Template.render)
Template._get_default_module = wrap_default_module(Template._get_default_module)
Template._get_default_module_async = get_default_module_async
Template.make_module_async = update_wrapper(
make_module_async, Template.make_module_async
)
def patch_runtime():
from .runtime import BlockReference, Macro
BlockReference.__call__ = wrap_block_reference_call(BlockReference.__call__)
Macro._invoke = wrap_macro_invoke(Macro._invoke)
def patch_filters():
from .filters import FILTERS
from .asyncfilters import ASYNC_FILTERS
FILTERS.update(ASYNC_FILTERS)
def patch_all():
patch_template()
patch_runtime()
patch_filters()
async def auto_await(value):
if inspect.isawaitable(value):
return await value
return value
async def auto_aiter(iterable):
if hasattr(iterable, "__aiter__"):
async for item in iterable:
yield item
return
for item in iterable:
yield item
class AsyncLoopContext(LoopContext):
_to_iterator = staticmethod(auto_aiter)
@property
async def length(self):
if self._length is not None:
return self._length
try:
self._length = len(self._iterable)
except TypeError:
iterable = [x async for x in self._iterator]
self._iterator = self._to_iterator(iterable)
self._length = len(iterable) + self.index + (self._after is not missing)
return self._length
@property
async def revindex0(self):
return await self.length - self.index
@property
async def revindex(self):
return await self.length - self.index0
async def _peek_next(self):
if self._after is not missing:
return self._after
try:
self._after = await self._iterator.__anext__()
except StopAsyncIteration:
self._after = missing
return self._after
@property
async def last(self):
return await self._peek_next() is missing
@property
async def nextitem(self):
rv = await self._peek_next()
if rv is missing:
return self._undefined("there is no next item")
return rv
def __aiter__(self):
return self
async def __anext__(self):
if self._after is not missing:
rv = self._after
self._after = missing
else:
rv = await self._iterator.__anext__()
self.index0 += 1
self._before = self._current
self._current = rv
return rv, self
async def make_async_loop_context(iterable, undefined, recurse=None, depth0=0):
import warnings
warnings.warn(
"This template must be recompiled with at least Jinja 2.11, or"
" it will fail in 3.0.",
DeprecationWarning,
stacklevel=2,
)
return AsyncLoopContext(iterable, undefined, recurse, depth0)
patch_all()
# -*- coding: utf-8 -*-
"""The optional bytecode cache system. This is useful if you have very
complex template situations and the compilation of all those templates
slows down your application too much.
Situations where this is useful are often forking web applications that
are initialized on the first request.
"""
import errno
import fnmatch
import os
import stat
import sys
import tempfile
from hashlib import sha1
from os import listdir
from os import path
from ._compat import BytesIO
from ._compat import marshal_dump
from ._compat import marshal_load
from ._compat import pickle
from ._compat import text_type
from .utils import open_if_exists
bc_version = 4
# Magic bytes to identify Jinja bytecode cache files. Contains the
# Python major and minor version to avoid loading incompatible bytecode
# if a project upgrades its Python version.
bc_magic = (
b"j2"
+ pickle.dumps(bc_version, 2)
+ pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1], 2)
)
class Bucket(object):
"""Buckets are used to store the bytecode for one template. It's created
and initialized by the bytecode cache and passed to the loading functions.
The buckets get an internal checksum from the cache assigned and use this
to automatically reject outdated cache material. Individual bytecode
cache subclasses don't have to care about cache invalidation.
"""
def __init__(self, environment, key, checksum):
self.environment = environment
self.key = key
self.checksum = checksum
self.reset()
def reset(self):
"""Resets the bucket (unloads the bytecode)."""
self.code = None
def load_bytecode(self, f):
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
if magic != bc_magic:
self.reset()
return
# the source code of the file changed, we need to reload
checksum = pickle.load(f)
if self.checksum != checksum:
self.reset()
return
# if marshal_load fails then we need to reload
try:
self.code = marshal_load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
def write_bytecode(self, f):
"""Dump the bytecode into the file or file like object passed."""
if self.code is None:
raise TypeError("can't write empty bucket")
f.write(bc_magic)
pickle.dump(self.checksum, f, 2)
marshal_dump(self.code, f)
def bytecode_from_string(self, string):
"""Load bytecode from a string."""
self.load_bytecode(BytesIO(string))
def bytecode_to_string(self):
"""Return the bytecode as string."""
out = BytesIO()
self.write_bytecode(out)
return out.getvalue()
class BytecodeCache(object):
"""To implement your own bytecode cache you have to subclass this class
and override :meth:`load_bytecode` and :meth:`dump_bytecode`. Both of
these methods are passed a :class:`~jinja2.bccache.Bucket`.
A very basic bytecode cache that saves the bytecode on the file system::
from os import path
class MyCache(BytecodeCache):
def __init__(self, directory):
self.directory = directory
def load_bytecode(self, bucket):
filename = path.join(self.directory, bucket.key)
if path.exists(filename):
with open(filename, 'rb') as f:
bucket.load_bytecode(f)
def dump_bytecode(self, bucket):
filename = path.join(self.directory, bucket.key)
with open(filename, 'wb') as f:
bucket.write_bytecode(f)
A more advanced version of a filesystem based bytecode cache is part of
Jinja.
"""
def load_bytecode(self, bucket):
"""Subclasses have to override this method to load bytecode into a
bucket. If they are not able to find code in the cache for the
bucket, it must not do anything.
"""
raise NotImplementedError()
def dump_bytecode(self, bucket):
"""Subclasses have to override this method to write the bytecode
from a bucket back to the cache. If it unable to do so it must not
fail silently but raise an exception.
"""
raise NotImplementedError()
def clear(self):
"""Clears the cache. This method is not used by Jinja but should be
implemented to allow applications to clear the bytecode cache used
by a particular environment.
"""
def get_cache_key(self, name, filename=None):
"""Returns the unique hash key for this template name."""
hash = sha1(name.encode("utf-8"))
if filename is not None:
filename = "|" + filename
if isinstance(filename, text_type):
filename = filename.encode("utf-8")
hash.update(filename)
return hash.hexdigest()
def get_source_checksum(self, source):
"""Returns a checksum for the source."""
return sha1(source.encode("utf-8")).hexdigest()
def get_bucket(self, environment, name, filename, source):
"""Return a cache bucket for the given template. All arguments are
mandatory but filename may be `None`.
"""
key = self.get_cache_key(name, filename)
checksum = self.get_source_checksum(source)
bucket = Bucket(environment, key, checksum)
self.load_bytecode(bucket)
return bucket
def set_bucket(self, bucket):
"""Put the bucket into the cache."""
self.dump_bytecode(bucket)
class FileSystemBytecodeCache(BytecodeCache):
"""A bytecode cache that stores bytecode on the filesystem. It accepts
two arguments: The directory where the cache items are stored and a
pattern string that is used to build the filename.
If no directory is specified a default cache directory is selected. On
Windows the user's temp directory is used, on UNIX systems a directory
is created for the user in the system temp directory.
The pattern can be used to have multiple separate caches operate on the
same directory. The default pattern is ``'__jinja2_%s.cache'``. ``%s``
is replaced with the cache key.
>>> bcc = FileSystemBytecodeCache('/tmp/jinja_cache', '%s.cache')
This bytecode cache supports clearing of the cache using the clear method.
"""
def __init__(self, directory=None, pattern="__jinja2_%s.cache"):
if directory is None:
directory = self._get_default_cache_dir()
self.directory = directory
self.pattern = pattern
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError(
"Cannot determine safe temp directory. You "
"need to explicitly provide one."
)
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == "nt":
return tmpdir
if not hasattr(os, "getuid"):
_unsafe_dir()
dirname = "_jinja2-cache-%d" % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if (
actual_dir_stat.st_uid != os.getuid()
or not stat.S_ISDIR(actual_dir_stat.st_mode)
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
):
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if (
actual_dir_stat.st_uid != os.getuid()
or not stat.S_ISDIR(actual_dir_stat.st_mode)
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
):
_unsafe_dir()
return actual_dir
def _get_cache_filename(self, bucket):
return path.join(self.directory, self.pattern % bucket.key)
def load_bytecode(self, bucket):
f = open_if_exists(self._get_cache_filename(bucket), "rb")
if f is not None:
try:
bucket.load_bytecode(f)
finally:
f.close()
def dump_bytecode(self, bucket):
f = open(self._get_cache_filename(bucket), "wb")
try:
bucket.write_bytecode(f)
finally:
f.close()
def clear(self):
# imported lazily here because google app-engine doesn't support
# write access on the file system and the function does not exist
# normally.
from os import remove
files = fnmatch.filter(listdir(self.directory), self.pattern % "*")
for filename in files:
try:
remove(path.join(self.directory, filename))
except OSError:
pass
class MemcachedBytecodeCache(BytecodeCache):
"""This class implements a bytecode cache that uses a memcache cache for
storing the information. It does not enforce a specific memcache library
(tummy's memcache or cmemcache) but will accept any class that provides
the minimal interface required.
Libraries compatible with this class:
- `cachelib <https://github.com/pallets/cachelib>`_
- `python-memcached <https://pypi.org/project/python-memcached/>`_
(Unfortunately the django cache interface is not compatible because it
does not support storing binary data, only unicode. You can however pass
the underlying cache client to the bytecode cache which is available
as `django.core.cache.cache._client`.)
The minimal interface for the client passed to the constructor is this:
.. class:: MinimalClientInterface
.. method:: set(key, value[, timeout])
Stores the bytecode in the cache. `value` is a string and
`timeout` the timeout of the key. If timeout is not provided
a default timeout or no timeout should be assumed, if it's
provided it's an integer with the number of seconds the cache
item should exist.
.. method:: get(key)
Returns the value for the cache key. If the item does not
exist in the cache the return value must be `None`.
The other arguments to the constructor are the prefix for all keys that
is added before the actual cache key and the timeout for the bytecode in
the cache system. We recommend a high (or no) timeout.
This bytecode cache does not support clearing of used items in the cache.
The clear method is a no-operation function.
.. versionadded:: 2.7
Added support for ignoring memcache errors through the
`ignore_memcache_errors` parameter.
"""
def __init__(
self,
client,
prefix="jinja2/bytecode/",
timeout=None,
ignore_memcache_errors=True,
):
self.client = client
self.prefix = prefix
self.timeout = timeout
self.ignore_memcache_errors = ignore_memcache_errors
def load_bytecode(self, bucket):
try:
code = self.client.get(self.prefix + bucket.key)
except Exception:
if not self.ignore_memcache_errors:
raise
code = None
if code is not None:
bucket.bytecode_from_string(code)
def dump_bytecode(self, bucket):
args = (self.prefix + bucket.key, bucket.bytecode_to_string())
if self.timeout is not None:
args += (self.timeout,)
try:
self.client.set(*args)
except Exception:
if not self.ignore_memcache_errors:
raise
# -*- coding: utf-8 -*-
#: list of lorem ipsum words used by the lipsum() helper function
LOREM_IPSUM_WORDS = u"""\
a ac accumsan ad adipiscing aenean aliquam aliquet amet ante aptent arcu at
auctor augue bibendum blandit class commodo condimentum congue consectetuer
consequat conubia convallis cras cubilia cum curabitur curae cursus dapibus
diam dictum dictumst dignissim dis dolor donec dui duis egestas eget eleifend
elementum elit enim erat eros est et etiam eu euismod facilisi facilisis fames
faucibus felis fermentum feugiat fringilla fusce gravida habitant habitasse hac
hendrerit hymenaeos iaculis id imperdiet in inceptos integer interdum ipsum
justo lacinia lacus laoreet lectus leo libero ligula litora lobortis lorem
luctus maecenas magna magnis malesuada massa mattis mauris metus mi molestie
mollis montes morbi mus nam nascetur natoque nec neque netus nibh nisi nisl non
nonummy nostra nulla nullam nunc odio orci ornare parturient pede pellentesque
penatibus per pharetra phasellus placerat platea porta porttitor posuere
potenti praesent pretium primis proin pulvinar purus quam quis quisque rhoncus
ridiculus risus rutrum sagittis sapien scelerisque sed sem semper senectus sit
sociis sociosqu sodales sollicitudin suscipit suspendisse taciti tellus tempor
tempus tincidunt torquent tortor tristique turpis ullamcorper ultrices
ultricies urna ut varius vehicula vel velit venenatis vestibulum vitae vivamus
viverra volutpat vulputate"""
# -*- coding: utf-8 -*-
from ._compat import range_type
from .filters import FILTERS as DEFAULT_FILTERS # noqa: F401
from .tests import TESTS as DEFAULT_TESTS # noqa: F401
from .utils import Cycler
from .utils import generate_lorem_ipsum
from .utils import Joiner
from .utils import Namespace
# defaults for the parser / lexer
BLOCK_START_STRING = "{%"
BLOCK_END_STRING = "%}"
VARIABLE_START_STRING = "{{"
VARIABLE_END_STRING = "}}"
COMMENT_START_STRING = "{#"
COMMENT_END_STRING = "#}"
LINE_STATEMENT_PREFIX = None
LINE_COMMENT_PREFIX = None
TRIM_BLOCKS = False
LSTRIP_BLOCKS = False
NEWLINE_SEQUENCE = "\n"
KEEP_TRAILING_NEWLINE = False
# default filters, tests and namespace
DEFAULT_NAMESPACE = {
"range": range_type,
"dict": dict,
"lipsum": generate_lorem_ipsum,
"cycler": Cycler,
"joiner": Joiner,
"namespace": Namespace,
}
# default policies
DEFAULT_POLICIES = {
"compiler.ascii_str": True,
"urlize.rel": "noopener",
"urlize.target": None,
"truncate.leeway": 5,
"json.dumps_function": None,
"json.dumps_kwargs": {"sort_keys": True},
"ext.i18n.trimmed": False,
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment