Commit 11cfe1c4 authored by Mark Beierl's avatar Mark Beierl
Browse files

Scripts from MR#10 Hackfest


Signed-off-by: Mark Beierl's avatarbeierlm <mark.beierl@canonical.com>
parent 9ec20785
Pipeline #6172 failed with stage
in 9 seconds
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A helper to work with the Juju version."""
import os
import re
from functools import total_ordering
@total_ordering
class JujuVersion:
"""Helper to work with the Juju version.
It knows how to parse the ``JUJU_VERSION`` environment variable, and exposes different
capabilities according to the specific version, allowing also to compare with other
versions.
"""
PATTERN = r'''^
(?P<major>\d{1,9})\.(?P<minor>\d{1,9}) # <major> and <minor> numbers are always there
((?:\.|-(?P<tag>[a-z]+))(?P<patch>\d{1,9}))? # sometimes with .<patch> or -<tag><patch>
(\.(?P<build>\d{1,9}))?$ # and sometimes with a <build> number.
'''
def __init__(self, version):
m = re.match(self.PATTERN, version, re.VERBOSE)
if not m:
raise RuntimeError('"{}" is not a valid Juju version string'.format(version))
d = m.groupdict()
self.major = int(m.group('major'))
self.minor = int(m.group('minor'))
self.tag = d['tag'] or ''
self.patch = int(d['patch'] or 0)
self.build = int(d['build'] or 0)
def __repr__(self):
if self.tag:
s = '{}.{}-{}{}'.format(self.major, self.minor, self.tag, self.patch)
else:
s = '{}.{}.{}'.format(self.major, self.minor, self.patch)
if self.build > 0:
s += '.{}'.format(self.build)
return s
def __eq__(self, other):
if self is other:
return True
if isinstance(other, str):
other = type(self)(other)
elif not isinstance(other, JujuVersion):
raise RuntimeError('cannot compare Juju version "{}" with "{}"'.format(self, other))
return (
self.major == other.major
and self.minor == other.minor
and self.tag == other.tag
and self.build == other.build
and self.patch == other.patch)
def __lt__(self, other):
if self is other:
return False
if isinstance(other, str):
other = type(self)(other)
elif not isinstance(other, JujuVersion):
raise RuntimeError('cannot compare Juju version "{}" with "{}"'.format(self, other))
if self.major != other.major:
return self.major < other.major
elif self.minor != other.minor:
return self.minor < other.minor
elif self.tag != other.tag:
if not self.tag:
return False
elif not other.tag:
return True
return self.tag < other.tag
elif self.patch != other.patch:
return self.patch < other.patch
elif self.build != other.build:
return self.build < other.build
return False
@classmethod
def from_environ(cls) -> 'JujuVersion':
"""Build a JujuVersion from JUJU_VERSION."""
v = os.environ.get('JUJU_VERSION')
if v is None:
v = '0.0.0'
return cls(v)
def has_app_data(self) -> bool:
"""Determine whether this juju version knows about app data."""
return (self.major, self.minor, self.patch) >= (2, 7, 0)
def is_dispatch_aware(self) -> bool:
"""Determine whether this juju version knows about dispatch."""
return (self.major, self.minor, self.patch) >= (2, 8, 0)
def has_controller_storage(self) -> bool:
"""Determine whether this juju version supports controller-side storage."""
return (self.major, self.minor, self.patch) >= (2, 8, 0)
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Infrastructure for the opslib functionality."""
import logging
import os
import re
import sys
from ast import literal_eval
from importlib.util import module_from_spec
from importlib.machinery import ModuleSpec
from pkgutil import get_importer
from types import ModuleType
from typing import List
__all__ = ('use', 'autoimport')
logger = logging.getLogger(__name__)
_libraries = None
_libline_re = re.compile(r'''^LIB([A-Z]+)\s*=\s*([0-9]+|['"][a-zA-Z0-9_.\-@]+['"])''')
_libname_re = re.compile(r'''^[a-z][a-z0-9]+$''')
# Not perfect, but should do for now.
_libauthor_re = re.compile(r'''^[A-Za-z0-9_+.-]+@[a-z0-9_-]+(?:\.[a-z0-9_-]+)*\.[a-z]{2,3}$''')
def use(name: str, api: int, author: str) -> ModuleType:
"""Use a library from the ops libraries.
Args:
name: the name of the library requested.
api: the API version of the library.
author: the author of the library. If not given, requests the
one in the standard library.
Raises:
ImportError: if the library cannot be found.
TypeError: if the name, api, or author are the wrong type.
ValueError: if the name, api, or author are invalid.
"""
if not isinstance(name, str):
raise TypeError("invalid library name: {!r} (must be a str)".format(name))
if not isinstance(author, str):
raise TypeError("invalid library author: {!r} (must be a str)".format(author))
if not isinstance(api, int):
raise TypeError("invalid library API: {!r} (must be an int)".format(api))
if api < 0:
raise ValueError('invalid library api: {} (must be ≥0)'.format(api))
if not _libname_re.match(name):
raise ValueError("invalid library name: {!r} (chars and digits only)".format(name))
if not _libauthor_re.match(author):
raise ValueError("invalid library author email: {!r}".format(author))
if _libraries is None:
autoimport()
versions = _libraries.get((name, author), ())
for lib in versions:
if lib.api == api:
return lib.import_module()
others = ', '.join(str(lib.api) for lib in versions)
if others:
msg = 'cannot find "{}" from "{}" with API version {} (have {})'.format(
name, author, api, others)
else:
msg = 'cannot find library "{}" from "{}"'.format(name, author)
raise ImportError(msg, name=name)
def autoimport():
"""Find all libs in the path and enable use of them.
You only need to call this if you've installed a package or
otherwise changed sys.path in the current run, and need to see the
changes. Otherwise libraries are found on first call of `use`.
"""
global _libraries
_libraries = {}
for spec in _find_all_specs(sys.path):
lib = _parse_lib(spec)
if lib is None:
continue
versions = _libraries.setdefault((lib.name, lib.author), [])
versions.append(lib)
versions.sort(reverse=True)
def _find_all_specs(path):
for sys_dir in path:
if sys_dir == "":
sys_dir = "."
try:
top_dirs = os.listdir(sys_dir)
except (FileNotFoundError, NotADirectoryError):
continue
except OSError as e:
logger.debug("Tried to look for ops.lib packages under '%s': %s", sys_dir, e)
continue
logger.debug("Looking for ops.lib packages under '%s'", sys_dir)
for top_dir in top_dirs:
opslib = os.path.join(sys_dir, top_dir, 'opslib')
try:
lib_dirs = os.listdir(opslib)
except (FileNotFoundError, NotADirectoryError):
continue
except OSError as e:
logger.debug(" Tried '%s': %s", opslib, e) # *lots* of things checked here
continue
else:
logger.debug(" Trying '%s'", opslib)
finder = get_importer(opslib)
if finder is None:
logger.debug(" Finder for '%s' is None", opslib)
continue
if not hasattr(finder, 'find_spec'):
logger.debug(" Finder for '%s' has no find_spec", opslib)
continue
for lib_dir in lib_dirs:
spec_name = "{}.opslib.{}".format(top_dir, lib_dir)
spec = finder.find_spec(spec_name)
if spec is None:
logger.debug(" No spec for %r", spec_name)
continue
if spec.loader is None:
# a namespace package; not supported
logger.debug(" No loader for %r (probably a namespace package)", spec_name)
continue
logger.debug(" Found %r", spec_name)
yield spec
# only the first this many lines of a file are looked at for the LIB* constants
_MAX_LIB_LINES = 99
# these keys, with these types, are needed to have an opslib
_NEEDED_KEYS = {'NAME': str, 'AUTHOR': str, 'API': int, 'PATCH': int}
def _join_and(keys: List[str]) -> str:
if len(keys) == 0:
return ""
if len(keys) == 1:
return keys[0]
return ", ".join(keys[:-1]) + ", and " + keys[-1]
class _Missing:
"""Helper to get the difference between what was found and what was needed when logging."""
def __init__(self, found):
self._found = found
def __str__(self):
exp = set(_NEEDED_KEYS)
got = set(self._found)
if len(got) == 0:
return "missing {}".format(_join_and(sorted(exp)))
return "got {}, but missing {}".format(
_join_and(sorted(got)),
_join_and(sorted(exp - got)))
def _parse_lib(spec):
if spec.origin is None:
# "can't happen"
logger.warning("No origin for %r (no idea why; please report)", spec.name)
return None
logger.debug(" Parsing %r", spec.name)
try:
with open(spec.origin, 'rt', encoding='utf-8') as f:
libinfo = {}
for n, line in enumerate(f):
if len(libinfo) == len(_NEEDED_KEYS):
break
if n > _MAX_LIB_LINES:
logger.debug(
" Missing opslib metadata after reading to line %d: %s",
_MAX_LIB_LINES, _Missing(libinfo))
return None
m = _libline_re.match(line)
if m is None:
continue
key, value = m.groups()
if key in _NEEDED_KEYS:
value = literal_eval(value)
if not isinstance(value, _NEEDED_KEYS[key]):
logger.debug(
" Bad type for %s: expected %s, got %s",
key, _NEEDED_KEYS[key].__name__, type(value).__name__)
return None
libinfo[key] = value
else:
if len(libinfo) != len(_NEEDED_KEYS):
logger.debug(
" Missing opslib metadata after reading to end of file: %s",
_Missing(libinfo))
return None
except Exception as e:
logger.debug(" Failed: %s", e)
return None
lib = _Lib(spec, libinfo['NAME'], libinfo['AUTHOR'], libinfo['API'], libinfo['PATCH'])
logger.debug(" Success: found library %s", lib)
return lib
class _Lib:
def __init__(self, spec: ModuleSpec, name: str, author: str, api: int, patch: int):
self.spec = spec
self.name = name
self.author = author
self.api = api
self.patch = patch
self._module = None
def __repr__(self):
return "<_Lib {}>".format(self)
def __str__(self):
return "{0.name} by {0.author}, API {0.api}, patch {0.patch}".format(self)
def import_module(self) -> ModuleType:
if self._module is None:
module = module_from_spec(self.spec)
self.spec.loader.exec_module(module)
self._module = module
return self._module
def __eq__(self, other):
if not isinstance(other, _Lib):
return NotImplemented
a = (self.name, self.author, self.api, self.patch)
b = (other.name, other.author, other.api, other.patch)
return a == b
def __lt__(self, other):
if not isinstance(other, _Lib):
return NotImplemented
a = (self.name, self.author, self.api, self.patch)
b = (other.name, other.author, other.api, other.patch)
return a < b
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface to emit messages to the Juju logging system."""
import sys
import logging
class JujuLogHandler(logging.Handler):
"""A handler for sending logs to Juju via juju-log."""
def __init__(self, model_backend, level=logging.DEBUG):
super().__init__(level)
self.model_backend = model_backend
def emit(self, record):
"""Send the specified logging record to the Juju backend.
This method is not used directly by the Operator Framework code, but by
:class:`logging.Handler` itself as part of the logging machinery.
"""
self.model_backend.juju_log(record.levelname, self.format(record))
def setup_root_logging(model_backend, debug=False):
"""Setup python logging to forward messages to juju-log.
By default, logging is set to DEBUG level, and messages will be filtered by Juju.
Charmers can also set their own default log level with::
logging.getLogger().setLevel(logging.INFO)
model_backend -- a ModelBackend to use for juju-log
debug -- if True, write logs to stderr as well as to juju-log.
"""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(JujuLogHandler(model_backend))
if debug:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
sys.excepthook = lambda etype, value, tb: logger.error(
"Uncaught exception while in charm code:", exc_info=(etype, value, tb))
# Copyright 2019-2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Main entry point to the Operator Framework."""
import inspect
import logging
import os
import shutil
import subprocess
import sys
import typing
import warnings
from pathlib import Path
import yaml
import ops.charm
import ops.framework
import ops.model
import ops.storage
from ops.log import setup_root_logging
from ops.jujuversion import JujuVersion
CHARM_STATE_FILE = '.unit-state.db'
logger = logging.getLogger()
def _exe_path(path: Path) -> typing.Optional[Path]:
"""Find and return the full path to the given binary.
Here path is the absolute path to a binary, but might be missing an extension.
"""
p = shutil.which(path.name, mode=os.F_OK, path=str(path.parent))
if p is None:
return None
return Path(p)
def _get_charm_dir():
charm_dir = os.environ.get("JUJU_CHARM_DIR")
if charm_dir is None:
# Assume $JUJU_CHARM_DIR/lib/op/main.py structure.
charm_dir = Path('{}/../../..'.format(__file__)).resolve()
else:
charm_dir = Path(charm_dir).resolve()
return charm_dir
def _create_event_link(charm, bound_event, link_to):
"""Create a symlink for a particular event.
charm -- A charm object.
bound_event -- An event for which to create a symlink.
link_to -- What the event link should point to
"""
if issubclass(bound_event.event_type, ops.charm.HookEvent):
event_dir = charm.framework.charm_dir / 'hooks'
event_path = event_dir / bound_event.event_kind.replace('_', '-')
elif issubclass(bound_event.event_type, ops.charm.ActionEvent):
if not bound_event.event_kind.endswith("_action"):
raise RuntimeError(
'action event name {} needs _action suffix'.format(bound_event.event_kind))
event_dir = charm.framework.charm_dir / 'actions'
# The event_kind is suffixed with "_action" while the executable is not.
event_path = event_dir / bound_event.event_kind[:-len('_action')].replace('_', '-')
else:
raise RuntimeError(
'cannot create a symlink: unsupported event type {}'.format(bound_event.event_type))
event_dir.mkdir(exist_ok=True)
if not event_path.exists():
target_path = os.path.relpath(link_to, str(event_dir))
# Ignore the non-symlink files or directories
# assuming the charm author knows what they are doing.
logger.debug(
'Creating a new relative symlink at %s pointing to %s',
event_path, target_path)
event_path.symlink_to(target_path)
def _setup_event_links(charm_dir, charm):
"""Set up links for supported events that originate from Juju.
Whether a charm can handle an event or not can be determined by
introspecting which events are defined on it.
Hooks or actions are created as symlinks to the charm code file
which is determined by inspecting symlinks provided by the charm
author at hooks/install or hooks/start.
charm_dir -- A root directory of the charm.
charm -- An instance of the Charm class.
"""
# XXX: on windows this function does not accomplish what it wants to:
# it creates symlinks with no extension pointing to a .py
# and juju only knows how to handle .exe, .bat, .cmd, and .ps1
# so it does its job, but does not accomplish anything as the
# hooks aren't 'callable'.
link_to = os.path.realpath(os.environ.get("JUJU_DISPATCH_PATH", sys.argv[0]))
for bound_event in charm.on.events().values():
# Only events that originate from Juju need symlinks.
if issubclass(bound_event.event_type, (ops.charm.HookEvent, ops.charm.ActionEvent)):
_create_event_link(charm, bound_event, link_to)
def _emit_charm_event(charm, event_name):
"""Emits a charm event based on a Juju event name.
charm -- A charm instance to emit an event from.
event_name -- A Juju event name to emit on a charm.
"""
event_to_emit = None
try:
event_to_emit = getattr(charm.on, event_name)
except AttributeError:
logger.debug("Event %s not defined for %s.", event_name, charm)
# If the event is not supported by the charm implementation, do
# not error out or try to emit it. This is to support rollbacks.
if event_to_emit is not None:
args, kwargs = _get_event_args(charm, event_to_emit)
logger.debug('Emitting Juju event %s.', event_name)
event_to_emit.emit(*args, **kwargs)
def _get_event_args(charm, bound_event):
event_type = bound_event.event_type
model = charm.framework.model
if issubclass(event_type, ops.charm.RelationEvent):
relation_name = os.environ['JUJU_RELATION']
relation_id = int(os.environ['JUJU_RELATION_ID'].split(':')[-1])
relation = model.get_relation(relation_name, relation_id)
else:
relation = None
remote_app_name = os.environ.get('JUJU_REMOTE_APP', '')
remote_unit_name = os.environ.get('JUJU_REMOTE_UNIT', '')
if remote_app_name or remote_unit_name:
if not remote_app_name:
if '/' not in remote_unit_name:
raise RuntimeError('invalid remote unit name: {}'.format(remote_unit_name))
remote_app_name = remote_unit_name.split('/')[0]
args = [relation, model.get_app(remote_app_name)]
if remote_unit_name:
args.append(model.get_unit(remote_unit_name))
return args, {}
elif relation:
return [relation], {}
return [], {}
class _Dispatcher:
"""Encapsulate how to figure out what event Juju wants us to run.
Also knows how to run “legacy” hooks when Juju called us via a top-level
``dispatch`` binary.
Args:
charm_dir: the toplevel directory of the charm
Attributes:
event_name: the name of the event to run
is_dispatch_aware: are we running under a Juju that knows about the
dispatch binary, and is that binary present?
"""
def __init__(self, charm_dir: Path):
self._charm_dir = charm_dir
self._exec_path = Path(os.environ.get('JUJU_DISPATCH_PATH', sys.argv[0]))
dispatch = charm_dir / 'dispatch'
if JujuVersion.from_environ().is_dispatch_aware() and _exe_path(dispatch) is not None:
self._init_dispatch()
else:
self._init_legacy()
def ensure_event_links(self, charm):
"""Make sure necessary symlinks are present on disk."""
if self.is_dispatch_aware:
# links aren't needed
return
# When a charm is force-upgraded and a unit is in an error state Juju
# does not run upgrade-charm and instead runs the failed hook followed
# by config-changed. Given the nature of force-upgrading the hook setup
# code is not triggered on config-changed.
#
# 'start' event is included as Juju does not fire the install event for
# K8s charms (see LP: #1854635).
if (self.event_name in ('install', 'start', 'upgrade_charm')
or self.event_name.endswith('_storage_attached')):
_setup_event_links(self._charm_dir, charm)
def run_any_legacy_hook(self):
"""Run any extant legacy hook.
If there is both a dispatch file and a legacy hook for the
current event, run the wanted legacy hook.
"""
if not self.is_dispatch_aware:
# we *are* the legacy hook
return
dispatch_path = _exe_path(self._charm_dir / self._dispatch_path)
if dispatch_path is None:
logger.debug("Legacy %s does not exist.", self._dispatch_path)
return
# super strange that there isn't an is_executable
if not os.access(str(dispatch_path), os.X_OK):
logger.warning("Legacy %s exists but is not executable.", self._dispatch_path)
return
if dispatch_path.resolve() == Path(sys.argv[0]).resolve():
logger.debug("Legacy %s is just a link to ourselves.", self._dispatch_path)
return
argv = sys.argv.copy()
argv[0] = str(dispatch_path)
logger.info("Running legacy %s.", self._dispatch_path)
try:
subprocess.run(argv, check=True)
except subprocess.CalledProcessError as e:
logger.warning("Legacy %s exited with status %d.", self._dispatch_path, e.returncode)
sys.exit(e.returncode)
except OSError as e:
logger.warning("Unable to run legacy %s: %s", self._dispatch_path, e)
sys.exit(1)
else:
logger.debug("Legacy %s exited with status 0.", self._dispatch_path)
def _set_name_from_path(self, path: Path):
"""Sets the name attribute to that which can be inferred from the given path."""
name = path.name.replace('-', '_')
if path.parent.name == 'actions':
name = '{}_action'.format(name)
self.event_name = name
def _init_legacy(self):
"""Set up the 'legacy' dispatcher.
The current Juju doesn't know about 'dispatch' and calls hooks
explicitly.
"""
self.is_dispatch_aware = False
self._set_name_from_path(self._exec_path)
def _init_dispatch(self):
"""Set up the new 'dispatch' dispatcher.
The current Juju will run 'dispatch' if it exists, and otherwise fall
back to the old behaviour.
JUJU_DISPATCH_PATH will be set to the wanted hook, e.g. hooks/install,
in both cases.
"""
self._dispatch_path = Path(os.environ['JUJU_DISPATCH_PATH'])
if 'OPERATOR_DISPATCH' in os.environ:
logger.debug("Charm called itself via %s.", self._dispatch_path)
sys.exit(0)
os.environ['OPERATOR_DISPATCH'] = '1'
self.is_dispatch_aware = True
self._set_name_from_path(self._dispatch_path)
def is_restricted_context(self):
"""Return True if we are running in a restricted Juju context.
When in a restricted context, most commands (relation-get, config-get,
state-get) are not available. As such, we change how we interact with
Juju.
"""
return self.event_name in ('collect_metrics',)
def _should_use_controller_storage(db_path: Path, meta: ops.charm.CharmMeta) -> bool:
"""Figure out whether we want to use controller storage or not."""
# if you've previously used local state, carry on using that
if db_path.exists():
logger.debug("Using local storage: %s already exists", db_path)
return False
# if you're not in k8s you don't need controller storage
if 'kubernetes' not in meta.series:
logger.debug("Using local storage: not a kubernetes charm")
return False
# are we in a new enough Juju?
cur_version = JujuVersion.from_environ()
if cur_version.has_controller_storage():
logger.debug("Using controller storage: JUJU_VERSION=%s", cur_version)
return True
else:
logger.debug("Using local storage: JUJU_VERSION=%s", cur_version)
return False
def main(charm_class: ops.charm.CharmBase, use_juju_for_storage: bool = None):
"""Setup the charm and dispatch the observed event.
The event name is based on the way this executable was called (argv[0]).
Args:
charm_class: your charm class.
use_juju_for_storage: whether to use controller-side storage. If not specified
then kubernetes charms that haven't previously used local storage and that
are running on a new enough Juju default to controller-side storage,
otherwise local storage is used.
"""
charm_dir = _get_charm_dir()
model_backend = ops.model._ModelBackend()
debug = ('JUJU_DEBUG' in os.environ)
setup_root_logging(model_backend, debug=debug)
logger.debug("Operator Framework %s up and running.", ops.__version__)
dispatcher = _Dispatcher(charm_dir)
dispatcher.run_any_legacy_hook()
metadata = (charm_dir / 'metadata.yaml').read_text()
actions_meta = charm_dir / 'actions.yaml'
if actions_meta.exists():
actions_metadata = actions_meta.read_text()
else:
actions_metadata = None
if not yaml.__with_libyaml__:
logger.debug('yaml does not have libyaml extensions, using slower pure Python yaml loader')
meta = ops.charm.CharmMeta.from_yaml(metadata, actions_metadata)
model = ops.model.Model(meta, model_backend)
charm_state_path = charm_dir / CHARM_STATE_FILE
if use_juju_for_storage and not ops.storage.juju_backend_available():
# raise an exception; the charm is broken and needs fixing.
msg = 'charm set use_juju_for_storage=True, but Juju version {} does not support it'
raise RuntimeError(msg.format(JujuVersion.from_environ()))
if use_juju_for_storage is None:
use_juju_for_storage = _should_use_controller_storage(charm_state_path, meta)
if use_juju_for_storage:
if dispatcher.is_restricted_context():
# TODO: jam 2020-06-30 This unconditionally avoids running a collect metrics event
# Though we eventually expect that juju will run collect-metrics in a
# non-restricted context. Once we can determine that we are running collect-metrics
# in a non-restricted context, we should fire the event as normal.
logger.debug('"%s" is not supported when using Juju for storage\n'
'see: https://github.com/canonical/operator/issues/348',
dispatcher.event_name)
# Note that we don't exit nonzero, because that would cause Juju to rerun the hook
return
store = ops.storage.JujuStorage()
else:
store = ops.storage.SQLiteStorage(charm_state_path)
framework = ops.framework.Framework(store, charm_dir, meta, model)
framework.set_breakpointhook()
try:
sig = inspect.signature(charm_class)
try:
sig.bind(framework)
except TypeError:
msg = (
"the second argument, 'key', has been deprecated and will be "
"removed after the 0.7 release")
warnings.warn(msg, DeprecationWarning)
charm = charm_class(framework, None)
else:
charm = charm_class(framework)
dispatcher.ensure_event_links(charm)
# TODO: Remove the collect_metrics check below as soon as the relevant
# Juju changes are made. Also adjust the docstring on
# EventBase.defer().
#
# Skip reemission of deferred events for collect-metrics events because
# they do not have the full access to all hook tools.
if not dispatcher.is_restricted_context():
framework.reemit()
_emit_charm_event(charm, dispatcher.event_name)
framework.commit()
finally:
framework.close()
__all__ = ['Composer', 'ComposerError']
from .error import MarkedYAMLError
from .events import *
from .nodes import *
class ComposerError(MarkedYAMLError):
pass
class Composer:
def __init__(self):
self.anchors = {}
def check_node(self):
# Drop the STREAM-START event.
if self.check_event(StreamStartEvent):
self.get_event()
# If there are more documents available?
return not self.check_event(StreamEndEvent)
def get_node(self):
# Get the root node of the next document.
if not self.check_event(StreamEndEvent):
return self.compose_document()
def get_single_node(self):
# Drop the STREAM-START event.
self.get_event()
# Compose a document if the stream is not empty.
document = None
if not self.check_event(StreamEndEvent):
document = self.compose_document()
# Ensure that the stream contains no more documents.
if not self.check_event(StreamEndEvent):
event = self.get_event()
raise ComposerError("expected a single document in the stream",
document.start_mark, "but found another document",
event.start_mark)
# Drop the STREAM-END event.
self.get_event()
return document
def compose_document(self):
# Drop the DOCUMENT-START event.
self.get_event()
# Compose the root node.
node = self.compose_node(None, None)
# Drop the DOCUMENT-END event.
self.get_event()
self.anchors = {}
return node
def compose_node(self, parent, index):
if self.check_event(AliasEvent):
event = self.get_event()
anchor = event.anchor
if anchor not in self.anchors:
raise ComposerError(None, None, "found undefined alias %r"
% anchor, event.start_mark)
return self.anchors[anchor]
event = self.peek_event()
anchor = event.anchor
if anchor is not None:
if anchor in self.anchors:
raise ComposerError("found duplicate anchor %r; first occurrence"
% anchor, self.anchors[anchor].start_mark,
"second occurrence", event.start_mark)
self.descend_resolver(parent, index)
if self.check_event(ScalarEvent):
node = self.compose_scalar_node(anchor)
elif self.check_event(SequenceStartEvent):
node = self.compose_sequence_node(anchor)
elif self.check_event(MappingStartEvent):
node = self.compose_mapping_node(anchor)
self.ascend_resolver()
return node
def compose_scalar_node(self, anchor):
event = self.get_event()
tag = event.tag
if tag is None or tag == '!':
tag = self.resolve(ScalarNode, event.value, event.implicit)
node = ScalarNode(tag, event.value,
event.start_mark, event.end_mark, style=event.style)
if anchor is not None:
self.anchors[anchor] = node
return node
def compose_sequence_node(self, anchor):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(SequenceNode, None, start_event.implicit)
node = SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
if anchor is not None:
self.anchors[anchor] = node
index = 0
while not self.check_event(SequenceEndEvent):
node.value.append(self.compose_node(node, index))
index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self, anchor):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(MappingNode, None, start_event.implicit)
node = MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
if anchor is not None:
self.anchors[anchor] = node
while not self.check_event(MappingEndEvent):
#key_event = self.peek_event()
item_key = self.compose_node(node, None)
#if item_key in node.value:
# raise ComposerError("while composing a mapping", start_event.start_mark,
# "found duplicate key", key_event.start_mark)
item_value = self.compose_node(node, item_key)
#node.value[item_key] = item_value
node.value.append((item_key, item_value))
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
__all__ = [
'CBaseLoader', 'CSafeLoader', 'CFullLoader', 'CUnsafeLoader', 'CLoader',
'CBaseDumper', 'CSafeDumper', 'CDumper'
]
from yaml._yaml import CParser, CEmitter
from .constructor import *
from .serializer import *
from .representer import *
from .resolver import *
class CBaseLoader(CParser, BaseConstructor, BaseResolver):
def __init__(self, stream):
CParser.__init__(self, stream)
BaseConstructor.__init__(self)
BaseResolver.__init__(self)
class CSafeLoader(CParser, SafeConstructor, Resolver):
def __init__(self, stream):
CParser.__init__(self, stream)
SafeConstructor.__init__(self)
Resolver.__init__(self)
class CFullLoader(CParser, FullConstructor, Resolver):
def __init__(self, stream):
CParser.__init__(self, stream)
FullConstructor.__init__(self)
Resolver.__init__(self)
class CUnsafeLoader(CParser, UnsafeConstructor, Resolver):
def __init__(self, stream):
CParser.__init__(self, stream)
UnsafeConstructor.__init__(self)
Resolver.__init__(self)
class CLoader(CParser, Constructor, Resolver):
def __init__(self, stream):
CParser.__init__(self, stream)
Constructor.__init__(self)
Resolver.__init__(self)
class CBaseDumper(CEmitter, BaseRepresenter, BaseResolver):
def __init__(self, stream,
default_style=None, default_flow_style=False,
canonical=None, indent=None, width=None,
allow_unicode=None, line_break=None,
encoding=None, explicit_start=None, explicit_end=None,
version=None, tags=None, sort_keys=True):
CEmitter.__init__(self, stream, canonical=canonical,
indent=indent, width=width, encoding=encoding,
allow_unicode=allow_unicode, line_break=line_break,
explicit_start=explicit_start, explicit_end=explicit_end,
version=version, tags=tags)
Representer.__init__(self, default_style=default_style,
default_flow_style=default_flow_style, sort_keys=sort_keys)
Resolver.__init__(self)
class CSafeDumper(CEmitter, SafeRepresenter, Resolver):
def __init__(self, stream,
default_style=None, default_flow_style=False,
canonical=None, indent=None, width=None,
allow_unicode=None, line_break=None,
encoding=None, explicit_start=None, explicit_end=None,
version=None, tags=None, sort_keys=True):
CEmitter.__init__(self, stream, canonical=canonical,
indent=indent, width=width, encoding=encoding,
allow_unicode=allow_unicode, line_break=line_break,
explicit_start=explicit_start, explicit_end=explicit_end,
version=version, tags=tags)
SafeRepresenter.__init__(self, default_style=default_style,
default_flow_style=default_flow_style, sort_keys=sort_keys)
Resolver.__init__(self)
class CDumper(CEmitter, Serializer, Representer, Resolver):
def __init__(self, stream,
default_style=None, default_flow_style=False,
canonical=None, indent=None, width=None,
allow_unicode=None, line_break=None,
encoding=None, explicit_start=None, explicit_end=None,
version=None, tags=None, sort_keys=True):
CEmitter.__init__(self, stream, canonical=canonical,
indent=indent, width=width, encoding=encoding,
allow_unicode=allow_unicode, line_break=line_break,
explicit_start=explicit_start, explicit_end=explicit_end,
version=version, tags=tags)
Representer.__init__(self, default_style=default_style,
default_flow_style=default_flow_style, sort_keys=sort_keys)
Resolver.__init__(self)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment