Commit 17583c8b authored by Mark Beierl's avatar Mark Beierl
Browse files

Merge branch 'master' into 'master'

Update squid charm

See merge request !105
parents e2197769 9c012576
Pipeline #143 passed with stage
in 1 minute and 34 seconds
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import itertools
import os
import re
import subprocess
import sys
import tempfile
import unittest
from unittest.mock import patch
import autopep8
from flake8.api.legacy import get_style_guide
def get_python_filepaths():
"""Helper to retrieve paths of Python files."""
python_paths = ['setup.py']
for root in ['ops', 'test']:
for dirpath, dirnames, filenames in os.walk(root):
for filename in filenames:
if filename.endswith(".py"):
python_paths.append(os.path.join(dirpath, filename))
return python_paths
class InfrastructureTests(unittest.TestCase):
def test_pep8(self):
# verify all files are nicely styled
python_filepaths = get_python_filepaths()
style_guide = get_style_guide()
fake_stdout = io.StringIO()
with patch('sys.stdout', fake_stdout):
report = style_guide.check_files(python_filepaths)
# if flake8 didnt' report anything, we're done
if report.total_errors == 0:
return
# grab on which files we have issues
flake8_issues = fake_stdout.getvalue().split('\n')
broken_filepaths = {item.split(':')[0] for item in flake8_issues if item}
# give hints to the developer on how files' style could be improved
options = autopep8.parse_args([''])
options.aggressive = 1
options.diff = True
options.max_line_length = 99
issues = []
for filepath in broken_filepaths:
diff = autopep8.fix_file(filepath, options=options)
if diff:
issues.append(diff)
report = ["Please fix files as suggested by autopep8:"] + issues
report += ["\n-- Original flake8 reports:"] + flake8_issues
self.fail("\n".join(report))
def test_quote_backslashes(self):
# ensure we're not using unneeded backslash to escape strings
issues = []
for filepath in get_python_filepaths():
with open(filepath, "rt", encoding="utf8") as fh:
for idx, line in enumerate(fh, 1):
if (r'\"' in line or r"\'" in line) and "NOQA" not in line:
issues.append((filepath, idx, line.rstrip()))
if issues:
msgs = ["{}:{:d}:{}".format(*issue) for issue in issues]
self.fail("Spurious backslashes found, please fix these quotings:\n" + "\n".join(msgs))
def test_ensure_copyright(self):
# all non-empty Python files must have a proper copyright somewhere in the first 5 lines
issues = []
regex = re.compile(r"# Copyright \d\d\d\d(-\d\d\d\d)? Canonical Ltd.\n")
for filepath in get_python_filepaths():
if os.stat(filepath).st_size == 0:
continue
with open(filepath, "rt", encoding="utf8") as fh:
for line in itertools.islice(fh, 5):
if regex.match(line):
break
else:
issues.append(filepath)
if issues:
self.fail("Please add copyright headers to the following files:\n" + "\n".join(issues))
class ImportersTestCase(unittest.TestCase):
template = "from ops import {module_name}"
def test_imports(self):
mod_names = [
'charm',
'framework',
'main',
'model',
'testing',
]
for name in mod_names:
with self.subTest(name=name):
self.check(name)
def check(self, name):
"""Helper function to run the test."""
_, testfile = tempfile.mkstemp()
self.addCleanup(os.unlink, testfile)
with open(testfile, 'wt', encoding='utf8') as fh:
fh.write(self.template.format(module_name=name))
proc = subprocess.run([sys.executable, testfile], env={'PYTHONPATH': os.getcwd()})
self.assertEqual(proc.returncode, 0)
#!/usr/bin/env python3
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from ops.jujuversion import JujuVersion
class TestJujuVersion(unittest.TestCase):
def test_parsing(self):
test_cases = [
("0.0.0", 0, 0, '', 0, 0),
("0.0.2", 0, 0, '', 2, 0),
("0.1.0", 0, 1, '', 0, 0),
("0.2.3", 0, 2, '', 3, 0),
("10.234.3456", 10, 234, '', 3456, 0),
("10.234.3456.1", 10, 234, '', 3456, 1),
("1.21-alpha12", 1, 21, 'alpha', 12, 0),
("1.21-alpha1.34", 1, 21, 'alpha', 1, 34),
("2.7", 2, 7, '', 0, 0)
]
for vs, major, minor, tag, patch, build in test_cases:
v = JujuVersion(vs)
self.assertEqual(v.major, major)
self.assertEqual(v.minor, minor)
self.assertEqual(v.tag, tag)
self.assertEqual(v.patch, patch)
self.assertEqual(v.build, build)
def test_parsing_errors(self):
invalid_versions = [
"xyz",
"foo.bar",
"foo.bar.baz",
"dead.beef.ca.fe",
"1234567890.2.1", # The major version is too long.
"0.2..1", # Two periods next to each other.
"1.21.alpha1", # Tag comes after period.
"1.21-alpha", # No patch number but a tag is present.
"1.21-alpha1beta", # Non-numeric string after the patch number.
"1.21-alpha-dev", # Tag duplication.
"1.21-alpha_dev3", # Underscore in a tag.
"1.21-alpha123dev3", # Non-numeric string after the patch number.
]
for v in invalid_versions:
with self.assertRaises(RuntimeError):
JujuVersion(v)
def test_equality(self):
test_cases = [
("1.0.0", "1.0.0", True),
("01.0.0", "1.0.0", True),
("10.0.0", "9.0.0", False),
("1.0.0", "1.0.1", False),
("1.0.1", "1.0.0", False),
("1.0.0", "1.1.0", False),
("1.1.0", "1.0.0", False),
("1.0.0", "2.0.0", False),
("1.2-alpha1", "1.2.0", False),
("1.2-alpha2", "1.2-alpha1", False),
("1.2-alpha2.1", "1.2-alpha2", False),
("1.2-alpha2.2", "1.2-alpha2.1", False),
("1.2-beta1", "1.2-alpha1", False),
("1.2-beta1", "1.2-alpha2.1", False),
("1.2-beta1", "1.2.0", False),
("1.2.1", "1.2.0", False),
("2.0.0", "1.0.0", False),
("2.0.0.0", "2.0.0", True),
("2.0.0.0", "2.0.0.0", True),
("2.0.0.1", "2.0.0.0", False),
("2.0.1.10", "2.0.0.0", False),
]
for a, b, expected in test_cases:
self.assertEqual(JujuVersion(a) == JujuVersion(b), expected)
self.assertEqual(JujuVersion(a) == b, expected)
def test_comparison(self):
test_cases = [
("1.0.0", "1.0.0", False, True),
("01.0.0", "1.0.0", False, True),
("10.0.0", "9.0.0", False, False),
("1.0.0", "1.0.1", True, True),
("1.0.1", "1.0.0", False, False),
("1.0.0", "1.1.0", True, True),
("1.1.0", "1.0.0", False, False),
("1.0.0", "2.0.0", True, True),
("1.2-alpha1", "1.2.0", True, True),
("1.2-alpha2", "1.2-alpha1", False, False),
("1.2-alpha2.1", "1.2-alpha2", False, False),
("1.2-alpha2.2", "1.2-alpha2.1", False, False),
("1.2-beta1", "1.2-alpha1", False, False),
("1.2-beta1", "1.2-alpha2.1", False, False),
("1.2-beta1", "1.2.0", True, True),
("1.2.1", "1.2.0", False, False),
("2.0.0", "1.0.0", False, False),
("2.0.0.0", "2.0.0", False, True),
("2.0.0.0", "2.0.0.0", False, True),
("2.0.0.1", "2.0.0.0", False, False),
("2.0.1.10", "2.0.0.0", False, False),
]
for a, b, expected_strict, expected_weak in test_cases:
self.assertEqual(JujuVersion(a) < JujuVersion(b), expected_strict)
self.assertEqual(JujuVersion(a) <= JujuVersion(b), expected_weak)
self.assertEqual(JujuVersion(b) > JujuVersion(a), expected_strict)
self.assertEqual(JujuVersion(b) >= JujuVersion(a), expected_weak)
# Implicit conversion.
self.assertEqual(JujuVersion(a) < b, expected_strict)
self.assertEqual(JujuVersion(a) <= b, expected_weak)
self.assertEqual(b > JujuVersion(a), expected_strict)
self.assertEqual(b >= JujuVersion(a), expected_weak)
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python3
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import unittest
from unittest.mock import patch
import importlib
import logging
import ops.log
class FakeModelBackend:
def __init__(self):
self._calls = []
def calls(self, clear=False):
calls = self._calls
if clear:
self._calls = []
return calls
def juju_log(self, message, level):
self._calls.append((message, level))
def reset_logging():
logging.shutdown()
importlib.reload(logging)
class TestLogging(unittest.TestCase):
def setUp(self):
self.backend = FakeModelBackend()
reset_logging()
self.addCleanup(reset_logging)
def test_default_logging(self):
ops.log.setup_root_logging(self.backend)
logger = logging.getLogger()
self.assertEqual(logger.level, logging.DEBUG)
self.assertIsInstance(logger.handlers[0], ops.log.JujuLogHandler)
test_cases = [(
lambda: logger.critical('critical'), [('CRITICAL', 'critical')]
), (
lambda: logger.error('error'), [('ERROR', 'error')]
), (
lambda: logger.warning('warning'), [('WARNING', 'warning')]
), (
lambda: logger.info('info'), [('INFO', 'info')]
), (
lambda: logger.debug('debug'), [('DEBUG', 'debug')]
)]
for do, res in test_cases:
do()
calls = self.backend.calls(clear=True)
self.assertEqual(calls, res)
def test_handler_filtering(self):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(ops.log.JujuLogHandler(self.backend, logging.WARNING))
logger.info('foo')
self.assertEqual(self.backend.calls(), [])
logger.warning('bar')
self.assertEqual(self.backend.calls(), [('WARNING', 'bar')])
def test_no_stderr_without_debug(self):
buffer = io.StringIO()
with patch('sys.stderr', buffer):
ops.log.setup_root_logging(self.backend, debug=False)
logger = logging.getLogger()
logger.debug('debug message')
logger.info('info message')
logger.warning('warning message')
logger.critical('critical message')
self.assertEqual(
self.backend.calls(),
[('DEBUG', 'debug message'),
('INFO', 'info message'),
('WARNING', 'warning message'),
('CRITICAL', 'critical message'),
])
self.assertEqual(buffer.getvalue(), "")
def test_debug_logging(self):
buffer = io.StringIO()
with patch('sys.stderr', buffer):
ops.log.setup_root_logging(self.backend, debug=True)
logger = logging.getLogger()
logger.debug('debug message')
logger.info('info message')
logger.warning('warning message')
logger.critical('critical message')
self.assertEqual(
self.backend.calls(),
[('DEBUG', 'debug message'),
('INFO', 'info message'),
('WARNING', 'warning message'),
('CRITICAL', 'critical message'),
])
self.assertRegex(
buffer.getvalue(),
r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d DEBUG debug message\n"
r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d INFO info message\n"
r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d WARNING warning message\n"
r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d CRITICAL critical message\n"
)
def test_reduced_logging(self):
ops.log.setup_root_logging(self.backend)
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
logger.debug('debug')
logger.info('info')
logger.warning('warning')
self.assertEqual(self.backend.calls(), [('WARNING', 'warning')])
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python3
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import base64
import logging
import os
import pickle
import shutil
import subprocess
import sys
import tempfile
import unittest
import importlib.util
from pathlib import Path
from ops.charm import (
CharmBase,
CharmEvents,
HookEvent,
InstallEvent,
StartEvent,
ConfigChangedEvent,
UpgradeCharmEvent,
UpdateStatusEvent,
LeaderSettingsChangedEvent,
RelationJoinedEvent,
RelationChangedEvent,
RelationDepartedEvent,
RelationBrokenEvent,
RelationEvent,
StorageAttachedEvent,
ActionEvent,
CollectMetricsEvent,
)
from .test_helpers import fake_script, fake_script_calls
# This relies on the expected repository structure to find a path to
# source of the charm under test.
TEST_CHARM_DIR = Path(__file__ + '/../charms/test_main').resolve()
logger = logging.getLogger(__name__)
class SymlinkTargetError(Exception):
pass
class EventSpec:
def __init__(self, event_type, event_name, env_var=None,
relation_id=None, remote_app=None, remote_unit=None,
charm_config=None):
self.event_type = event_type
self.event_name = event_name
self.env_var = env_var
self.relation_id = relation_id
self.remote_app = remote_app
self.remote_unit = remote_unit
self.charm_config = charm_config
class TestMain(abc.ABC):
@abc.abstractmethod
def _setup_entry_point(self, directory, entry_point):
"""Set up the given entry point in the given directory.
If not using dispatch, that would be a symlink <dir>/<entry_point>
pointing at src/charm.py; if using dispatch that would be the dispatch
symlink. It could also not be a symlink...
"""
return NotImplemented
@abc.abstractmethod
def _call_event(self, rel_path, env):
"""Set up the environment and call (i.e. run) the given event."""
return NotImplemented
@abc.abstractmethod
def test_setup_event_links(self):
"""Test auto-creation of symlinks caused by initial events.
Depending on the combination of dispatch and non-dispatch, this should
be checking for the creation or the _lack_ of creation, as appropriate.
"""
return NotImplemented
def setUp(self):
self._setup_charm_dir()
_, tmp_file = tempfile.mkstemp()
self._state_file = Path(tmp_file)
self.addCleanup(self._state_file.unlink)
# Relations events are defined dynamically and modify the class attributes.
# We use a subclass temporarily to prevent these side effects from leaking.
class TestCharmEvents(CharmEvents):
pass
CharmBase.on = TestCharmEvents()
def cleanup():
shutil.rmtree(str(self.JUJU_CHARM_DIR))
CharmBase.on = CharmEvents()
self.addCleanup(cleanup)
fake_script(self, 'juju-log', "exit 0")
# set to something other than None for tests that care
self.stdout = None
self.stderr = None
def _setup_charm_dir(self):
self.JUJU_CHARM_DIR = Path(tempfile.mkdtemp()) / 'test_main'
self.hooks_dir = self.JUJU_CHARM_DIR / 'hooks'
charm_path = str(self.JUJU_CHARM_DIR / 'src/charm.py')
self.charm_exec_path = os.path.relpath(charm_path,
str(self.hooks_dir))
shutil.copytree(str(TEST_CHARM_DIR), str(self.JUJU_CHARM_DIR))
charm_spec = importlib.util.spec_from_file_location("charm", charm_path)
self.charm_module = importlib.util.module_from_spec(charm_spec)
charm_spec.loader.exec_module(self.charm_module)
self._prepare_initial_hooks()
def _prepare_initial_hooks(self):
initial_hooks = ('install', 'start', 'upgrade-charm', 'disks-storage-attached')
self.hooks_dir.mkdir()
for hook in initial_hooks:
self._setup_entry_point(self.hooks_dir, hook)
def _prepare_actions(self):
actions_meta = '''
foo-bar:
description: Foos the bar.
title: foo-bar
params:
foo-name:
type: string
description: A foo name to bar.
silent:
type: boolean
description:
default: false
required:
- foo-name
start:
description: Start the unit.'''
actions_dir_name = 'actions'
actions_meta_file = 'actions.yaml'
with (self.JUJU_CHARM_DIR / actions_meta_file).open('w+t') as f:
f.write(actions_meta)
actions_dir = self.JUJU_CHARM_DIR / actions_dir_name
actions_dir.mkdir()
for action_name in ('start', 'foo-bar'):
self._setup_entry_point(actions_dir, action_name)
def _read_and_clear_state(self):
state = None
if self._state_file.stat().st_size:
with self._state_file.open('r+b') as state_file:
state = pickle.load(state_file)
state_file.truncate(0)
return state
def _simulate_event(self, event_spec):
env = {
'PATH': "{}:{}".format(Path(__file__).parent / 'bin', os.environ['PATH']),
'JUJU_CHARM_DIR': str(self.JUJU_CHARM_DIR),
'JUJU_UNIT_NAME': 'test_main/0',
'CHARM_CONFIG': event_spec.charm_config,
}
if issubclass(event_spec.event_type, RelationEvent):
rel_name = event_spec.event_name.split('_')[0]
env.update({
'JUJU_RELATION': rel_name,
'JUJU_RELATION_ID': str(event_spec.relation_id),
})
remote_app = event_spec.remote_app
# For juju < 2.7 app name is extracted from JUJU_REMOTE_UNIT.
if remote_app is not None:
env['JUJU_REMOTE_APP'] = remote_app
remote_unit = event_spec.remote_unit
if remote_unit is None:
remote_unit = ''
env['JUJU_REMOTE_UNIT'] = remote_unit
else:
env.update({
'JUJU_REMOTE_UNIT': '',
'JUJU_REMOTE_APP': '',
})
if issubclass(event_spec.event_type, ActionEvent):
event_filename = event_spec.event_name[:-len('_action')].replace('_', '-')
env.update({
event_spec.env_var: event_filename,
})
if event_spec.env_var == 'JUJU_ACTION_NAME':
event_dir = 'actions'
else:
raise RuntimeError('invalid envar name specified for a action event')
else:
event_filename = event_spec.event_name.replace('_', '-')
event_dir = 'hooks'
self._call_event(Path(event_dir, event_filename), env)
return self._read_and_clear_state()
def test_event_reemitted(self):
# base64 encoding is used to avoid null bytes.
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
# First run "install" to make sure all hooks are set up.
state = self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [InstallEvent])
state = self._simulate_event(EventSpec(ConfigChangedEvent, 'config-changed',
charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [ConfigChangedEvent])
# Re-emit should pick the deferred config-changed.
state = self._simulate_event(EventSpec(UpdateStatusEvent, 'update-status',
charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [ConfigChangedEvent, UpdateStatusEvent])
def test_no_reemission_on_collect_metrics(self):
# base64 encoding is used to avoid null bytes.
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
fake_script(self, 'add-metric', 'exit 0')
# First run "install" to make sure all hooks are set up.
state = self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [InstallEvent])
state = self._simulate_event(EventSpec(ConfigChangedEvent, 'config-changed',
charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [ConfigChangedEvent])
# Re-emit should not pick the deferred config-changed because
# collect-metrics runs in a restricted context.
state = self._simulate_event(EventSpec(CollectMetricsEvent, 'collect-metrics',
charm_config=charm_config))
self.assertEqual(state['observed_event_types'], [CollectMetricsEvent])
def test_multiple_events_handled(self):
self._prepare_actions()
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
actions_charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
'USE_ACTIONS': True,
}))
fake_script(self, 'action-get', "echo '{}'")
# Sample events with a different amount of dashes used
# and with endpoints from different sections of metadata.yaml
events_under_test = [(
EventSpec(InstallEvent, 'install',
charm_config=charm_config),
{},
), (
EventSpec(StartEvent, 'start',
charm_config=charm_config),
{},
), (
EventSpec(UpdateStatusEvent, 'update_status',
charm_config=charm_config),
{},
), (
EventSpec(LeaderSettingsChangedEvent, 'leader_settings_changed',
charm_config=charm_config),
{},
), (
EventSpec(RelationJoinedEvent, 'db_relation_joined',
relation_id=1,
remote_app='remote', remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'db',
'relation_id': 1,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(RelationChangedEvent, 'mon_relation_changed',
relation_id=2,
remote_app='remote', remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'mon',
'relation_id': 2,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(RelationChangedEvent, 'mon_relation_changed',
relation_id=2,
remote_app='remote', remote_unit=None,
charm_config=charm_config),
{'relation_name': 'mon',
'relation_id': 2,
'app_name': 'remote',
'unit_name': None},
), (
EventSpec(RelationDepartedEvent, 'mon_relation_departed',
relation_id=2,
remote_app='remote', remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'mon',
'relation_id': 2,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(RelationBrokenEvent, 'ha_relation_broken',
relation_id=3,
charm_config=charm_config),
{'relation_name': 'ha',
'relation_id': 3},
), (
# Events without a remote app specified (for Juju < 2.7).
EventSpec(RelationJoinedEvent, 'db_relation_joined',
relation_id=1,
remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'db',
'relation_id': 1,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(RelationChangedEvent, 'mon_relation_changed',
relation_id=2,
remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'mon',
'relation_id': 2,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(RelationDepartedEvent, 'mon_relation_departed',
relation_id=2,
remote_unit='remote/0',
charm_config=charm_config),
{'relation_name': 'mon',
'relation_id': 2,
'app_name': 'remote',
'unit_name': 'remote/0'},
), (
EventSpec(ActionEvent, 'start_action',
env_var='JUJU_ACTION_NAME',
charm_config=actions_charm_config),
{},
), (
EventSpec(ActionEvent, 'foo_bar_action',
env_var='JUJU_ACTION_NAME',
charm_config=actions_charm_config),
{},
)]
logger.debug('Expected events %s', events_under_test)
# First run "install" to make sure all hooks are set up.
self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
# Simulate hook executions for every event.
for event_spec, expected_event_data in events_under_test:
state = self._simulate_event(event_spec)
state_key = 'on_' + event_spec.event_name
handled_events = state.get(state_key, [])
# Make sure that a handler for that event was called once.
self.assertEqual(len(handled_events), 1)
# Make sure the event handled by the Charm has the right type.
handled_event_type = handled_events[0]
self.assertEqual(handled_event_type, event_spec.event_type)
self.assertEqual(state['observed_event_types'], [event_spec.event_type])
if event_spec.event_name in expected_event_data:
self.assertEqual(state[event_spec.event_name + '_data'],
expected_event_data[event_spec.event_name])
def test_event_not_implemented(self):
"""Make sure events without implementation do not cause non-zero exit.
"""
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
# Simulate a scenario where there is a symlink for an event that
# a charm does not know how to handle.
hook_path = self.JUJU_CHARM_DIR / 'hooks/not-implemented-event'
# This will be cleared up in tearDown.
hook_path.symlink_to('install')
try:
self._simulate_event(EventSpec(HookEvent, 'not-implemented-event',
charm_config=charm_config))
except subprocess.CalledProcessError:
self.fail('Event simulation for an unsupported event'
' results in a non-zero exit code returned')
def test_collect_metrics(self):
indicator_file = self.JUJU_CHARM_DIR / 'indicator'
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
'INDICATOR_FILE': indicator_file
}))
fake_script(self, 'add-metric', 'exit 0')
fake_script(self, 'juju-log', 'exit 0')
self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
# Clear the calls during 'install'
fake_script_calls(self, clear=True)
self._simulate_event(EventSpec(CollectMetricsEvent, 'collect_metrics',
charm_config=charm_config))
self.assertEqual(
fake_script_calls(self),
[['juju-log', '--log-level', 'DEBUG', 'Emitting Juju event collect_metrics'],
['add-metric', '--labels', 'bar=4.2', 'foo=42']])
def test_logger(self):
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
'USE_LOG_ACTIONS': True,
}))
fake_script(self, 'action-get', "echo '{}'")
actions_yaml = self.JUJU_CHARM_DIR / 'actions.yaml'
actions_yaml.write_text(
'''
log_critical: {}
log_error: {}
log_warning: {}
log_info: {}
log_debug: {}
''')
test_cases = [(
EventSpec(ActionEvent, 'log_critical_action', env_var='JUJU_ACTION_NAME',
charm_config=charm_config),
['juju-log', '--log-level', 'CRITICAL', 'super critical'],
), (
EventSpec(ActionEvent, 'log_error_action',
env_var='JUJU_ACTION_NAME',
charm_config=charm_config),
['juju-log', '--log-level', 'ERROR', 'grave error'],
), (
EventSpec(ActionEvent, 'log_warning_action',
env_var='JUJU_ACTION_NAME',
charm_config=charm_config),
['juju-log', '--log-level', 'WARNING', 'wise warning'],
), (
EventSpec(ActionEvent, 'log_info_action',
env_var='JUJU_ACTION_NAME',
charm_config=charm_config),
['juju-log', '--log-level', 'INFO', 'useful info'],
)]
# Set up action symlinks.
self._simulate_event(EventSpec(InstallEvent, 'install',
charm_config=charm_config))
for event_spec, calls in test_cases:
self._simulate_event(event_spec)
self.assertIn(calls, fake_script_calls(self, clear=True))
class TestMainWithNoDispatch(TestMain, unittest.TestCase):
def _setup_entry_point(self, directory, entry_point):
path = directory / entry_point
path.symlink_to(self.charm_exec_path)
def _call_event(self, rel_path, env):
event_file = self.JUJU_CHARM_DIR / rel_path
# Note that sys.executable is used to make sure we are using the same
# interpreter for the child process to support virtual environments.
subprocess.run(
[sys.executable, str(event_file)],
check=True, env=env, cwd=str(self.JUJU_CHARM_DIR))
def test_setup_event_links(self):
"""Test auto-creation of symlinks caused by initial events.
"""
all_event_hooks = ['hooks/' + e.replace("_", "-")
for e in self.charm_module.Charm.on.events().keys()]
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
initial_events = {
EventSpec(InstallEvent, 'install', charm_config=charm_config),
EventSpec(StorageAttachedEvent, 'disks-storage-attached', charm_config=charm_config),
EventSpec(StartEvent, 'start', charm_config=charm_config),
EventSpec(UpgradeCharmEvent, 'upgrade-charm', charm_config=charm_config),
}
def _assess_event_links(event_spec):
self.assertTrue(self.hooks_dir / event_spec.event_name in self.hooks_dir.iterdir())
for event_hook in all_event_hooks:
self.assertTrue((self.JUJU_CHARM_DIR / event_hook).exists(),
'Missing hook: ' + event_hook)
self.assertEqual(os.readlink(str(self.JUJU_CHARM_DIR / event_hook)),
self.charm_exec_path)
for initial_event in initial_events:
self._setup_charm_dir()
self._simulate_event(initial_event)
_assess_event_links(initial_event)
# Make sure it is idempotent.
self._simulate_event(initial_event)
_assess_event_links(initial_event)
def test_setup_action_links(self):
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
actions_yaml = self.JUJU_CHARM_DIR / 'actions.yaml'
actions_yaml.write_text('test: {}')
self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
action_hook = self.JUJU_CHARM_DIR / 'actions' / 'test'
self.assertTrue(action_hook.exists())
class TestMainWithDispatch(TestMain, unittest.TestCase):
def _setup_entry_point(self, directory, entry_point):
path = self.JUJU_CHARM_DIR / 'dispatch'
if not path.exists():
path.symlink_to('src/charm.py')
def _call_event(self, rel_path, env):
env["JUJU_DISPATCH_PATH"] = str(rel_path)
dispatch = self.JUJU_CHARM_DIR / 'dispatch'
subprocess.run(
[sys.executable, str(dispatch)],
stdout=self.stdout,
stderr=self.stderr,
check=True, env=env, cwd=str(self.JUJU_CHARM_DIR))
def test_setup_event_links(self):
"""Test auto-creation of symlinks caused by initial events does _not_ happen when using dispatch.
"""
all_event_hooks = ['hooks/' + e.replace("_", "-")
for e in self.charm_module.Charm.on.events().keys()]
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
initial_events = {
EventSpec(InstallEvent, 'install', charm_config=charm_config),
EventSpec(StorageAttachedEvent, 'disks-storage-attached', charm_config=charm_config),
EventSpec(StartEvent, 'start', charm_config=charm_config),
EventSpec(UpgradeCharmEvent, 'upgrade-charm', charm_config=charm_config),
}
def _assess_event_links(event_spec):
self.assertNotIn(self.hooks_dir / event_spec.event_name, self.hooks_dir.iterdir())
for event_hook in all_event_hooks:
self.assertFalse((self.JUJU_CHARM_DIR / event_hook).exists(),
'Spurious hook: ' + event_hook)
for initial_event in initial_events:
self._setup_charm_dir()
self._simulate_event(initial_event)
_assess_event_links(initial_event)
def test_hook_and_dispatch(self):
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
self.fake_script_path = self.hooks_dir
fake_script(self, 'install', 'exit 0')
state = self._simulate_event(EventSpec(InstallEvent, 'install', charm_config=charm_config))
# the script was called, *and*, the .on. was called
self.assertEqual(fake_script_calls(self), [['install', '']])
self.assertEqual(state['observed_event_types'], [InstallEvent])
def test_hook_and_dispatch_with_failing_hook(self):
self.stdout = self.stderr = tempfile.TemporaryFile()
self.addCleanup(self.stdout.close)
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
old_path = self.fake_script_path
self.fake_script_path = self.hooks_dir
fake_script(self, 'install', 'exit 42')
event = EventSpec(InstallEvent, 'install', charm_config=charm_config)
with self.assertRaises(subprocess.CalledProcessError):
self._simulate_event(event)
self.fake_script_path = old_path
self.stdout.seek(0)
self.assertEqual(self.stdout.read(), b'')
calls = fake_script_calls(self)
self.assertEqual(len(calls), 1, 'unexpect call result: {}'.format(calls))
self.assertEqual(len(calls[0]), 4, 'unexpect call result: {}'.format(calls[0]))
self.assertEqual(
calls[0][:3],
['juju-log', '--log-level', 'WARNING']
)
self.assertRegex(calls[0][3], r'hook /\S+/install exited with status 42')
def test_hook_and_dispatch_but_hook_is_dispatch(self):
charm_config = base64.b64encode(pickle.dumps({
'STATE_FILE': self._state_file,
}))
event = EventSpec(InstallEvent, 'install', charm_config=charm_config)
hook_path = self.hooks_dir / 'install'
for ((rel, ind), path) in {
# relative and indirect
(True, True): Path('../dispatch'),
# relative and direct
(True, False): Path(self.charm_exec_path),
# absolute and direct
(False, False): (self.hooks_dir / self.charm_exec_path).resolve(),
# absolute and indirect
(False, True): self.JUJU_CHARM_DIR / 'dispatch',
}.items():
with self.subTest(path=path, rel=rel, ind=ind):
# sanity check
self.assertEqual(path.is_absolute(), not rel)
self.assertEqual(path.name == 'dispatch', ind)
try:
hook_path.symlink_to(path)
state = self._simulate_event(event)
# the .on. was only called once
self.assertEqual(state['observed_event_types'], [InstallEvent])
self.assertEqual(state['on_install'], [InstallEvent])
finally:
hook_path.unlink()
# TODO: this does not work
# class TestMainWithDispatchAsScript(TestMainWithDispatch):
# """Here dispatch is a script that execs the charm.py instead of a symlink.
# """
# def _setup_entry_point(self, directory, entry_point):
# path = self.JUJU_CHARM_DIR / 'dispatch'
# if not path.exists():
# path.write_text('#!/bin/sh\nexec "{}" "{}"\n'.format(
# sys.executable,
# self.JUJU_CHARM_DIR / 'src/charm.py'))
# path.chmod(0o755)
# def _call_event(self, rel_path, env):
# env["JUJU_DISPATCH_PATH"] = str(rel_path)
# dispatch = self.JUJU_CHARM_DIR / 'dispatch'
# subprocess.check_call([str(dispatch)],
# env=env, cwd=str(self.JUJU_CHARM_DIR))
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python3
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pathlib
import unittest
import json
import ipaddress
from collections import OrderedDict
import ops.model
import ops.charm
from ops.charm import RelationMeta
from test.test_helpers import fake_script, fake_script_calls
class TestModel(unittest.TestCase):
def setUp(self):
def restore_env(env):
os.environ.clear()
os.environ.update(env)
self.addCleanup(restore_env, os.environ.copy())
os.environ['JUJU_UNIT_NAME'] = 'myapp/0'
self.backend = ops.model.ModelBackend()
meta = ops.charm.CharmMeta()
meta.relations = {
'db0': RelationMeta('provides', 'db0', {'interface': 'db0', 'scope': 'global'}),
'db1': RelationMeta('requires', 'db1', {'interface': 'db1', 'scope': 'global'}),
'db2': RelationMeta('peers', 'db2', {'interface': 'db2', 'scope': 'global'}),
}
self.model = ops.model.Model('myapp/0', meta, self.backend)
fake_script(self, 'relation-ids', """([ "$1" = db0 ] && echo '["db0:4"]') || echo '[]'""")
def test_model(self):
self.assertIs(self.model.app, self.model.unit.app)
def test_relations_keys(self):
fake_script(self, 'relation-ids',
"""[ "$1" = db2 ] && echo '["db2:5", "db2:6"]' || echo '[]'""")
fake_script(self, 'relation-list',
"""
case "$2" in
5)
echo '["remoteapp1/0", "remoteapp1/1"]'
;;
6)
echo '["remoteapp2/0"]'
;;
*)
exit 2
;;
esac
""")
for relation in self.model.relations['db2']:
self.assertIn(self.model.unit, relation.data)
unit_from_rel = next(filter(lambda u: u.name == 'myapp/0', relation.data.keys()))
self.assertIs(self.model.unit, unit_from_rel)
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db2', '--format=json'],
['relation-list', '-r', '5', '--format=json'],
['relation-list', '-r', '6', '--format=json']
])
def test_get_relation(self):
err_msg = 'ERROR invalid value "$2" for option -r: relation not found'
fake_script(self, 'relation-ids', '''
case "$1" in
db1)
echo '["db1:4"]'
;;
db2)
echo '["db2:5", "db2:6"]'
;;
*)
echo '[]'
;;
esac
''')
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0"]'
else
echo {} >&2
exit 2
fi
'''.format(err_msg))
fake_script(self, 'relation-get',
"""echo {} >&2 ; exit 2""".format(err_msg))
with self.assertRaises(ops.model.ModelError):
self.model.get_relation('db1', 'db1:4')
db1_4 = self.model.get_relation('db1', 4)
self.assertIsInstance(db1_4, ops.model.Relation)
dead_rel = self.model.get_relation('db1', 7)
self.assertIsInstance(dead_rel, ops.model.Relation)
self.assertEqual(set(dead_rel.data.keys()), {self.model.unit, self.model.unit.app})
self.assertEqual(dead_rel.data[self.model.unit], {})
self.assertIsNone(self.model.get_relation('db0'))
self.assertIs(self.model.get_relation('db1'), db1_4)
with self.assertRaises(ops.model.TooManyRelatedAppsError):
self.model.get_relation('db2')
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-list', '-r', '7', '--format=json'],
['relation-get', '-r', '7', '-', 'myapp/0', '--app=False', '--format=json'],
['relation-ids', 'db0', '--format=json'],
['relation-ids', 'db2', '--format=json'],
['relation-list', '-r', '5', '--format=json'],
['relation-list', '-r', '6', '--format=json']
])
def test_peer_relation_app(self):
meta = ops.charm.CharmMeta()
meta.relations = {'dbpeer': RelationMeta('peers', 'dbpeer',
{'interface': 'dbpeer', 'scope': 'global'})}
self.model = ops.model.Model('myapp/0', meta, self.backend)
err_msg = 'ERROR invalid value "$2" for option -r: relation not found'
fake_script(self, 'relation-ids',
'''([ "$1" = dbpeer ] && echo '["dbpeer:0"]') || echo "[]"''')
fake_script(self, 'relation-list',
'''([ "$2" = 0 ] && echo "[]") || (echo {} >&2 ; exit 2)'''.format(err_msg))
db1_4 = self.model.get_relation('dbpeer')
self.assertIs(db1_4.app, self.model.app)
def test_remote_units_is_our(self):
fake_script(self, 'relation-ids',
"""[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list',
"""[ "$2" = 4 ] && echo '["remoteapp1/0", "remoteapp1/1"]' || exit 2""")
for u in self.model.get_relation('db1').units:
self.assertFalse(u._is_our_unit)
self.assertFalse(u.app._is_our_app)
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json']
])
def test_our_unit_is_our(self):
self.assertTrue(self.model.unit._is_our_unit)
self.assertTrue(self.model.unit.app._is_our_app)
def test_unit_relation_data(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', """[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-get', """
if [ "$2" = 4 ] && [ "$4" = "remoteapp1/0" ]; then
echo '{"host": "remoteapp1-0"}'
else
exit 2
fi
""")
random_unit = self.model._cache.get(ops.model.Unit, 'randomunit/0')
with self.assertRaises(KeyError):
self.model.get_relation('db1').data[random_unit]
remoteapp1_0 = next(filter(lambda u: u.name == 'remoteapp1/0',
self.model.get_relation('db1').units))
self.assertEqual(self.model.get_relation('db1').data[remoteapp1_0],
{'host': 'remoteapp1-0'})
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'remoteapp1/0', '--app=False', '--format=json']
])
def test_remote_app_relation_data(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0", "remoteapp1/1"]'
else
exit 2
fi
''')
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = remoteapp1 ]; then
echo '{"secret": "cafedeadbeef"}'
else
exit 2
fi
''')
# Try to get relation data for an invalid remote application.
random_app = self.model._cache.get(ops.model.Application, 'randomapp')
with self.assertRaises(KeyError):
self.model.get_relation('db1').data[random_app]
remoteapp1 = self.model.get_relation('db1').app
self.assertEqual(self.model.get_relation('db1').data[remoteapp1],
{'secret': 'cafedeadbeef'})
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'remoteapp1', '--app=True', '--format=json'],
])
def test_relation_data_modify_remote(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', """[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-get', """
if [ "$2" = 4 ] && [ "$4" = "remoteapp1/0" ]; then
echo '{"host": "remoteapp1-0"}'
else
exit 2
fi
""")
rel_db1 = self.model.get_relation('db1')
remoteapp1_0 = next(filter(lambda u: u.name == 'remoteapp1/0',
self.model.get_relation('db1').units))
# Force memory cache to be loaded.
self.assertIn('host', rel_db1.data[remoteapp1_0])
with self.assertRaises(ops.model.RelationDataError):
rel_db1.data[remoteapp1_0]['foo'] = 'bar'
self.assertNotIn('foo', rel_db1.data[remoteapp1_0])
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'remoteapp1/0', '--app=False', '--format=json']
])
def test_relation_data_modify_our(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', """[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-set', '''[ "$2" = 4 ] && exit 0 || exit 2''')
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = "myapp/0" ]; then
echo '{"host": "bar"}'
else
exit 2
fi
''')
rel_db1 = self.model.get_relation('db1')
# Force memory cache to be loaded.
self.assertIn('host', rel_db1.data[self.model.unit])
rel_db1.data[self.model.unit]['host'] = 'bar'
self.assertEqual(rel_db1.data[self.model.unit]['host'], 'bar')
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'myapp/0', '--app=False', '--format=json'],
['relation-set', '-r', '4', 'host=bar', '--app=False']
])
def test_app_relation_data_modify_local_as_leader(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0", "remoteapp1/1"]'
else
exit 2
fi
''')
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = myapp ]; then
echo '{"password": "deadbeefcafe"}'
else
exit 2
fi
''')
fake_script(self, 'relation-set', """[ "$2" = 4 ] && exit 0 || exit 2""")
fake_script(self, 'is-leader', 'echo true')
local_app = self.model.unit.app
rel_db1 = self.model.get_relation('db1')
self.assertEqual(rel_db1.data[local_app], {'password': 'deadbeefcafe'})
rel_db1.data[local_app]['password'] = 'foo'
self.assertEqual(rel_db1.data[local_app]['password'], 'foo')
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'myapp', '--app=True', '--format=json'],
['is-leader', '--format=json'],
['relation-set', '-r', '4', 'password=foo', '--app=True'],
])
def test_app_relation_data_modify_local_as_minion(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0", "remoteapp1/1"]'
else
exit 2
fi
''')
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = myapp ]; then
echo '{"password": "deadbeefcafe"}'
else
exit 2
fi
''')
fake_script(self, 'is-leader', 'echo false')
local_app = self.model.unit.app
rel_db1 = self.model.get_relation('db1')
self.assertEqual(rel_db1.data[local_app], {'password': 'deadbeefcafe'})
with self.assertRaises(ops.model.RelationDataError):
rel_db1.data[local_app]['password'] = 'foobar'
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'myapp', '--app=True', '--format=json'],
['is-leader', '--format=json'],
])
def test_relation_data_del_key(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', """[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-set', '''[ "$2" = 4 ] && exit 0 || exit 2''')
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = "myapp/0" ]; then
echo '{"host": "bar"}'
else
exit 2
fi
''')
rel_db1 = self.model.get_relation('db1')
# Force memory cache to be loaded.
self.assertIn('host', rel_db1.data[self.model.unit])
del rel_db1.data[self.model.unit]['host']
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = "myapp/0" ]; then
echo '{}'
else
exit 2
fi
''')
self.assertNotIn('host', rel_db1.data[self.model.unit])
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['relation-get', '-r', '4', '-', 'myapp/0', '--app=False', '--format=json'],
['relation-set', '-r', '4', 'host=', '--app=False']
])
def test_relation_set_fail(self):
fake_script(self, 'relation-ids', """[ "$1" = db2 ] && echo '["db2:5"]' || echo '[]'""")
fake_script(self, 'relation-list',
"""[ "$2" = 5 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-get', '''
if [ "$2" = 5 ] && [ "$4" = "myapp/0" ]; then
echo '{"host": "myapp-0"}'
else
exit 2
fi
''')
fake_script(self, 'relation-set', 'exit 2')
rel_db2 = self.model.relations['db2'][0]
# Force memory cache to be loaded.
self.assertIn('host', rel_db2.data[self.model.unit])
with self.assertRaises(ops.model.ModelError):
rel_db2.data[self.model.unit]['host'] = 'bar'
self.assertEqual(rel_db2.data[self.model.unit]['host'], 'myapp-0')
with self.assertRaises(ops.model.ModelError):
del rel_db2.data[self.model.unit]['host']
self.assertIn('host', rel_db2.data[self.model.unit])
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db2', '--format=json'],
['relation-list', '-r', '5', '--format=json'],
['relation-get', '-r', '5', '-', 'myapp/0', '--app=False', '--format=json'],
['relation-set', '-r', '5', 'host=bar', '--app=False'],
['relation-set', '-r', '5', 'host=', '--app=False']
])
def test_relation_get_set_is_app_arg(self):
self.backend = ops.model.ModelBackend()
# No is_app provided.
with self.assertRaises(TypeError):
self.backend.relation_set(1, 'fookey', 'barval')
with self.assertRaises(TypeError):
self.backend.relation_get(1, 'fooentity')
# Invalid types for is_app.
for is_app_v in [None, 1, 2.0, 'a', b'beef']:
with self.assertRaises(TypeError):
self.backend.relation_set(1, 'fookey', 'barval', is_app=is_app_v)
with self.assertRaises(TypeError):
self.backend.relation_get(1, 'fooentity', is_app=is_app_v)
def test_relation_data_type_check(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list',
"""[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
fake_script(self, 'relation-get', '''
if [ "$2" = 4 ] && [ "$4" = "myapp/0" ]; then
echo '{"host": "myapp-0"}'
else
exit 2
fi
''')
rel_db1 = self.model.get_relation('db1')
with self.assertRaises(ops.model.RelationDataError):
rel_db1.data[self.model.unit]['foo'] = 1
with self.assertRaises(ops.model.RelationDataError):
rel_db1.data[self.model.unit]['foo'] = {'foo': 'bar'}
with self.assertRaises(ops.model.RelationDataError):
rel_db1.data[self.model.unit]['foo'] = None
self.assertEqual(fake_script_calls(self), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json']
])
def test_config(self):
fake_script(self, 'config-get', """echo '{"foo":"foo","bar":1,"qux":true}'""")
self.assertEqual(self.model.config, {
'foo': 'foo',
'bar': 1,
'qux': True,
})
with self.assertRaises(TypeError):
# Confirm that we cannot modify config values.
self.model.config['foo'] = 'bar'
self.assertEqual(fake_script_calls(self), [['config-get', '--format=json']])
def test_is_leader(self):
def check_remote_units():
fake_script(self, 'relation-ids',
"""[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list',
"""[ "$2" = 4 ] && echo '["remoteapp1/0", "remoteapp1/1"]' || exit 2""")
# Cannot determine leadership for remote units.
for u in self.model.get_relation('db1').units:
with self.assertRaises(RuntimeError):
u.is_leader()
fake_script(self, 'is-leader', 'echo true')
self.assertTrue(self.model.unit.is_leader())
check_remote_units()
# Create a new model and backend to drop a cached is-leader output.
self.backend = ops.model.ModelBackend()
meta = ops.charm.CharmMeta()
meta.relations = {
'db0': RelationMeta('provides', 'db0', {'interface': 'db0', 'scope': 'global'}),
'db1': RelationMeta('requires', 'db1', {'interface': 'db1', 'scope': 'global'}),
'db2': RelationMeta('peers', 'db2', {'interface': 'db2', 'scope': 'global'}),
}
self.model = ops.model.Model('myapp/0', meta, self.backend)
fake_script(self, 'is-leader', 'echo false')
self.assertFalse(self.model.unit.is_leader())
check_remote_units()
self.assertEqual(fake_script_calls(self), [
['is-leader', '--format=json'],
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
['is-leader', '--format=json'],
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
])
def test_is_leader_refresh(self):
fake_script(self, 'is-leader', 'echo false')
self.assertFalse(self.model.unit.is_leader())
# Change the leadership status and force a recheck.
fake_script(self, 'is-leader', 'echo true')
self.backend._leader_check_time = None
self.assertTrue(self.model.unit.is_leader())
# Force a recheck without changing the leadership status.
fake_script(self, 'is-leader', 'echo true')
self.backend._leader_check_time = None
self.assertTrue(self.model.unit.is_leader())
def test_workload_version(self):
fake_script(self, 'application-version-set', 'exit 0')
self.model.unit.set_workload_version('1.2.3')
self.assertEqual(fake_script_calls(self), [['application-version-set', '--', '1.2.3']])
def test_workload_version_invalid(self):
fake_script(self, 'application-version-set', 'exit 0')
with self.assertRaises(TypeError) as cm:
self.model.unit.set_workload_version(5)
self.assertEqual(str(cm.exception), "workload version must be a str, not int: 5")
self.assertEqual(fake_script_calls(self), [])
def test_resources(self):
meta = ops.charm.CharmMeta()
meta.resources = {'foo': None, 'bar': None}
model = ops.model.Model('myapp/0', meta, self.backend)
with self.assertRaises(RuntimeError):
model.resources.fetch('qux')
fake_script(self, 'resource-get', 'exit 1')
with self.assertRaises(ops.model.ModelError):
model.resources.fetch('foo')
fake_script(self, 'resource-get',
'echo /var/lib/juju/agents/unit-test-0/resources/$1/$1.tgz')
self.assertEqual(model.resources.fetch('foo').name, 'foo.tgz')
self.assertEqual(model.resources.fetch('bar').name, 'bar.tgz')
def test_pod_spec(self):
fake_script(self, 'pod-spec-set', """
cat $2 > $(dirname $0)/spec.json
[[ -n $4 ]] && cat $4 > $(dirname $0)/k8s_res.json || true
""")
fake_script(self, 'is-leader', 'echo true')
spec_path = self.fake_script_path / 'spec.json'
k8s_res_path = self.fake_script_path / 'k8s_res.json'
def check_calls(calls):
# There may 1 or 2 calls because of is-leader.
self.assertLessEqual(len(fake_calls), 2)
pod_spec_call = next(filter(lambda c: c[0] == 'pod-spec-set', calls))
self.assertEqual(pod_spec_call[:2], ['pod-spec-set', '--file'])
# 8 bytes are used as of python 3.4.0, see Python bug #12015.
# Other characters are from POSIX 3.282 (Portable Filename
# Character Set) a subset of which Python's mkdtemp uses.
self.assertRegex(pod_spec_call[2], '.*/tmp[A-Za-z0-9._-]{8}-pod-spec-set')
self.model.pod.set_spec({'foo': 'bar'})
self.assertEqual(spec_path.read_text(), '{"foo": "bar"}')
self.assertFalse(k8s_res_path.exists())
fake_calls = fake_script_calls(self, clear=True)
check_calls(fake_calls)
self.model.pod.set_spec({'bar': 'foo'}, {'qux': 'baz'})
self.assertEqual(spec_path.read_text(), '{"bar": "foo"}')
self.assertEqual(k8s_res_path.read_text(), '{"qux": "baz"}')
fake_calls = fake_script_calls(self, clear=True)
check_calls(fake_calls)
# Create a new model to drop is-leader caching result.
self.backend = ops.model.ModelBackend()
meta = ops.charm.CharmMeta()
self.model = ops.model.Model('myapp/0', meta, self.backend)
fake_script(self, 'is-leader', 'echo false')
with self.assertRaises(ops.model.ModelError):
self.model.pod.set_spec({'foo': 'bar'})
def test_base_status_instance_raises(self):
with self.assertRaises(TypeError):
ops.model.StatusBase('test')
def test_active_message_default(self):
self.assertEqual(ops.model.ActiveStatus().message, '')
def test_local_set_valid_unit_status(self):
test_cases = [(
ops.model.ActiveStatus('Green'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertEqual(fake_script_calls(self, True),
[['status-set', '--application=False', 'active', 'Green']]),
), (
ops.model.MaintenanceStatus('Yellow'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertEqual(
fake_script_calls(self, True),
[['status-set', '--application=False', 'maintenance', 'Yellow']]),
), (
ops.model.BlockedStatus('Red'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertEqual(fake_script_calls(self, True),
[['status-set', '--application=False', 'blocked', 'Red']]),
), (
ops.model.WaitingStatus('White'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertEqual(fake_script_calls(self, True),
[['status-set', '--application=False', 'waiting', 'White']]),
)]
for target_status, setup_tools, check_tool_calls in test_cases:
setup_tools()
self.model.unit.status = target_status
self.assertEqual(self.model.unit.status, target_status)
check_tool_calls()
def test_local_set_valid_app_status(self):
fake_script(self, 'is-leader', 'echo true')
test_cases = [(
ops.model.ActiveStatus('Green'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertIn(['status-set', '--application=True', 'active', 'Green'],
fake_script_calls(self, True)),
), (
ops.model.MaintenanceStatus('Yellow'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertIn(['status-set', '--application=True', 'maintenance', 'Yellow'],
fake_script_calls(self, True)),
), (
ops.model.BlockedStatus('Red'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertIn(['status-set', '--application=True', 'blocked', 'Red'],
fake_script_calls(self, True)),
), (
ops.model.WaitingStatus('White'),
lambda: fake_script(self, 'status-set', 'exit 0'),
lambda: self.assertIn(['status-set', '--application=True', 'waiting', 'White'],
fake_script_calls(self, True)),
)]
for target_status, setup_tools, check_tool_calls in test_cases:
setup_tools()
self.model.app.status = target_status
self.assertEqual(self.model.app.status, target_status)
check_tool_calls()
def test_set_app_status_non_leader_raises(self):
fake_script(self, 'is-leader', 'echo false')
with self.assertRaises(RuntimeError):
self.model.app.status
with self.assertRaises(RuntimeError):
self.model.app.status = ops.model.ActiveStatus()
def test_local_set_invalid_status(self):
fake_script(self, 'status-set', 'exit 1')
fake_script(self, 'is-leader', 'echo true')
with self.assertRaises(ops.model.ModelError):
self.model.unit.status = ops.model.UnknownStatus()
self.assertEqual(fake_script_calls(self, True), [
['status-set', '--application=False', 'unknown', ''],
])
with self.assertRaises(ops.model.ModelError):
self.model.app.status = ops.model.UnknownStatus()
# A leadership check is needed for application status.
self.assertEqual(fake_script_calls(self, True), [
['is-leader', '--format=json'],
['status-set', '--application=True', 'unknown', ''],
])
def test_status_set_is_app_not_bool_raises(self):
self.backend = ops.model.ModelBackend()
for is_app_v in [None, 1, 2.0, 'a', b'beef', object]:
with self.assertRaises(TypeError):
self.backend.status_set(ops.model.ActiveStatus, is_app=is_app_v)
def test_remote_unit_status(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0", "remoteapp1/1"]'
else
exit 2
fi
''')
remote_unit = next(filter(lambda u: u.name == 'remoteapp1/0',
self.model.get_relation('db1').units))
test_statuses = (
ops.model.UnknownStatus(),
ops.model.ActiveStatus('Green'),
ops.model.MaintenanceStatus('Yellow'),
ops.model.BlockedStatus('Red'),
ops.model.WaitingStatus('White'),
)
for target_status in test_statuses:
with self.assertRaises(RuntimeError):
remote_unit.status = target_status
def test_remote_app_status(self):
fake_script(self, 'relation-ids', """[ "$1" = db1 ] && echo '["db1:4"]' || echo '[]'""")
fake_script(self, 'relation-list', '''
if [ "$2" = 4 ]; then
echo '["remoteapp1/0", "remoteapp1/1"]'
else
exit 2
fi
''')
remoteapp1 = self.model.get_relation('db1').app
# Remote application status is always unknown.
self.assertIsInstance(remoteapp1.status, ops.model.UnknownStatus)
test_statuses = (
ops.model.UnknownStatus(),
ops.model.ActiveStatus(),
ops.model.MaintenanceStatus('Upgrading software'),
ops.model.BlockedStatus('Awaiting manual resolution'),
ops.model.WaitingStatus('Awaiting related app updates'),
)
for target_status in test_statuses:
with self.assertRaises(RuntimeError):
remoteapp1.status = target_status
self.assertEqual(fake_script_calls(self, clear=True), [
['relation-ids', 'db1', '--format=json'],
['relation-list', '-r', '4', '--format=json'],
])
def test_storage(self):
meta = ops.charm.CharmMeta()
meta.storages = {'disks': None, 'data': None}
self.model = ops.model.Model('myapp/0', meta, self.backend)
fake_script(self, 'storage-list', '''
if [ "$1" = disks ]; then
echo '["disks/0", "disks/1"]'
else
echo '[]'
fi
''')
fake_script(self, 'storage-get', '''
if [ "$2" = disks/0 ]; then
echo '"/var/srv/disks/0"'
elif [ "$2" = disks/1 ]; then
echo '"/var/srv/disks/1"'
else
exit 2
fi
''')
fake_script(self, 'storage-add', '')
self.assertEqual(len(self.model.storages), 2)
self.assertEqual(self.model.storages.keys(), meta.storages.keys())
self.assertIn('disks', self.model.storages)
test_cases = {
0: {'name': 'disks', 'location': pathlib.Path('/var/srv/disks/0')},
1: {'name': 'disks', 'location': pathlib.Path('/var/srv/disks/1')},
}
for storage in self.model.storages['disks']:
self.assertEqual(storage.name, 'disks')
self.assertIn(storage.id, test_cases)
self.assertEqual(storage.name, test_cases[storage.id]['name'])
self.assertEqual(storage.location, test_cases[storage.id]['location'])
self.assertEqual(fake_script_calls(self, clear=True), [
['storage-list', 'disks', '--format=json'],
['storage-get', '-s', 'disks/0', 'location', '--format=json'],
['storage-get', '-s', 'disks/1', 'location', '--format=json'],
])
self.assertSequenceEqual(self.model.storages['data'], [])
self.model.storages.request('data', count=3)
self.assertEqual(fake_script_calls(self), [
['storage-list', 'data', '--format=json'],
['storage-add', 'data=3'],
])
# Try to add storage not present in charm metadata.
with self.assertRaises(ops.model.ModelError):
self.model.storages.request('deadbeef')
# Invalid count parameter types.
for count_v in [None, False, 2.0, 'a', b'beef', object]:
with self.assertRaises(TypeError):
self.model.storages.request('data', count_v)
class TestModelBindings(unittest.TestCase):
def setUp(self):
def restore_env(env):
os.environ.clear()
os.environ.update(env)
self.addCleanup(restore_env, os.environ.copy())
os.environ['JUJU_UNIT_NAME'] = 'myapp/0'
meta = ops.charm.CharmMeta()
meta.relations = {
'db0': RelationMeta('provides', 'db0', {'interface': 'db0', 'scope': 'global'}),
'db1': RelationMeta('requires', 'db1', {'interface': 'db1', 'scope': 'global'}),
'db2': RelationMeta('peers', 'db2', {'interface': 'db2', 'scope': 'global'}),
}
self.backend = ops.model.ModelBackend()
self.model = ops.model.Model('myapp/0', meta, self.backend)
fake_script(self, 'relation-ids',
"""([ "$1" = db0 ] && echo '["db0:4"]') || echo '[]'""")
fake_script(self, 'relation-list', """[ "$2" = 4 ] && echo '["remoteapp1/0"]' || exit 2""")
self.network_get_out = '''{
"bind-addresses": [
{
"mac-address": "de:ad:be:ef:ca:fe",
"interface-name": "lo",
"addresses": [
{
"hostname": "",
"value": "192.0.2.2",
"cidr": "192.0.2.0/24"
},
{
"hostname": "deadbeef.example",
"value": "dead:beef::1",
"cidr": "dead:beef::/64"
}
]
},
{
"mac-address": "",
"interface-name": "tun",
"addresses": [
{
"hostname": "",
"value": "192.0.3.3",
"cidr": ""
},
{
"hostname": "",
"value": "2001:db8::3",
"cidr": ""
},
{
"hostname": "deadbeef.local",
"value": "fe80::1:1",
"cidr": "fe80::/64"
}
]
}
],
"egress-subnets": [
"192.0.2.2/32",
"192.0.3.0/24",
"dead:beef::/64",
"2001:db8::3/128"
],
"ingress-addresses": [
"192.0.2.2",
"192.0.3.3",
"dead:beef::1",
"2001:db8::3"
]
}'''
def _check_binding_data(self, binding_name, binding):
self.assertEqual(binding.name, binding_name)
self.assertEqual(binding.network.bind_address, ipaddress.ip_address('192.0.2.2'))
self.assertEqual(binding.network.ingress_address, ipaddress.ip_address('192.0.2.2'))
# /32 and /128 CIDRs are valid one-address networks for IPv{4,6}Network types respectively.
self.assertEqual(binding.network.egress_subnets, [ipaddress.ip_network('192.0.2.2/32'),
ipaddress.ip_network('192.0.3.0/24'),
ipaddress.ip_network('dead:beef::/64'),
ipaddress.ip_network('2001:db8::3/128')])
for (i, (name, address, subnet)) in enumerate([
('lo', '192.0.2.2', '192.0.2.0/24'),
('lo', 'dead:beef::1', 'dead:beef::/64'),
('tun', '192.0.3.3', '192.0.3.3/32'),
('tun', '2001:db8::3', '2001:db8::3/128'),
('tun', 'fe80::1:1', 'fe80::/64')]):
self.assertEqual(binding.network.interfaces[i].name, name)
self.assertEqual(binding.network.interfaces[i].address, ipaddress.ip_address(address))
self.assertEqual(binding.network.interfaces[i].subnet, ipaddress.ip_network(subnet))
def test_invalid_keys(self):
# Basic validation for passing invalid keys.
for name in (object, 0):
with self.assertRaises(ops.model.ModelError):
self.model.get_binding(name)
def test_dead_relations(self):
fake_script(
self,
'network-get',
'''
if [ "$1" = db0 ] && [ "$2" = --format=json ]; then
echo '{}'
else
echo ERROR invalid value "$2" for option -r: relation not found >&2
exit 2
fi
'''.format(self.network_get_out))
# Validate the behavior for dead relations.
binding = ops.model.Binding('db0', 42, self.model._backend)
self.assertEqual(binding.network.bind_address, ipaddress.ip_address('192.0.2.2'))
self.assertEqual(fake_script_calls(self, clear=True), [
['network-get', 'db0', '-r', '42', '--format=json'],
['network-get', 'db0', '--format=json'],
])
def test_binding_by_relation_name(self):
fake_script(self, 'network-get',
'''[ "$1" = db0 ] && echo '{}' || exit 1'''.format(self.network_get_out))
binding_name = 'db0'
expected_calls = [['network-get', 'db0', '--format=json']]
binding = self.model.get_binding(binding_name)
self._check_binding_data(binding_name, binding)
self.assertEqual(fake_script_calls(self, clear=True), expected_calls)
def test_binding_by_relation(self):
fake_script(self, 'network-get',
'''[ "$1" = db0 ] && echo '{}' || exit 1'''.format(self.network_get_out))
binding_name = 'db0'
expected_calls = [
['relation-ids', 'db0', '--format=json'],
# The two invocations below are due to the get_relation call.
['relation-list', '-r', '4', '--format=json'],
['network-get', 'db0', '-r', '4', '--format=json'],
]
binding = self.model.get_binding(self.model.get_relation(binding_name))
self._check_binding_data(binding_name, binding)
self.assertEqual(fake_script_calls(self, clear=True), expected_calls)
class TestModelBackend(unittest.TestCase):
def setUp(self):
os.environ['JUJU_UNIT_NAME'] = 'myapp/0'
self.addCleanup(os.environ.pop, 'JUJU_UNIT_NAME')
self._backend = None
@property
def backend(self):
if self._backend is None:
self._backend = ops.model.ModelBackend()
return self._backend
def test_relation_tool_errors(self):
err_msg = 'ERROR invalid value "$2" for option -r: relation not found'
test_cases = [(
lambda: fake_script(self, 'relation-list', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.relation_list(3),
ops.model.ModelError,
[['relation-list', '-r', '3', '--format=json']],
), (
lambda: fake_script(self, 'relation-list', 'echo {} >&2 ; exit 2'.format(err_msg)),
lambda: self.backend.relation_list(3),
ops.model.RelationNotFoundError,
[['relation-list', '-r', '3', '--format=json']],
), (
lambda: fake_script(self, 'relation-set', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.relation_set(3, 'foo', 'bar', is_app=False),
ops.model.ModelError,
[['relation-set', '-r', '3', 'foo=bar', '--app=False']],
), (
lambda: fake_script(self, 'relation-set', 'echo {} >&2 ; exit 2'.format(err_msg)),
lambda: self.backend.relation_set(3, 'foo', 'bar', is_app=False),
ops.model.RelationNotFoundError,
[['relation-set', '-r', '3', 'foo=bar', '--app=False']],
), (
lambda: fake_script(self, 'relation-get', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.relation_get(3, 'remote/0', is_app=False),
ops.model.ModelError,
[['relation-get', '-r', '3', '-', 'remote/0', '--app=False', '--format=json']],
), (
lambda: fake_script(self, 'relation-get', 'echo {} >&2 ; exit 2'.format(err_msg)),
lambda: self.backend.relation_get(3, 'remote/0', is_app=False),
ops.model.RelationNotFoundError,
[['relation-get', '-r', '3', '-', 'remote/0', '--app=False', '--format=json']],
)]
for do_fake, run, exception, calls in test_cases:
do_fake()
with self.assertRaises(exception):
run()
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_status_is_app_forced_kwargs(self):
fake_script(self, 'status-get', 'exit 1')
fake_script(self, 'status-set', 'exit 1')
test_cases = (
lambda: self.backend.status_get(False),
lambda: self.backend.status_get(True),
lambda: self.backend.status_set('active', '', False),
lambda: self.backend.status_set('active', '', True),
)
for case in test_cases:
with self.assertRaises(TypeError):
case()
def test_storage_tool_errors(self):
test_cases = [(
lambda: fake_script(self, 'storage-list', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.storage_list('foobar'),
ops.model.ModelError,
[['storage-list', 'foobar', '--format=json']],
), (
lambda: fake_script(self, 'storage-get', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.storage_get('foobar', 'someattr'),
ops.model.ModelError,
[['storage-get', '-s', 'foobar', 'someattr', '--format=json']],
), (
lambda: fake_script(self, 'storage-add', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.storage_add('foobar', count=2),
ops.model.ModelError,
[['storage-add', 'foobar=2']],
), (
lambda: fake_script(self, 'storage-add', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.storage_add('foobar', count=object),
TypeError,
[],
), (
lambda: fake_script(self, 'storage-add', 'echo fooerror >&2 ; exit 1'),
lambda: self.backend.storage_add('foobar', count=True),
TypeError,
[],
)]
for do_fake, run, exception, calls in test_cases:
do_fake()
with self.assertRaises(exception):
run()
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_network_get(self):
network_get_out = '''{
"bind-addresses": [
{
"mac-address": "",
"interface-name": "",
"addresses": [
{
"hostname": "",
"value": "192.0.2.2",
"cidr": ""
}
]
}
],
"egress-subnets": [
"192.0.2.2/32"
],
"ingress-addresses": [
"192.0.2.2"
]
}'''
fake_script(self, 'network-get',
'''[ "$1" = deadbeef ] && echo '{}' || exit 1'''.format(network_get_out))
network_info = self.backend.network_get('deadbeef')
self.assertEqual(network_info, json.loads(network_get_out))
self.assertEqual(fake_script_calls(self, clear=True),
[['network-get', 'deadbeef', '--format=json']])
network_info = self.backend.network_get('deadbeef', 1)
self.assertEqual(network_info, json.loads(network_get_out))
self.assertEqual(fake_script_calls(self, clear=True),
[['network-get', 'deadbeef', '-r', '1', '--format=json']])
def test_network_get_errors(self):
err_no_endpoint = 'ERROR no network config found for binding "$2"'
err_no_rel = 'ERROR invalid value "$3" for option -r: relation not found'
test_cases = [(
lambda: fake_script(self, 'network-get',
'echo {} >&2 ; exit 1'.format(err_no_endpoint)),
lambda: self.backend.network_get("deadbeef"),
ops.model.ModelError,
[['network-get', 'deadbeef', '--format=json']],
), (
lambda: fake_script(self, 'network-get', 'echo {} >&2 ; exit 2'.format(err_no_rel)),
lambda: self.backend.network_get("deadbeef", 3),
ops.model.RelationNotFoundError,
[['network-get', 'deadbeef', '-r', '3', '--format=json']],
)]
for do_fake, run, exception, calls in test_cases:
do_fake()
with self.assertRaises(exception):
run()
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_action_get_error(self):
fake_script(self, 'action-get', '')
fake_script(self, 'action-get', 'echo fooerror >&2 ; exit 1')
with self.assertRaises(ops.model.ModelError):
self.backend.action_get()
calls = [['action-get', '--format=json']]
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_action_set_error(self):
fake_script(self, 'action-get', '')
fake_script(self, 'action-set', 'echo fooerror >&2 ; exit 1')
with self.assertRaises(ops.model.ModelError):
self.backend.action_set(OrderedDict([('foo', 'bar'), ('dead', 'beef cafe')]))
calls = [["action-set", "foo=bar", "dead=beef cafe"]]
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_action_log_error(self):
fake_script(self, 'action-get', '')
fake_script(self, 'action-log', 'echo fooerror >&2 ; exit 1')
with self.assertRaises(ops.model.ModelError):
self.backend.action_log('log-message')
calls = [["action-log", "log-message"]]
self.assertEqual(fake_script_calls(self, clear=True), calls)
def test_action_get(self):
fake_script(self, 'action-get', """echo '{"foo-name": "bar", "silent": false}'""")
params = self.backend.action_get()
self.assertEqual(params['foo-name'], 'bar')
self.assertEqual(params['silent'], False)
self.assertEqual(fake_script_calls(self), [['action-get', '--format=json']])
def test_action_set(self):
fake_script(self, 'action-get', 'exit 1')
fake_script(self, 'action-set', 'exit 0')
self.backend.action_set(OrderedDict([('x', 'dead beef'), ('y', 1)]))
self.assertEqual(fake_script_calls(self), [['action-set', 'x=dead beef', 'y=1']])
def test_action_fail(self):
fake_script(self, 'action-get', 'exit 1')
fake_script(self, 'action-fail', 'exit 0')
self.backend.action_fail('error 42')
self.assertEqual(fake_script_calls(self), [['action-fail', 'error 42']])
def test_action_log(self):
fake_script(self, 'action-get', 'exit 1')
fake_script(self, 'action-log', 'exit 0')
self.backend.action_log('progress: 42%')
self.assertEqual(fake_script_calls(self), [['action-log', 'progress: 42%']])
def test_application_version_set(self):
fake_script(self, 'application-version-set', 'exit 0')
self.backend.application_version_set('1.2b3')
self.assertEqual(fake_script_calls(self), [['application-version-set', '--', '1.2b3']])
def test_application_version_set_invalid(self):
fake_script(self, 'application-version-set', 'exit 0')
with self.assertRaises(TypeError):
self.backend.application_version_set(2)
with self.assertRaises(TypeError):
self.backend.application_version_set()
self.assertEqual(fake_script_calls(self), [])
def test_juju_log(self):
fake_script(self, 'juju-log', 'exit 0')
self.backend.juju_log('WARNING', 'foo')
self.assertEqual(fake_script_calls(self, clear=True),
[['juju-log', '--log-level', 'WARNING', 'foo']])
with self.assertRaises(TypeError):
self.backend.juju_log('DEBUG')
self.assertEqual(fake_script_calls(self, clear=True), [])
fake_script(self, 'juju-log', 'exit 1')
with self.assertRaises(ops.model.ModelError):
self.backend.juju_log('BAR', 'foo')
self.assertEqual(fake_script_calls(self, clear=True),
[['juju-log', '--log-level', 'BAR', 'foo']])
def test_valid_metrics(self):
fake_script(self, 'add-metric', 'exit 0')
test_cases = [(
OrderedDict([('foo', 42), ('b-ar', 4.5), ('ba_-z', 4.5), ('a', 1)]),
OrderedDict([('de', 'ad'), ('be', 'ef_ -')]),
[['add-metric', '--labels', 'de=ad,be=ef_ -',
'foo=42', 'b-ar=4.5', 'ba_-z=4.5', 'a=1']]
), (
OrderedDict([('foo1', 0), ('b2r', 4.5)]),
OrderedDict([('d3', 'aд'), ('b33f', '3_ -')]),
[['add-metric', '--labels', 'd3=aд,b33f=3_ -', 'foo1=0', 'b2r=4.5']],
)]
for metrics, labels, expected_calls in test_cases:
self.backend.add_metrics(metrics, labels)
self.assertEqual(fake_script_calls(self, clear=True), expected_calls)
def test_invalid_metric_names(self):
invalid_inputs = [
({'': 4.2}, {}),
({'1': 4.2}, {}),
({'1': -4.2}, {}),
({'123': 4.2}, {}),
({'1foo': 4.2}, {}),
({'-foo': 4.2}, {}),
({'_foo': 4.2}, {}),
({'foo-': 4.2}, {}),
({'foo_': 4.2}, {}),
({'a-': 4.2}, {}),
({'a_': 4.2}, {}),
({'BAЯ': 4.2}, {}),
]
for metrics, labels in invalid_inputs:
with self.assertRaises(ops.model.ModelError):
self.backend.add_metrics(metrics, labels)
def test_invalid_metric_values(self):
invalid_inputs = [
({'a': float('+inf')}, {}),
({'a': float('-inf')}, {}),
({'a': float('nan')}, {}),
({'foo': 'bar'}, {}),
({'foo': '1O'}, {}),
]
for metrics, labels in invalid_inputs:
with self.assertRaises(ops.model.ModelError):
self.backend.add_metrics(metrics, labels)
def test_invalid_metric_labels(self):
invalid_inputs = [
({'foo': 4.2}, {'': 'baz'}),
({'foo': 4.2}, {',bar': 'baz'}),
({'foo': 4.2}, {'b=a=r': 'baz'}),
({'foo': 4.2}, {'BAЯ': 'baz'}),
]
for metrics, labels in invalid_inputs:
with self.assertRaises(ops.model.ModelError):
self.backend.add_metrics(metrics, labels)
def test_invalid_metric_label_values(self):
invalid_inputs = [
({'foo': 4.2}, {'bar': ''}),
({'foo': 4.2}, {'bar': 'b,az'}),
({'foo': 4.2}, {'bar': 'b=az'}),
]
for metrics, labels in invalid_inputs:
with self.assertRaises(ops.model.ModelError):
self.backend.add_metrics(metrics, labels)
class TestLazyMapping(unittest.TestCase):
def test_invalidate(self):
loaded = []
class MyLazyMap(ops.model.LazyMapping):
def _load(self):
loaded.append(1)
return {'foo': 'bar'}
map = MyLazyMap()
self.assertEqual(map['foo'], 'bar')
self.assertEqual(loaded, [1])
self.assertEqual(map['foo'], 'bar')
self.assertEqual(loaded, [1])
map._invalidate()
self.assertEqual(map['foo'], 'bar')
self.assertEqual(loaded, [1, 1])
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python3
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import pathlib
import shutil
import sys
import tempfile
import textwrap
import unittest
from ops.charm import (
CharmBase,
RelationEvent,
)
from ops.framework import (
Object,
)
from ops.model import (
ModelError,
RelationNotFoundError,
)
from ops.testing import Harness
class TestHarness(unittest.TestCase):
def test_add_relation(self):
harness = Harness(CharmBase, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
rel_id = harness.add_relation('db', 'postgresql')
self.assertIsInstance(rel_id, int)
backend = harness._backend
self.assertEqual(backend.relation_ids('db'), [rel_id])
self.assertEqual(backend.relation_list(rel_id), [])
# Make sure the initial data bags for our app and unit are empty.
self.assertEqual(backend.relation_get(rel_id, 'test-app', is_app=True), {})
self.assertEqual(backend.relation_get(rel_id, 'test-app/0', is_app=False), {})
def test_add_relation_and_unit(self):
harness = Harness(CharmBase, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
rel_id = harness.add_relation('db', 'postgresql')
self.assertIsInstance(rel_id, int)
harness.add_relation_unit(rel_id, 'postgresql/0')
harness.update_relation_data(rel_id, 'postgresql/0', {'foo': 'bar'})
backend = harness._backend
self.assertEqual(backend.relation_ids('db'), [rel_id])
self.assertEqual(backend.relation_list(rel_id), ['postgresql/0'])
self.assertEqual(
backend.relation_get(rel_id, 'postgresql/0', is_app=False),
{'foo': 'bar'})
def test_add_relation_with_remote_app_data(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
remote_app = 'postgresql'
rel_id = harness.add_relation('db', remote_app)
harness.update_relation_data(rel_id, 'postgresql', {'app': 'data'})
self.assertIsInstance(rel_id, int)
backend = harness._backend
self.assertEqual([rel_id], backend.relation_ids('db'))
self.assertEqual({'app': 'data'}, backend.relation_get(rel_id, remote_app, is_app=True))
def test_add_relation_with_our_initial_data(self):
class InitialDataTester(CharmBase):
"""Record the relation-changed events."""
def __init__(self, framework, charm_name):
super().__init__(framework, charm_name)
self.observed_events = []
self.framework.observe(self.on.db_relation_changed, self._on_db_relation_changed)
def _on_db_relation_changed(self, event):
self.observed_events.append(event)
# language=YAML
harness = Harness(InitialDataTester, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
rel_id = harness.add_relation('db', 'postgresql')
harness.update_relation_data(rel_id, 'test-app', {'k': 'v1'})
harness.update_relation_data(rel_id, 'test-app/0', {'ingress-address': '192.0.2.1'})
backend = harness._backend
self.assertEqual({'k': 'v1'}, backend.relation_get(rel_id, 'test-app', is_app=True))
self.assertEqual({'ingress-address': '192.0.2.1'},
backend.relation_get(rel_id, 'test-app/0', is_app=False))
harness.begin()
self.assertEqual({'k': 'v1'}, backend.relation_get(rel_id, 'test-app', is_app=True))
self.assertEqual({'ingress-address': '192.0.2.1'},
backend.relation_get(rel_id, 'test-app/0', is_app=False))
# Make sure no relation-changed events are emitted for our own data bags.
self.assertEqual([], harness.charm.observed_events)
# A remote unit can still update our app relation data bag since our unit is not a leader.
harness.update_relation_data(rel_id, 'test-app', {'k': 'v2'})
# And we get an event
self.assertEqual([], harness.charm.observed_events)
# We can also update our own relation data, even if it is a bit 'cheaty'
harness.update_relation_data(rel_id, 'test-app/0', {'ingress-address': '192.0.2.2'})
# But no event happens
# Updating our data app relation data bag and our unit data bag does not generate events.
harness.set_leader(True)
harness.update_relation_data(rel_id, 'test-app', {'k': 'v3'})
harness.update_relation_data(rel_id, 'test-app/0', {'ingress-address': '192.0.2.2'})
self.assertEqual([], harness.charm.observed_events)
def test_add_peer_relation_with_initial_data_leader(self):
class InitialDataTester(CharmBase):
"""Record the relation-changed events."""
def __init__(self, framework, charm_name):
super().__init__(framework, charm_name)
self.observed_events = []
self.framework.observe(self.on.cluster_relation_changed,
self._on_cluster_relation_changed)
def _on_cluster_relation_changed(self, event):
self.observed_events.append(event)
# language=YAML
harness = Harness(InitialDataTester, meta='''
name: test-app
peers:
cluster:
interface: cluster
''')
# TODO: dmitriis 2020-04-07 test a minion unit and initial peer relation app data
# events when the harness begins to emit events for initial data.
harness.set_leader(is_leader=True)
rel_id = harness.add_relation('cluster', 'test-app')
harness.update_relation_data(rel_id, 'test-app', {'k': 'v'})
harness.update_relation_data(rel_id, 'test-app/0', {'ingress-address': '192.0.2.1'})
backend = harness._backend
self.assertEqual({'k': 'v'}, backend.relation_get(rel_id, 'test-app', is_app=True))
self.assertEqual({'ingress-address': '192.0.2.1'},
backend.relation_get(rel_id, 'test-app/0', is_app=False))
harness.begin()
self.assertEqual({'k': 'v'}, backend.relation_get(rel_id, 'test-app', is_app=True))
self.assertEqual({'ingress-address': '192.0.2.1'},
backend.relation_get(rel_id, 'test-app/0', is_app=False))
# Make sure no relation-changed events are emitted for our own data bags.
self.assertEqual([], harness.charm.observed_events)
# Updating our app relation data bag and our unit data bag does not trigger events
harness.update_relation_data(rel_id, 'test-app', {'k': 'v2'})
harness.update_relation_data(rel_id, 'test-app/0', {'ingress-address': '192.0.2.2'})
self.assertEqual([], harness.charm.observed_events)
# If our unit becomes a minion, updating app relation data indirectly becomes possible
# and our charm gets notifications.
harness.set_leader(False)
harness.update_relation_data(rel_id, 'test-app', {'k': 'v3'})
self.assertEqual({'k': 'v3'}, backend.relation_get(rel_id, 'test-app', is_app=True))
self.assertTrue(len(harness.charm.observed_events), 1)
self.assertIsInstance(harness.charm.observed_events[0], RelationEvent)
def test_relation_events(self):
harness = Harness(RelationEventCharm, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
harness.begin()
harness.charm.observe_relation_events('db')
self.assertEqual(harness.charm.get_changes(), [])
rel_id = harness.add_relation('db', 'postgresql')
self.assertEqual(
harness.charm.get_changes(),
[{'name': 'relation-created',
'data': {
'app': 'postgresql',
'unit': None,
'relation_id': rel_id,
}}])
harness.add_relation_unit(rel_id, 'postgresql/0')
self.assertEqual(
harness.charm.get_changes(),
[{'name': 'relation-joined',
'data': {
'app': 'postgresql',
'unit': 'postgresql/0',
'relation_id': rel_id,
}}])
harness.update_relation_data(rel_id, 'postgresql', {'foo': 'bar'})
self.assertEqual(
harness.charm.get_changes(),
[{'name': 'relation-changed',
'data': {
'app': 'postgresql',
'unit': None,
'relation_id': rel_id,
}}])
harness.update_relation_data(rel_id, 'postgresql/0', {'baz': 'bing'})
self.assertEqual(
harness.charm.get_changes(),
[{'name': 'relation-changed',
'data': {
'app': 'postgresql',
'unit': 'postgresql/0',
'relation_id': rel_id,
}}])
def test_get_relation_data(self):
harness = Harness(CharmBase, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
rel_id = harness.add_relation('db', 'postgresql')
harness.update_relation_data(rel_id, 'postgresql', {'remote': 'data'})
self.assertEqual(harness.get_relation_data(rel_id, 'test-app'), {})
self.assertEqual(harness.get_relation_data(rel_id, 'test-app/0'), {})
self.assertEqual(harness.get_relation_data(rel_id, 'test-app/1'), None)
self.assertEqual(harness.get_relation_data(rel_id, 'postgresql'), {'remote': 'data'})
with self.assertRaises(KeyError):
# unknown relation id
harness.get_relation_data(99, 'postgresql')
def test_create_harness_twice(self):
metadata = '''
name: my-charm
requires:
db:
interface: pgsql
'''
harness1 = Harness(CharmBase, meta=metadata)
harness2 = Harness(CharmBase, meta=metadata)
harness1.begin()
harness2.begin()
helper1 = DBRelationChangedHelper(harness1.charm, "helper1")
helper2 = DBRelationChangedHelper(harness2.charm, "helper2")
rel_id = harness2.add_relation('db', 'postgresql')
harness2.update_relation_data(rel_id, 'postgresql', {'key': 'value'})
# Helper2 should see the event triggered by harness2, but helper1 should see no events.
self.assertEqual(helper1.changes, [])
self.assertEqual(helper2.changes, [(rel_id, 'postgresql')])
def test_begin_twice(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: test-app
requires:
db:
interface: pgsql
''')
harness.begin()
with self.assertRaises(RuntimeError):
harness.begin()
def test_update_relation_exposes_new_data(self):
harness = Harness(CharmBase, meta='''
name: my-charm
requires:
db:
interface: pgsql
''')
harness.begin()
viewer = RelationChangedViewer(harness.charm, 'db')
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
harness.update_relation_data(rel_id, 'postgresql/0', {'initial': 'data'})
self.assertEqual(viewer.changes, [{'initial': 'data'}])
harness.update_relation_data(rel_id, 'postgresql/0', {'new': 'value'})
self.assertEqual(viewer.changes, [{'initial': 'data'},
{'initial': 'data', 'new': 'value'}])
def test_update_relation_no_local_unit_change_event(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: my-charm
requires:
db:
interface: pgsql
''')
harness.begin()
helper = DBRelationChangedHelper(harness.charm, "helper")
rel_id = harness.add_relation('db', 'postgresql')
rel = harness.charm.model.get_relation('db')
rel.data[harness.charm.model.unit]['key'] = 'value'
# there should be no event for updating our own data
harness.update_relation_data(rel_id, 'my-charm/0', {'new': 'other'})
# but the data will be updated.
self.assertEqual({'key': 'value', 'new': 'other'}, rel.data[harness.charm.model.unit])
rel.data[harness.charm.model.unit]['new'] = 'value'
# Our unit data bag got updated.
self.assertEqual(rel.data[harness.charm.model.unit]['new'], 'value')
# But there were no changed events registered by our unit.
self.assertEqual([], helper.changes)
def test_update_peer_relation_no_local_unit_change_event(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: postgresql
peers:
db:
interface: pgsql
''')
harness.begin()
helper = DBRelationChangedHelper(harness.charm, "helper")
rel_id = harness.add_relation('db', 'postgresql')
rel = harness.charm.model.get_relation('db')
rel.data[harness.charm.model.unit]['key'] = 'value'
rel = harness.charm.model.get_relation('db')
harness.update_relation_data(rel_id, 'postgresql/0', {'key': 'v1'})
self.assertEqual({'key': 'v1'}, rel.data[harness.charm.model.unit])
# Make sure there was no event
self.assertEqual([], helper.changes)
rel.data[harness.charm.model.unit]['key'] = 'v2'
# Our unit data bag got updated.
self.assertEqual({'key': 'v2'}, dict(rel.data[harness.charm.model.unit]))
# But there were no changed events registered by our unit.
self.assertEqual([], helper.changes)
# Same for when our unit is a leader.
harness.set_leader(is_leader=True)
harness.update_relation_data(rel_id, 'postgresql/0', {'key': 'v3'})
self.assertEqual({'key': 'v3'}, dict(rel.data[harness.charm.model.unit]))
self.assertEqual([], helper.changes)
rel.data[harness.charm.model.unit]['key'] = 'v4'
self.assertEqual(rel.data[harness.charm.model.unit]['key'], 'v4')
self.assertEqual([], helper.changes)
def test_update_peer_relation_app_data(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: postgresql
peers:
db:
interface: pgsql
''')
harness.begin()
harness.set_leader(is_leader=True)
helper = DBRelationChangedHelper(harness.charm, "helper")
rel_id = harness.add_relation('db', 'postgresql')
rel = harness.charm.model.get_relation('db')
rel.data[harness.charm.app]['key'] = 'value'
harness.update_relation_data(rel_id, 'postgresql', {'key': 'v1'})
self.assertEqual({'key': 'v1'}, rel.data[harness.charm.app])
self.assertEqual([], helper.changes)
rel.data[harness.charm.app]['key'] = 'v2'
# Our unit data bag got updated.
self.assertEqual(rel.data[harness.charm.model.app]['key'], 'v2')
# But there were no changed events registered by our unit.
self.assertEqual([], helper.changes)
# If our unit is not a leader unit we get an update about peer app relation data changes.
harness.set_leader(is_leader=False)
harness.update_relation_data(rel_id, 'postgresql', {'k2': 'v2'})
self.assertEqual(rel.data[harness.charm.model.app]['k2'], 'v2')
self.assertEqual(helper.changes, [(0, 'postgresql')])
def test_update_relation_no_local_app_change_event(self):
# language=YAML
harness = Harness(CharmBase, meta='''
name: my-charm
requires:
db:
interface: pgsql
''')
harness.begin()
harness.set_leader(False)
helper = DBRelationChangedHelper(harness.charm, "helper")
rel_id = harness.add_relation('db', 'postgresql')
# TODO: remove this as soon as https://github.com/canonical/operator/issues/175 is fixed.
harness.add_relation_unit(rel_id, 'postgresql/0')
self.assertEqual(helper.changes, [])
harness.update_relation_data(rel_id, 'my-charm', {'new': 'value'})
rel = harness.charm.model.get_relation('db')
self.assertEqual(rel.data[harness.charm.app]['new'], 'value')
# Our app data bag got updated.
self.assertEqual(rel.data[harness.charm.model.app]['new'], 'value')
# But there were no changed events registered by our unit.
self.assertEqual(helper.changes, [])
def test_update_relation_remove_data(self):
harness = Harness(CharmBase, meta='''
name: my-charm
requires:
db:
interface: pgsql
''')
harness.begin()
viewer = RelationChangedViewer(harness.charm, 'db')
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
harness.update_relation_data(rel_id, 'postgresql/0', {'initial': 'data'})
harness.update_relation_data(rel_id, 'postgresql/0', {'initial': ''})
self.assertEqual(viewer.changes, [{'initial': 'data'}, {}])
def test_update_config(self):
harness = Harness(RecordingCharm)
harness.begin()
harness.update_config(key_values={'a': 'foo', 'b': 2})
self.assertEqual(
harness.charm.changes,
[{'name': 'config', 'data': {'a': 'foo', 'b': 2}}])
harness.update_config(key_values={'b': 3})
self.assertEqual(
harness.charm.changes,
[{'name': 'config', 'data': {'a': 'foo', 'b': 2}},
{'name': 'config', 'data': {'a': 'foo', 'b': 3}}])
# you can set config values to the empty string, you can use unset to actually remove items
harness.update_config(key_values={'a': ''}, unset=set('b'))
self.assertEqual(
harness.charm.changes,
[{'name': 'config', 'data': {'a': 'foo', 'b': 2}},
{'name': 'config', 'data': {'a': 'foo', 'b': 3}},
{'name': 'config', 'data': {'a': ''}},
])
def test_set_leader(self):
harness = Harness(RecordingCharm)
# No event happens here
harness.set_leader(False)
harness.begin()
self.assertFalse(harness.charm.model.unit.is_leader())
harness.set_leader(True)
self.assertEqual(harness.charm.get_changes(reset=True), [{'name': 'leader-elected'}])
self.assertTrue(harness.charm.model.unit.is_leader())
harness.set_leader(False)
self.assertFalse(harness.charm.model.unit.is_leader())
# No hook event when you lose leadership.
# TODO: verify if Juju always triggers `leader-settings-changed` if you
# lose leadership.
self.assertEqual(harness.charm.get_changes(reset=True), [])
harness.disable_hooks()
harness.set_leader(True)
# No hook event if you have disabled them
self.assertEqual(harness.charm.get_changes(reset=True), [])
def test_relation_set_app_not_leader(self):
harness = Harness(RecordingCharm, meta='''
name: test-charm
requires:
db:
interface: pgsql
''')
harness.begin()
harness.set_leader(False)
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
rel = harness.charm.model.get_relation('db')
with self.assertRaises(ModelError):
rel.data[harness.charm.app]['foo'] = 'bar'
# The data has not actually been changed
self.assertEqual(harness.get_relation_data(rel_id, 'test-charm'), {})
harness.set_leader(True)
rel.data[harness.charm.app]['foo'] = 'bar'
self.assertEqual(harness.get_relation_data(rel_id, 'test-charm'), {'foo': 'bar'})
def test_hooks_enabled_and_disabled(self):
harness = Harness(RecordingCharm, meta='''
name: test-charm
''')
# Before begin() there are no events.
harness.update_config({'value': 'first'})
# By default, after begin the charm is set up to receive events.
harness.begin()
harness.update_config({'value': 'second'})
self.assertEqual(
harness.charm.get_changes(reset=True),
[{'name': 'config', 'data': {'value': 'second'}}])
# Once disabled, we won't see config-changed when we make an update
harness.disable_hooks()
harness.update_config({'third': '3'})
self.assertEqual(harness.charm.get_changes(reset=True), [])
harness.enable_hooks()
harness.update_config({'value': 'fourth'})
self.assertEqual(
harness.charm.get_changes(reset=True),
[{'name': 'config', 'data': {'value': 'fourth', 'third': '3'}}])
def test_metadata_from_directory(self):
tmp = pathlib.Path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, str(tmp))
metadata_filename = tmp / 'metadata.yaml'
with metadata_filename.open('wt') as metadata:
metadata.write(textwrap.dedent('''
name: my-charm
requires:
db:
interface: pgsql
'''))
harness = self._get_dummy_charm_harness(tmp)
harness.begin()
self.assertEqual(list(harness.model.relations), ['db'])
# The charm_dir also gets set
self.assertEqual(harness.framework.charm_dir, tmp)
def test_actions_from_directory(self):
tmp = pathlib.Path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, str(tmp))
actions_filename = tmp / 'actions.yaml'
with actions_filename.open('wt') as actions:
actions.write(textwrap.dedent('''
test:
description: a dummy action
'''))
harness = self._get_dummy_charm_harness(tmp)
harness.begin()
self.assertEqual(list(harness.framework.meta.actions), ['test'])
# The charm_dir also gets set
self.assertEqual(harness.framework.charm_dir, tmp)
def _get_dummy_charm_harness(self, tmp):
self._write_dummy_charm(tmp)
charm_mod = importlib.import_module('charm')
harness = Harness(charm_mod.MyTestingCharm)
return harness
def _write_dummy_charm(self, tmp):
srcdir = tmp / 'src'
srcdir.mkdir(0o755)
charm_filename = srcdir / 'charm.py'
with charm_filename.open('wt') as charmpy:
# language=Python
charmpy.write(textwrap.dedent('''
from ops.charm import CharmBase
class MyTestingCharm(CharmBase):
pass
'''))
orig = sys.path[:]
sys.path.append(str(srcdir))
def cleanup():
sys.path = orig
sys.modules.pop('charm')
self.addCleanup(cleanup)
def test_actions_passed_in(self):
harness = Harness(
CharmBase,
meta='''
name: test-app
''',
actions='''
test-action:
description: a dummy test action
''')
self.assertEqual(list(harness.framework.meta.actions), ['test-action'])
def test_relation_set_deletes(self):
harness = Harness(CharmBase, meta='''
name: test-charm
requires:
db:
interface: pgsql
''')
harness.begin()
harness.set_leader(False)
rel_id = harness.add_relation('db', 'postgresql')
harness.update_relation_data(rel_id, 'test-charm/0', {'foo': 'bar'})
harness.add_relation_unit(rel_id, 'postgresql/0')
rel = harness.charm.model.get_relation('db', rel_id)
del rel.data[harness.charm.model.unit]['foo']
self.assertEqual({}, harness.get_relation_data(rel_id, 'test-charm/0'))
def test_set_workload_version(self):
harness = Harness(CharmBase, meta='''
name: app
''')
harness.begin()
self.assertIsNone(harness.get_workload_version())
harness.charm.model.unit.set_workload_version('1.2.3')
self.assertEqual(harness.get_workload_version(), '1.2.3')
class DBRelationChangedHelper(Object):
def __init__(self, parent, key):
super().__init__(parent, key)
self.changes = []
parent.framework.observe(parent.on.db_relation_changed, self.on_relation_changed)
def on_relation_changed(self, event):
if event.unit is not None:
self.changes.append((event.relation.id, event.unit.name))
else:
self.changes.append((event.relation.id, event.app.name))
class RelationChangedViewer(Object):
"""Track relation_changed events and saves the data seen in the relation bucket."""
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.changes = []
charm.framework.observe(charm.on[relation_name].relation_changed, self.on_relation_changed)
def on_relation_changed(self, event):
if event.unit is not None:
data = event.relation.data[event.unit]
else:
data = event.relation.data[event.app]
self.changes.append(dict(data))
class RecordingCharm(CharmBase):
"""Record the events that we see, and any associated data."""
def __init__(self, framework, charm_name):
super().__init__(framework, charm_name)
self.changes = []
self.framework.observe(self.on.config_changed, self.on_config_changed)
self.framework.observe(self.on.leader_elected, self.on_leader_elected)
def get_changes(self, reset=True):
changes = self.changes
if reset:
self.changes = []
return changes
def on_config_changed(self, _):
self.changes.append(dict(name='config', data=dict(self.framework.model.config)))
def on_leader_elected(self, _):
self.changes.append(dict(name='leader-elected'))
class RelationEventCharm(RecordingCharm):
"""Record events related to relation lifecycles."""
def __init__(self, framework, charm_name):
super().__init__(framework, charm_name)
def observe_relation_events(self, relation_name):
self.framework.observe(self.on[relation_name].relation_created, self._on_relation_created)
self.framework.observe(self.on[relation_name].relation_joined, self._on_relation_joined)
self.framework.observe(self.on[relation_name].relation_changed, self._on_relation_changed)
self.framework.observe(self.on[relation_name].relation_departed,
self._on_relation_departed)
self.framework.observe(self.on[relation_name].relation_broken, self._on_relation_broken)
def _on_relation_created(self, event):
self._observe_relation_event('relation-created', event)
def _on_relation_joined(self, event):
self._observe_relation_event('relation-joined', event)
def _on_relation_changed(self, event):
self._observe_relation_event('relation-changed', event)
def _on_relation_departed(self, event):
self._observe_relation_event('relation-departed', event)
def _on_relation_broken(self, event):
self._observe_relation_event('relation-broken', event)
def _observe_relation_event(self, event_name, event):
unit_name = None
if event.unit is not None:
unit_name = event.unit.name
app_name = None
if event.app is not None:
app_name = event.app.name
self.changes.append(
dict(name=event_name,
data=dict(app=app_name, unit=unit_name, relation_id=event.relation.id)))
class TestTestingModelBackend(unittest.TestCase):
def test_status_set_get_unit(self):
harness = Harness(CharmBase, meta='''
name: app
''')
backend = harness._backend
backend.status_set('blocked', 'message', is_app=False)
self.assertEqual(backend.status_get(is_app=False), ('blocked', 'message'))
self.assertEqual(backend.status_get(is_app=True), None)
def test_status_set_get_app(self):
harness = Harness(CharmBase, meta='''
name: app
''')
backend = harness._backend
backend.status_set('blocked', 'message', is_app=True)
self.assertEqual(backend.status_get(is_app=True), ('blocked', 'message'))
self.assertEqual(backend.status_get(is_app=False), None)
def test_relation_ids_unknown_relation(self):
harness = Harness(CharmBase, meta='''
name: test-charm
provides:
db:
interface: mydb
''')
backend = harness._backend
# With no relations added, we just get an empty list for the interface
self.assertEqual(backend.relation_ids('db'), [])
# But an unknown interface raises a ModelError
with self.assertRaises(ModelError):
backend.relation_ids('unknown')
def test_relation_get_unknown_relation_id(self):
harness = Harness(CharmBase, meta='''
name: test-charm
''')
backend = harness._backend
with self.assertRaises(RelationNotFoundError):
backend.relation_get(1234, 'unit/0', False)
def test_relation_list_unknown_relation_id(self):
harness = Harness(CharmBase, meta='''
name: test-charm
''')
backend = harness._backend
with self.assertRaises(RelationNotFoundError):
backend.relation_list(1234)
if __name__ == "__main__":
unittest.main()
ops
jinja2
git+https://github.com/juju-solutions/resource-oci-image/@c5778285d332edf3d9a538f9d0c06154b7ec1b0b#egg=oci-image
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Copyright © 2020 Dominik Fleischmann dominik.fleischmann@canonical.com
"""Operator Charm main library."""
# Load modules from lib directory
import logging
import subprocess
import setuppath # noqa:F401
from ops.main import main
from ops.charm import CharmBase
from ops.framework import StoredState
from ops.main import main
from ops.model import ActiveStatus, MaintenanceStatus
import subprocess
from ops.model import ActiveStatus, MaintenanceStatus, BlockedStatus
from oci_image import OCIImageResource, OCIImageResourceError
from jinja2 import Template
SQUID_CONF = "/etc/squid/squid.conf"
logger = logging.getLogger(__name__)
class SquidK8SCharm(CharmBase):
"""Class reprisenting this Operator charm."""
state = StoredState()
_stored = StoredState()
def __init__(self, *args):
"""Initialize charm and configure states and events to observe."""
super().__init__(*args)
# -- standard hook observation
self.framework.observe(self.on.install, self.on_install)
self.framework.observe(self.on.start, self.on_start)
self.framework.observe(self.on.config_changed, self.on_config_changed)
self.framework.observe(self.on.deleteurl_action, self)
# -- initialize states --
self.state.set_default(installed=False)
self.state.set_default(configured=False)
self.state.set_default(started=False)
def make_pod_spec(self):
config = self.framework.model.config
ports = [{"name": "squid", "containerPort": config["port"], "protocol": "TCP"}]
spec = {
"containers": [{
"name": self.framework.model.app.name,
"image": config["image"],
"ports": ports,
}],
}
return spec
def _apply_spec(self, spec):
# Only apply the spec if this unit is a leader
if self.framework.model.unit.is_leader():
self.framework.model.pod.set_spec(spec)
self.state.spec = spec
def on_install(self, event):
"""Handle install state."""
self.unit.status = MaintenanceStatus("Installing charm software")
# Perform install tasks
self.unit.status = MaintenanceStatus("Install complete")
logging.info("Install of software complete")
self.state.installed = True
def on_config_changed(self, event):
"""Handle config changed."""
if not self.state.installed:
logging.warning("Config changed called before install complete, deferring event: {}.".format(event.handle))
return
if self.state.started:
# Stop if necessary for reconfig
logging.info("Stopping for configuration, event handle: {}".format(event.handle))
# Configure the software
logging.info("Configuring")
self.state.configured = True
def on_start(self, event):
"""Handle start state."""
if not self.state.configured:
logging.warning("Start called before configuration complete, deferring event: {}".format(event.handle))
self._stored.set_default(pod_spec=None, allowedurls=set())
self.framework.observe(self.on.start, self.configure_pod)
self.framework.observe(self.on.config_changed, self.configure_pod)
self.framework.observe(self.on.addurl_action, self.on_addurl_action)
self.framework.observe(self.on.deleteurl_action, self.on_deleteurl_action)
self.image = OCIImageResource(self, "image")
def _update_allowed_urls(self, add: str = None, delete: str = None):
if add:
self._stored.allowedurls.add(add)
if delete and delete in self._stored.allowedurls:
self._stored.allowedurls.remove(delete)
def _update_squid_config(self, add: str = None, delete: str = None):
self._update_allowed_urls(add=add, delete=delete)
squid_config_text = self._get_squid_config_file_text()
if squid_config_text:
with open(SQUID_CONF, "w") as f:
f.write(squid_config_text)
subprocess.Popen(
"sleep 1 && kill -HUP `cat /var/run/squid.pid`", shell=True
)
def on_addurl_action(self, event):
url = event.params["url"]
self._update_squid_config(add=url)
def on_deleteurl_action(self, event):
"""Handle the deleteurl action."""
url = event.params["url"]
self._update_squid_config(delete=url)
def _get_squid_config_file_text(self):
squid_config_text = None
allowed_urls_text = ""
for url in self._stored.allowedurls:
allowed_urls_text += f"acl allowedurls dstdomain .{url}\n"
allowed_urls_text += "http_access allow allowedurls\n"
with open("template/squid.conf") as template:
squid_config_text = Template(template.read()).render(
allowed_urls=allowed_urls_text
)
return squid_config_text
def configure_pod(self, event):
if not self.unit.is_leader():
self.unit.status = ActiveStatus("ready")
return
self.unit.status = MaintenanceStatus("Applying pod spec")
# Start software
new_pod_spec = self.make_pod_spec()
self._apply_spec(new_pod_spec)
self.unit.status = ActiveStatus("Unit is ready")
self.state.started = True
logging.info("Started")
# Fetch image information
try:
self.unit.status = MaintenanceStatus("Fetching image information")
image_info = self.image.fetch()
except OCIImageResourceError:
self.unit.status = BlockedStatus("Error fetching image information")
return
def on_deleteurl_action(self, event):
"""Handle the deleteurl action."""
url = event.params["url"]
pod_spec = self._make_pod_spec(image_info)
line_to_delete = "acl allowedurls dstdomain .{}".format(url)
line_deleted = False
if self._stored.pod_spec != pod_spec:
self.model.pod.set_spec(pod_spec)
self._stored.pod_spec = pod_spec
self.unit.status = ActiveStatus("ready")
with open("/etc/squid/squid.conf", "r") as f:
lines = f.readlines()
with open("/etc/squid/squid.conf", "w") as f:
for line in lines:
if line_to_delete not in line:
f.write(line)
else:
line_deleted = True
def _make_pod_spec(self, image_info):
config = self.config
ports = [{"name": "squid", "containerPort": config["port"], "protocol": "TCP"}]
if line_deleted:
event.set_results({"output": "URL deleted succesfully"})
subprocess.Popen("sleep 1 && kill -HUP `cat /var/run/squid.pid`", shell=True)
else:
event.fail("No URL was deleted")
spec = {
"version": 3,
"containers": [
{
"name": self.framework.model.app.name,
"imageDetails": image_info,
"ports": ports,
}
],
}
return spec
if __name__ == "__main__":
from ops.main import main
main(SquidK8SCharm)
acl SSL_ports port 443
acl Safe_ports port 80 # http
acl Safe_ports port 21 # ftp
acl Safe_ports port 443 # https
acl Safe_ports port 70 # gopher
acl Safe_ports port 210 # wais
acl Safe_ports port 1025-65535 # unregistered ports
acl Safe_ports port 280 # http-mgmt
acl Safe_ports port 488 # gss-http
acl Safe_ports port 591 # filemaker
acl Safe_ports port 777 # multiling http
acl CONNECT method CONNECT
http_access deny !Safe_ports
http_access deny CONNECT !SSL_ports
http_access allow localhost manager
http_access deny manager
http_access allow localhost
{{ allowed_urls }}
http_access deny all
http_port 3128
coredump_dir /var/spool/squid
refresh_pattern ^ftp: 1440 20% 10080
refresh_pattern ^gopher: 1440 0% 1440
refresh_pattern -i (/cgi-bin/|\?) 0 0% 0
refresh_pattern (Release|Packages(.gz)*)$ 0 20% 2880
refresh_pattern . 0 20% 4320
Copyright 2007 Pallets
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Metadata-Version: 2.1
Name: Jinja2
Version: 2.11.2
Summary: A very fast and expressive template engine.
Home-page: https://palletsprojects.com/p/jinja/
Author: Armin Ronacher
Author-email: armin.ronacher@active-4.com
Maintainer: Pallets
Maintainer-email: contact@palletsprojects.com
License: BSD-3-Clause
Project-URL: Documentation, https://jinja.palletsprojects.com/
Project-URL: Code, https://github.com/pallets/jinja
Project-URL: Issue tracker, https://github.com/pallets/jinja/issues
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Text Processing :: Markup :: HTML
Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
Description-Content-Type: text/x-rst
Requires-Dist: MarkupSafe (>=0.23)
Provides-Extra: i18n
Requires-Dist: Babel (>=0.8) ; extra == 'i18n'
Jinja
=====
Jinja is a fast, expressive, extensible templating engine. Special
placeholders in the template allow writing code similar to Python
syntax. Then the template is passed data to render the final document.
It includes:
- Template inheritance and inclusion.
- Define and import macros within templates.
- HTML templates can use autoescaping to prevent XSS from untrusted
user input.
- A sandboxed environment can safely render untrusted templates.
- AsyncIO support for generating templates and calling async
functions.
- I18N support with Babel.
- Templates are compiled to optimized Python code just-in-time and
cached, or can be compiled ahead-of-time.
- Exceptions point to the correct line in templates to make debugging
easier.
- Extensible filters, tests, functions, and even syntax.
Jinja's philosophy is that while application logic belongs in Python if
possible, it shouldn't make the template designer's job difficult by
restricting functionality too much.
Installing
----------
Install and update using `pip`_:
.. code-block:: text
$ pip install -U Jinja2
.. _pip: https://pip.pypa.io/en/stable/quickstart/
In A Nutshell
-------------
.. code-block:: jinja
{% extends "base.html" %}
{% block title %}Members{% endblock %}
{% block content %}
<ul>
{% for user in users %}
<li><a href="{{ user.url }}">{{ user.username }}</a></li>
{% endfor %}
</ul>
{% endblock %}
Links
-----
- Website: https://palletsprojects.com/p/jinja/
- Documentation: https://jinja.palletsprojects.com/
- Releases: https://pypi.org/project/Jinja2/
- Code: https://github.com/pallets/jinja
- Issue tracker: https://github.com/pallets/jinja/issues
- Test status: https://dev.azure.com/pallets/jinja/_build
- Official chat: https://discord.gg/t6rrQZH
Jinja2-2.11.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
Jinja2-2.11.2.dist-info/LICENSE.rst,sha256=O0nc7kEF6ze6wQ-vG-JgQI_oXSUrjp3y4JefweCUQ3s,1475
Jinja2-2.11.2.dist-info/METADATA,sha256=5ZHRZoIRAMHsJPnqhlJ622_dRPsYePYJ-9EH4-Ry7yI,3535
Jinja2-2.11.2.dist-info/RECORD,,
Jinja2-2.11.2.dist-info/WHEEL,sha256=kGT74LWyRUZrL4VgLh6_g12IeVl_9u9ZVhadrgXZUEY,110
Jinja2-2.11.2.dist-info/entry_points.txt,sha256=Qy_DkVo6Xj_zzOtmErrATe8lHZhOqdjpt3e4JJAGyi8,61
Jinja2-2.11.2.dist-info/top_level.txt,sha256=PkeVWtLb3-CqjWi1fO29OCbj55EhX_chhKrCdrVe_zs,7
jinja2/__init__.py,sha256=0QCM_jKKDM10yzSdHRVV4mQbCbDqf0GN0GirAqibn9Y,1549
jinja2/__pycache__/__init__.cpython-38.pyc,,
jinja2/__pycache__/_compat.cpython-38.pyc,,
jinja2/__pycache__/_identifier.cpython-38.pyc,,
jinja2/__pycache__/asyncfilters.cpython-38.pyc,,
jinja2/__pycache__/asyncsupport.cpython-38.pyc,,
jinja2/__pycache__/bccache.cpython-38.pyc,,
jinja2/__pycache__/compiler.cpython-38.pyc,,
jinja2/__pycache__/constants.cpython-38.pyc,,
jinja2/__pycache__/debug.cpython-38.pyc,,
jinja2/__pycache__/defaults.cpython-38.pyc,,
jinja2/__pycache__/environment.cpython-38.pyc,,
jinja2/__pycache__/exceptions.cpython-38.pyc,,
jinja2/__pycache__/ext.cpython-38.pyc,,
jinja2/__pycache__/filters.cpython-38.pyc,,
jinja2/__pycache__/idtracking.cpython-38.pyc,,
jinja2/__pycache__/lexer.cpython-38.pyc,,
jinja2/__pycache__/loaders.cpython-38.pyc,,
jinja2/__pycache__/meta.cpython-38.pyc,,
jinja2/__pycache__/nativetypes.cpython-38.pyc,,
jinja2/__pycache__/nodes.cpython-38.pyc,,
jinja2/__pycache__/optimizer.cpython-38.pyc,,
jinja2/__pycache__/parser.cpython-38.pyc,,
jinja2/__pycache__/runtime.cpython-38.pyc,,
jinja2/__pycache__/sandbox.cpython-38.pyc,,
jinja2/__pycache__/tests.cpython-38.pyc,,
jinja2/__pycache__/utils.cpython-38.pyc,,
jinja2/__pycache__/visitor.cpython-38.pyc,,
jinja2/_compat.py,sha256=B6Se8HjnXVpzz9-vfHejn-DV2NjaVK-Iewupc5kKlu8,3191
jinja2/_identifier.py,sha256=EdgGJKi7O1yvr4yFlvqPNEqV6M1qHyQr8Gt8GmVTKVM,1775
jinja2/asyncfilters.py,sha256=XJtYXTxFvcJ5xwk6SaDL4S0oNnT0wPYvXBCSzc482fI,4250
jinja2/asyncsupport.py,sha256=ZBFsDLuq3Gtji3Ia87lcyuDbqaHZJRdtShZcqwpFnSQ,7209
jinja2/bccache.py,sha256=3Pmp4jo65M9FQuIxdxoDBbEDFwe4acDMQf77nEJfrHA,12139
jinja2/compiler.py,sha256=Ta9W1Lit542wItAHXlDcg0sEOsFDMirCdlFPHAurg4o,66284
jinja2/constants.py,sha256=RR1sTzNzUmKco6aZicw4JpQpJGCuPuqm1h1YmCNUEFY,1458
jinja2/debug.py,sha256=neR7GIGGjZH3_ILJGVUYy3eLQCCaWJMXOb7o0kGInWc,8529
jinja2/defaults.py,sha256=85B6YUUCyWPSdrSeVhcqFVuu_bHUAQXeey--FIwSeVQ,1126
jinja2/environment.py,sha256=XDSLKc4SqNLMOwTSq3TbWEyA5WyXfuLuVD0wAVjEFwM,50629
jinja2/exceptions.py,sha256=VjNLawcmf2ODffqVMCQK1cRmvFaUfQWF4u8ouP3QPcE,5425
jinja2/ext.py,sha256=AtwL5O5enT_L3HR9-oBvhGyUTdGoyaqG_ICtnR_EVd4,26441
jinja2/filters.py,sha256=_RpPgAlgIj7ExvyDzcHAC3B36cocfWK-1TEketbNeM0,41415
jinja2/idtracking.py,sha256=J3O4VHsrbf3wzwiBc7Cro26kHb6_5kbULeIOzocchIU,9211
jinja2/lexer.py,sha256=nUFLRKhhKmmEWkLI65nQePgcQs7qsRdjVYZETMt_v0g,30331
jinja2/loaders.py,sha256=C-fST_dmFjgWkp0ZuCkrgICAoOsoSIF28wfAFink0oU,17666
jinja2/meta.py,sha256=QjyYhfNRD3QCXjBJpiPl9KgkEkGXJbAkCUq4-Ur10EQ,4131
jinja2/nativetypes.py,sha256=Ul__gtVw4xH-0qvUvnCNHedQeNDwmEuyLJztzzSPeRg,2753
jinja2/nodes.py,sha256=Mk1oJPVgIjnQw9WOqILvcu3rLepcFZ0ahxQm2mbwDwc,31095
jinja2/optimizer.py,sha256=gQLlMYzvQhluhzmAIFA1tXS0cwgWYOjprN-gTRcHVsc,1457
jinja2/parser.py,sha256=fcfdqePNTNyvosIvczbytVA332qpsURvYnCGcjDHSkA,35660
jinja2/runtime.py,sha256=0y-BRyIEZ9ltByL2Id6GpHe1oDRQAwNeQvI0SKobNMw,30618
jinja2/sandbox.py,sha256=knayyUvXsZ-F0mk15mO2-ehK9gsw04UhB8td-iUOtLc,17127
jinja2/tests.py,sha256=iO_Y-9Vo60zrVe1lMpSl5sKHqAxe2leZHC08OoZ8K24,4799
jinja2/utils.py,sha256=OoVMlQe9S2-lWT6jJbTu9tDuDvGNyWUhHDcE51i5_Do,22522
jinja2/visitor.py,sha256=DUHupl0a4PGp7nxRtZFttUzAi1ccxzqc2hzetPYUz8U,3240
Wheel-Version: 1.0
Generator: bdist_wheel (0.34.2)
Root-Is-Purelib: true
Tag: py2-none-any
Tag: py3-none-any
[babel.extractors]
jinja2 = jinja2.ext:babel_extract [i18n]
Copyright 2010 Pallets
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Metadata-Version: 2.1
Name: MarkupSafe
Version: 1.1.1
Summary: Safely add untrusted strings to HTML/XML markup.
Home-page: https://palletsprojects.com/p/markupsafe/
Author: Armin Ronacher
Author-email: armin.ronacher@active-4.com
Maintainer: The Pallets Team
Maintainer-email: contact@palletsprojects.com
License: BSD-3-Clause
Project-URL: Documentation, https://markupsafe.palletsprojects.com/
Project-URL: Code, https://github.com/pallets/markupsafe
Project-URL: Issue tracker, https://github.com/pallets/markupsafe/issues
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Text Processing :: Markup :: HTML
Requires-Python: >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
Description-Content-Type: text/x-rst
MarkupSafe
==========
MarkupSafe implements a text object that escapes characters so it is
safe to use in HTML and XML. Characters that have special meanings are
replaced so that they display as the actual characters. This mitigates
injection attacks, meaning untrusted user input can safely be displayed
on a page.
Installing
----------
Install and update using `pip`_:
.. code-block:: text
pip install -U MarkupSafe
.. _pip: https://pip.pypa.io/en/stable/quickstart/
Examples
--------
.. code-block:: pycon
>>> from markupsafe import Markup, escape
>>> # escape replaces special characters and wraps in Markup
>>> escape('<script>alert(document.cookie);</script>')
Markup(u'&lt;script&gt;alert(document.cookie);&lt;/script&gt;')
>>> # wrap in Markup to mark text "safe" and prevent escaping
>>> Markup('<strong>Hello</strong>')
Markup('<strong>hello</strong>')
>>> escape(Markup('<strong>Hello</strong>'))
Markup('<strong>hello</strong>')
>>> # Markup is a text subclass (str on Python 3, unicode on Python 2)
>>> # methods and operators escape their arguments
>>> template = Markup("Hello <em>%s</em>")
>>> template % '"World"'
Markup('Hello <em>&#34;World&#34;</em>')
Donate
------
The Pallets organization develops and supports MarkupSafe and other
libraries that use it. In order to grow the community of contributors
and users, and allow the maintainers to devote more time to the
projects, `please donate today`_.
.. _please donate today: https://palletsprojects.com/donate
Links
-----
* Website: https://palletsprojects.com/p/markupsafe/
* Documentation: https://markupsafe.palletsprojects.com/
* License: `BSD-3-Clause <https://github.com/pallets/markupsafe/blob/master/LICENSE.rst>`_
* Releases: https://pypi.org/project/MarkupSafe/
* Code: https://github.com/pallets/markupsafe
* Issue tracker: https://github.com/pallets/markupsafe/issues
* Test status:
* Linux, Mac: https://travis-ci.org/pallets/markupsafe
* Windows: https://ci.appveyor.com/project/pallets/markupsafe
* Test coverage: https://codecov.io/gh/pallets/markupsafe
* Official chat: https://discord.gg/t6rrQZH
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment