Skip to content
Snippets Groups Projects
Commit a1c62db7 authored by lavado's avatar lavado
Browse files

moving squid NS/KNF to magma folder

parent fd7b74ee
No related branches found
No related tags found
2 merge requests!11Hackfest9,!10Hackfest9
Showing
with 0 additions and 3590 deletions
# squid-k8s Charm
## Overview
This is a Kuberentes Charm to deploy [Squid Cache](http://www.squid-cache.org/).
Sugested Actions for this charm:
* Set allowed URLs
Possible way to run action: `juju run-action squid/0 addurl url=google.com`
* Stop/Start/Restart the squid service - done
Run like this: `juju run-action squid/0 restart`
* Set ftp, http, https proxies
## Quickstart
If you don't have microk8s and juju installed executing the following commands:
```
sudo snap install juju --classic
sudo snap install microk8s --classic
juju bootstrap microk8s
juju add-model squid
```
Afterwards clone the repository and deploy the charm
```
git clone https://github.com/DomFleischmann/charm-squid-k8s.git
cd charm-squid-k8s
git submodule update --init
juju deploy .
```
Check if the charm is deployed correctly with `juju status`
To test the `addurl` action open another terminal and type the following command:
`export https_proxy=http://<squid-ip>:3128`
Where squid-ip is the Squid App Address shown in `juju status`
Now when executing `curl https://www.google.com` squid will block access to the url
Execute the `addurl` action:
`juju run-action squid/0 addurl url=google.com`
Now when executing `curl https://www.google.com` it will give you the google output.
## Contact
- Author: Dominik Fleischmann <dominik.fleischmann@canonical.com>
- Bug Tracker: [here](https://github.com/DomFleischmann/charm-squid-k8s)
addurl:
description: "Add squid config"
params:
url:
description: "URL that will be allowed"
type: string
default: ""
start:
description: "Start squid service"
restart:
description: "Retart squid service"
stop:
description: "Stop squid service"
#!/bin/bash
URL=`action-get url`
if ! grep -Fxq "http_access allow allowedurls" /etc/squid/squid.conf
then
sed -i '/^# And finally deny all .*/i http_access allow allowedurls\n' /etc/squid/squid.conf
fi
sed -i "/^http_access allow allowedurls.*/i acl allowedurls dstdomain \.$URL" /etc/squid/squid.conf
kill -HUP `cat /var/run/squid.pid`
#!/bin/bash
service squid restart
#!/bin/bash
service squid start
#!/bin/bash
service squid stop
options:
image:
type: string
description: 'Docker image for squid'
default: 'sameersbn/squid:latest'
port:
type: int
description: 'Port'
default: 3128
../src/charm.py
\ No newline at end of file
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Operator Framework."""
# Import here the bare minimum to break the circular import between modules
from . import charm # NOQA
# Copyright 2019-2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pathlib
import typing
import yaml
from ops.framework import Object, EventSource, EventBase, Framework, ObjectEvents
from ops import model
class HookEvent(EventBase):
"""A base class for events that trigger because of a Juju hook firing."""
class ActionEvent(EventBase):
"""A base class for events that trigger when a user asks for an Action to be run.
To read the parameters for the action, see the instance variable `params`.
To respond with the result of the action, call `set_results`. To add progress
messages that are visible as the action is progressing use `log`.
:ivar params: The parameters passed to the action (read by action-get)
"""
def defer(self):
"""Action events are not deferable like other events.
This is because an action runs synchronously and the user is waiting for the result.
"""
raise RuntimeError('cannot defer action events')
def restore(self, snapshot: dict) -> None:
"""Used by the operator framework to record the action.
Not meant to be called directly by Charm code.
"""
env_action_name = os.environ.get('JUJU_ACTION_NAME')
event_action_name = self.handle.kind[:-len('_action')].replace('_', '-')
if event_action_name != env_action_name:
# This could only happen if the dev manually emits the action, or from a bug.
raise RuntimeError('action event kind does not match current action')
# Params are loaded at restore rather than __init__ because
# the model is not available in __init__.
self.params = self.framework.model._backend.action_get()
def set_results(self, results: typing.Mapping) -> None:
"""Report the result of the action.
Args:
results: The result of the action as a Dict
"""
self.framework.model._backend.action_set(results)
def log(self, message: str) -> None:
"""Send a message that a user will see while the action is running.
Args:
message: The message for the user.
"""
self.framework.model._backend.action_log(message)
def fail(self, message: str = '') -> None:
"""Report that this action has failed.
Args:
message: Optional message to record why it has failed.
"""
self.framework.model._backend.action_fail(message)
class InstallEvent(HookEvent):
"""Represents the `install` hook from Juju."""
class StartEvent(HookEvent):
"""Represents the `start` hook from Juju."""
class StopEvent(HookEvent):
"""Represents the `stop` hook from Juju."""
class RemoveEvent(HookEvent):
"""Represents the `remove` hook from Juju. """
class ConfigChangedEvent(HookEvent):
"""Represents the `config-changed` hook from Juju."""
class UpdateStatusEvent(HookEvent):
"""Represents the `update-status` hook from Juju."""
class UpgradeCharmEvent(HookEvent):
"""Represents the `upgrade-charm` hook from Juju.
This will be triggered when a user has run `juju upgrade-charm`. It is run after Juju
has unpacked the upgraded charm code, and so this event will be handled with new code.
"""
class PreSeriesUpgradeEvent(HookEvent):
"""Represents the `pre-series-upgrade` hook from Juju.
This happens when a user has run `juju upgrade-series MACHINE prepare` and
will fire for each unit that is running on the machine, telling them that
the user is preparing to upgrade the Machine's series (eg trusty->bionic).
The charm should take actions to prepare for the upgrade (a database charm
would want to write out a version-independent dump of the database, so that
when a new version of the database is available in a new series, it can be
used.)
Once all units on a machine have run `pre-series-upgrade`, the user will
initiate the steps to actually upgrade the machine (eg `do-release-upgrade`).
When the upgrade has been completed, the :class:`PostSeriesUpgradeEvent` will fire.
"""
class PostSeriesUpgradeEvent(HookEvent):
"""Represents the `post-series-upgrade` hook from Juju.
This is run after the user has done a distribution upgrade (or rolled back
and kept the same series). It is called in response to
`juju upgrade-series MACHINE complete`. Charms are expected to do whatever
steps are necessary to reconfigure their applications for the new series.
"""
class LeaderElectedEvent(HookEvent):
"""Represents the `leader-elected` hook from Juju.
Juju will trigger this when a new lead unit is chosen for a given application.
This represents the leader of the charm information (not necessarily the primary
of a running application). The main utility is that charm authors can know
that only one unit will be a leader at any given time, so they can do
configuration, etc, that would otherwise require coordination between units.
(eg, selecting a password for a new relation)
"""
class LeaderSettingsChangedEvent(HookEvent):
"""Represents the `leader-settings-changed` hook from Juju.
Deprecated. This represents when a lead unit would call `leader-set` to inform
the other units of an application that they have new information to handle.
This has been deprecated in favor of using a Peer relation, and having the
leader set a value in the Application data bag for that peer relation.
(see :class:`RelationChangedEvent`).
"""
class CollectMetricsEvent(HookEvent):
"""Represents the `collect-metrics` hook from Juju.
Note that events firing during a CollectMetricsEvent are currently
sandboxed in how they can interact with Juju. To report metrics
use :meth:`.add_metrics`.
"""
def add_metrics(self, metrics: typing.Mapping, labels: typing.Mapping = None) -> None:
"""Record metrics that have been gathered by the charm for this unit.
Args:
metrics: A collection of {key: float} pairs that contains the
metrics that have been gathered
labels: {key:value} strings that can be applied to the
metrics that are being gathered
"""
self.framework.model._backend.add_metrics(metrics, labels)
class RelationEvent(HookEvent):
"""A base class representing the various relation lifecycle events.
Charmers should not be creating RelationEvents directly. The events will be
generated by the framework from Juju related events. Users can observe them
from the various `CharmBase.on[relation_name].relation_*` events.
Attributes:
relation: The Relation involved in this event
app: The remote application that has triggered this event
unit: The remote unit that has triggered this event. This may be None
if the relation event was triggered as an Application level event
"""
def __init__(self, handle, relation, app=None, unit=None):
super().__init__(handle)
if unit is not None and unit.app != app:
raise RuntimeError(
'cannot create RelationEvent with application {} and unit {}'.format(app, unit))
self.relation = relation
self.app = app
self.unit = unit
def snapshot(self) -> dict:
"""Used by the framework to serialize the event to disk.
Not meant to be called by Charm code.
"""
snapshot = {
'relation_name': self.relation.name,
'relation_id': self.relation.id,
}
if self.app:
snapshot['app_name'] = self.app.name
if self.unit:
snapshot['unit_name'] = self.unit.name
return snapshot
def restore(self, snapshot: dict) -> None:
"""Used by the framework to deserialize the event from disk.
Not meant to be called by Charm code.
"""
self.relation = self.framework.model.get_relation(
snapshot['relation_name'], snapshot['relation_id'])
app_name = snapshot.get('app_name')
if app_name:
self.app = self.framework.model.get_app(app_name)
else:
self.app = None
unit_name = snapshot.get('unit_name')
if unit_name:
self.unit = self.framework.model.get_unit(unit_name)
else:
self.unit = None
class RelationCreatedEvent(RelationEvent):
"""Represents the `relation-created` hook from Juju.
This is triggered when a new relation to another app is added in Juju. This
can occur before units for those applications have started. All existing
relations should be established before start.
"""
class RelationJoinedEvent(RelationEvent):
"""Represents the `relation-joined` hook from Juju.
This is triggered whenever a new unit of a related application joins the relation.
(eg, a unit was added to an existing related app, or a new relation was established
with an application that already had units.)
"""
class RelationChangedEvent(RelationEvent):
"""Represents the `relation-changed` hook from Juju.
This is triggered whenever there is a change to the data bucket for a related
application or unit. Look at `event.relation.data[event.unit/app]` to see the
new information.
"""
class RelationDepartedEvent(RelationEvent):
"""Represents the `relation-departed` hook from Juju.
This is the inverse of the RelationJoinedEvent, representing when a unit
is leaving the relation (the unit is being removed, the app is being removed,
the relation is being removed). It is fired once for each unit that is
going away.
"""
class RelationBrokenEvent(RelationEvent):
"""Represents the `relation-broken` hook from Juju.
If a relation is being removed (`juju remove-relation` or `juju remove-application`),
once all the units have been removed, RelationBrokenEvent will fire to signal
that the relationship has been fully terminated.
"""
class StorageEvent(HookEvent):
"""Base class representing Storage related events."""
class StorageAttachedEvent(StorageEvent):
"""Represents the `storage-attached` hook from Juju.
Called when new storage is available for the charm to use.
"""
class StorageDetachingEvent(StorageEvent):
"""Represents the `storage-detaching` hook from Juju.
Called when storage a charm has been using is going away.
"""
class CharmEvents(ObjectEvents):
"""The events that are generated by Juju in response to the lifecycle of an application."""
install = EventSource(InstallEvent)
start = EventSource(StartEvent)
stop = EventSource(StopEvent)
remove = EventSource(RemoveEvent)
update_status = EventSource(UpdateStatusEvent)
config_changed = EventSource(ConfigChangedEvent)
upgrade_charm = EventSource(UpgradeCharmEvent)
pre_series_upgrade = EventSource(PreSeriesUpgradeEvent)
post_series_upgrade = EventSource(PostSeriesUpgradeEvent)
leader_elected = EventSource(LeaderElectedEvent)
leader_settings_changed = EventSource(LeaderSettingsChangedEvent)
collect_metrics = EventSource(CollectMetricsEvent)
class CharmBase(Object):
"""Base class that represents the Charm overall.
Usually this initialization is done by ops.main.main() rather than Charm authors
directly instantiating a Charm.
Args:
framework: The framework responsible for managing the Model and events for this
Charm.
key: Arbitrary key to distinguish this instance of CharmBase from another.
Generally is None when initialized by the framework. For charms instantiated by
main.main(), this is currenly None.
Attributes:
on: Defines all events that the Charm will fire.
"""
on = CharmEvents()
def __init__(self, framework: Framework, key: typing.Optional[str]):
"""Initialize the Charm with its framework and application name.
"""
super().__init__(framework, key)
for relation_name in self.framework.meta.relations:
relation_name = relation_name.replace('-', '_')
self.on.define_event(relation_name + '_relation_created', RelationCreatedEvent)
self.on.define_event(relation_name + '_relation_joined', RelationJoinedEvent)
self.on.define_event(relation_name + '_relation_changed', RelationChangedEvent)
self.on.define_event(relation_name + '_relation_departed', RelationDepartedEvent)
self.on.define_event(relation_name + '_relation_broken', RelationBrokenEvent)
for storage_name in self.framework.meta.storages:
storage_name = storage_name.replace('-', '_')
self.on.define_event(storage_name + '_storage_attached', StorageAttachedEvent)
self.on.define_event(storage_name + '_storage_detaching', StorageDetachingEvent)
for action_name in self.framework.meta.actions:
action_name = action_name.replace('-', '_')
self.on.define_event(action_name + '_action', ActionEvent)
@property
def app(self) -> model.Application:
"""Application that this unit is part of."""
return self.framework.model.app
@property
def unit(self) -> model.Unit:
"""Unit that this execution is responsible for."""
return self.framework.model.unit
@property
def meta(self) -> 'CharmMeta':
"""CharmMeta of this charm.
"""
return self.framework.meta
@property
def charm_dir(self) -> pathlib.Path:
"""Root directory of the Charm as it is running.
"""
return self.framework.charm_dir
class CharmMeta:
"""Object containing the metadata for the charm.
This is read from metadata.yaml and/or actions.yaml. Generally charms will
define this information, rather than reading it at runtime. This class is
mostly for the framework to understand what the charm has defined.
The maintainers, tags, terms, series, and extra_bindings attributes are all
lists of strings. The requires, provides, peers, relations, storage,
resources, and payloads attributes are all mappings of names to instances
of the respective RelationMeta, StorageMeta, ResourceMeta, or PayloadMeta.
The relations attribute is a convenience accessor which includes all of the
requires, provides, and peers RelationMeta items. If needed, the role of
the relation definition can be obtained from its role attribute.
Attributes:
name: The name of this charm
summary: Short description of what this charm does
description: Long description for this charm
maintainers: A list of strings of the email addresses of the maintainers
of this charm.
tags: Charm store tag metadata for categories associated with this charm.
terms: Charm store terms that should be agreed to before this charm can
be deployed. (Used for things like licensing issues.)
series: The list of supported OS series that this charm can support.
The first entry in the list is the default series that will be
used by deploy if no other series is requested by the user.
subordinate: True/False whether this charm is intended to be used as a
subordinate charm.
min_juju_version: If supplied, indicates this charm needs features that
are not available in older versions of Juju.
requires: A dict of {name: :class:`RelationMeta` } for each 'requires' relation.
provides: A dict of {name: :class:`RelationMeta` } for each 'provides' relation.
peers: A dict of {name: :class:`RelationMeta` } for each 'peer' relation.
relations: A dict containing all :class:`RelationMeta` attributes (merged from other
sections)
storages: A dict of {name: :class:`StorageMeta`} for each defined storage.
resources: A dict of {name: :class:`ResourceMeta`} for each defined resource.
payloads: A dict of {name: :class:`PayloadMeta`} for each defined payload.
extra_bindings: A dict of additional named bindings that a charm can use
for network configuration.
actions: A dict of {name: :class:`ActionMeta`} for actions that the charm has defined.
Args:
raw: a mapping containing the contents of metadata.yaml
actions_raw: a mapping containing the contents of actions.yaml
"""
def __init__(self, raw: dict = {}, actions_raw: dict = {}):
self.name = raw.get('name', '')
self.summary = raw.get('summary', '')
self.description = raw.get('description', '')
self.maintainers = []
if 'maintainer' in raw:
self.maintainers.append(raw['maintainer'])
if 'maintainers' in raw:
self.maintainers.extend(raw['maintainers'])
self.tags = raw.get('tags', [])
self.terms = raw.get('terms', [])
self.series = raw.get('series', [])
self.subordinate = raw.get('subordinate', False)
self.min_juju_version = raw.get('min-juju-version')
self.requires = {name: RelationMeta('requires', name, rel)
for name, rel in raw.get('requires', {}).items()}
self.provides = {name: RelationMeta('provides', name, rel)
for name, rel in raw.get('provides', {}).items()}
# TODO: (jam 2020-05-11) The *role* should be 'peer' even though it comes from the
# 'peers' section.
self.peers = {name: RelationMeta('peers', name, rel)
for name, rel in raw.get('peers', {}).items()}
self.relations = {}
self.relations.update(self.requires)
self.relations.update(self.provides)
self.relations.update(self.peers)
self.storages = {name: StorageMeta(name, storage)
for name, storage in raw.get('storage', {}).items()}
self.resources = {name: ResourceMeta(name, res)
for name, res in raw.get('resources', {}).items()}
self.payloads = {name: PayloadMeta(name, payload)
for name, payload in raw.get('payloads', {}).items()}
self.extra_bindings = raw.get('extra-bindings', {})
self.actions = {name: ActionMeta(name, action) for name, action in actions_raw.items()}
@classmethod
def from_yaml(
cls, metadata: typing.Union[str, typing.TextIO],
actions: typing.Optional[typing.Union[str, typing.TextIO]] = None):
"""Instantiate a CharmMeta from a YAML description of metadata.yaml.
Args:
metadata: A YAML description of charm metadata (name, relations, etc.)
This can be a simple string, or a file-like object. (passed to `yaml.safe_load`).
actions: YAML description of Actions for this charm (eg actions.yaml)
"""
meta = yaml.safe_load(metadata)
raw_actions = {}
if actions is not None:
raw_actions = yaml.safe_load(actions)
return cls(meta, raw_actions)
class RelationMeta:
"""Object containing metadata about a relation definition.
Should not be constructed directly by Charm code. Is gotten from one of
:attr:`CharmMeta.peers`, :attr:`CharmMeta.requires`, :attr:`CharmMeta.provides`,
:attr:`CharmMeta.relations`.
Attributes:
role: This is one of requires/provides/peers
relation_name: Name of this relation from metadata.yaml
interface_name: Optional definition of the interface protocol.
scope: "global" or "container" scope based on how the relation should be used.
"""
def __init__(self, role, relation_name, raw):
self.role = role
self.relation_name = relation_name
self.interface_name = raw['interface']
self.scope = raw.get('scope')
class StorageMeta:
"""Object containing metadata about a storage definition."""
def __init__(self, name, raw):
self.storage_name = name
self.type = raw['type']
self.description = raw.get('description', '')
self.shared = raw.get('shared', False)
self.read_only = raw.get('read-only', False)
self.minimum_size = raw.get('minimum-size')
self.location = raw.get('location')
self.multiple_range = None
if 'multiple' in raw:
range = raw['multiple']['range']
if '-' not in range:
self.multiple_range = (int(range), int(range))
else:
range = range.split('-')
self.multiple_range = (int(range[0]), int(range[1]) if range[1] else None)
class ResourceMeta:
"""Object containing metadata about a resource definition."""
def __init__(self, name, raw):
self.resource_name = name
self.type = raw['type']
self.filename = raw.get('filename', None)
self.description = raw.get('description', '')
class PayloadMeta:
"""Object containing metadata about a payload definition."""
def __init__(self, name, raw):
self.payload_name = name
self.type = raw['type']
class ActionMeta:
"""Object containing metadata about an action's definition."""
def __init__(self, name, raw=None):
raw = raw or {}
self.name = name
self.title = raw.get('title', '')
self.description = raw.get('description', '')
self.parameters = raw.get('params', {}) # {<parameter name>: <JSON Schema definition>}
self.required = raw.get('required', []) # [<parameter name>, ...]
# Copyright 2019-2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import collections.abc
import inspect
import keyword
import marshal
import os
import pdb
import pickle
import re
import sqlite3
import sys
import types
import weakref
from datetime import timedelta
from ops import charm
class Handle:
"""Handle defines a name for an object in the form of a hierarchical path.
The provided parent is the object (or that object's handle) that this handle
sits under, or None if the object identified by this handle stands by itself
as the root of its own hierarchy.
The handle kind is a string that defines a namespace so objects with the
same parent and kind will have unique keys.
The handle key is a string uniquely identifying the object. No other objects
under the same parent and kind may have the same key.
"""
def __init__(self, parent, kind, key):
if parent and not isinstance(parent, Handle):
parent = parent.handle
self._parent = parent
self._kind = kind
self._key = key
if parent:
if key:
self._path = "{}/{}[{}]".format(parent, kind, key)
else:
self._path = "{}/{}".format(parent, kind)
else:
if key:
self._path = "{}[{}]".format(kind, key)
else:
self._path = "{}".format(kind)
def nest(self, kind, key):
return Handle(self, kind, key)
def __hash__(self):
return hash((self.parent, self.kind, self.key))
def __eq__(self, other):
return (self.parent, self.kind, self.key) == (other.parent, other.kind, other.key)
def __str__(self):
return self.path
@property
def parent(self):
return self._parent
@property
def kind(self):
return self._kind
@property
def key(self):
return self._key
@property
def path(self):
return self._path
@classmethod
def from_path(cls, path):
handle = None
for pair in path.split("/"):
pair = pair.split("[")
good = False
if len(pair) == 1:
kind, key = pair[0], None
good = True
elif len(pair) == 2:
kind, key = pair
if key and key[-1] == ']':
key = key[:-1]
good = True
if not good:
raise RuntimeError("attempted to restore invalid handle path {}".format(path))
handle = Handle(handle, kind, key)
return handle
class EventBase:
def __init__(self, handle):
self.handle = handle
self.deferred = False
def defer(self):
self.deferred = True
def snapshot(self):
"""Return the snapshot data that should be persisted.
Subclasses must override to save any custom state.
"""
return None
def restore(self, snapshot):
"""Restore the value state from the given snapshot.
Subclasses must override to restore their custom state.
"""
self.deferred = False
class EventSource:
"""EventSource wraps an event type with a descriptor to facilitate observing and emitting.
It is generally used as:
class SomethingHappened(EventBase):
pass
class SomeObject(Object):
something_happened = EventSource(SomethingHappened)
With that, instances of that type will offer the someobj.something_happened
attribute which is a BoundEvent and may be used to emit and observe the event.
"""
def __init__(self, event_type):
if not isinstance(event_type, type) or not issubclass(event_type, EventBase):
raise RuntimeError(
'Event requires a subclass of EventBase as an argument, got {}'.format(event_type))
self.event_type = event_type
self.event_kind = None
self.emitter_type = None
def _set_name(self, emitter_type, event_kind):
if self.event_kind is not None:
raise RuntimeError(
'EventSource({}) reused as {}.{} and {}.{}'.format(
self.event_type.__name__,
self.emitter_type.__name__,
self.event_kind,
emitter_type.__name__,
event_kind,
))
self.event_kind = event_kind
self.emitter_type = emitter_type
def __get__(self, emitter, emitter_type=None):
if emitter is None:
return self
# Framework might not be available if accessed as CharmClass.on.event
# rather than charm_instance.on.event, but in that case it couldn't be
# emitted anyway, so there's no point to registering it.
framework = getattr(emitter, 'framework', None)
if framework is not None:
framework.register_type(self.event_type, emitter, self.event_kind)
return BoundEvent(emitter, self.event_type, self.event_kind)
class BoundEvent:
def __repr__(self):
return '<BoundEvent {} bound to {}.{} at {}>'.format(
self.event_type.__name__,
type(self.emitter).__name__,
self.event_kind,
hex(id(self)),
)
def __init__(self, emitter, event_type, event_kind):
self.emitter = emitter
self.event_type = event_type
self.event_kind = event_kind
def emit(self, *args, **kwargs):
"""Emit event to all registered observers.
The current storage state is committed before and after each observer is notified.
"""
framework = self.emitter.framework
key = framework._next_event_key()
event = self.event_type(Handle(self.emitter, self.event_kind, key), *args, **kwargs)
framework._emit(event)
class HandleKind:
"""Helper descriptor to define the Object.handle_kind field.
The handle_kind for an object defaults to its type name, but it may
be explicitly overridden if desired.
"""
def __get__(self, obj, obj_type):
kind = obj_type.__dict__.get("handle_kind")
if kind:
return kind
return obj_type.__name__
class _Metaclass(type):
"""Helper class to ensure proper instantiation of Object-derived classes.
This class currently has a single purpose: events derived from EventSource
that are class attributes of Object-derived classes need to be told what
their name is in that class. For example, in
class SomeObject(Object):
something_happened = EventSource(SomethingHappened)
the instance of EventSource needs to know it's called 'something_happened'.
Starting from python 3.6 we could use __set_name__ on EventSource for this,
but until then this (meta)class does the equivalent work.
TODO: when we drop support for 3.5 drop this class, and rename _set_name in
EventSource to __set_name__; everything should continue to work.
"""
def __new__(typ, *a, **kw):
k = super().__new__(typ, *a, **kw)
# k is now the Object-derived class; loop over its class attributes
for n, v in vars(k).items():
# we could do duck typing here if we want to support
# non-EventSource-derived shenanigans. We don't.
if isinstance(v, EventSource):
# this is what 3.6+ does automatically for us:
v._set_name(k, n)
return k
class Object(metaclass=_Metaclass):
handle_kind = HandleKind()
def __init__(self, parent, key):
kind = self.handle_kind
if isinstance(parent, Framework):
self.framework = parent
# Avoid Framework instances having a circular reference to themselves.
if self.framework is self:
self.framework = weakref.proxy(self.framework)
self.handle = Handle(None, kind, key)
else:
self.framework = parent.framework
self.handle = Handle(parent, kind, key)
self.framework._track(self)
# TODO Detect conflicting handles here.
@property
def model(self):
return self.framework.model
class ObjectEvents(Object):
"""Convenience type to allow defining .on attributes at class level."""
handle_kind = "on"
def __init__(self, parent=None, key=None):
if parent is not None:
super().__init__(parent, key)
else:
self._cache = weakref.WeakKeyDictionary()
def __get__(self, emitter, emitter_type):
if emitter is None:
return self
instance = self._cache.get(emitter)
if instance is None:
# Same type, different instance, more data. Doing this unusual construct
# means people can subclass just this one class to have their own 'on'.
instance = self._cache[emitter] = type(self)(emitter)
return instance
@classmethod
def define_event(cls, event_kind, event_type):
"""Define an event on this type at runtime.
cls: a type to define an event on.
event_kind: an attribute name that will be used to access the
event. Must be a valid python identifier, not be a keyword
or an existing attribute.
event_type: a type of the event to define.
"""
prefix = 'unable to define an event with event_kind that '
if not event_kind.isidentifier():
raise RuntimeError(prefix + 'is not a valid python identifier: ' + event_kind)
elif keyword.iskeyword(event_kind):
raise RuntimeError(prefix + 'is a python keyword: ' + event_kind)
try:
getattr(cls, event_kind)
raise RuntimeError(
prefix + 'overlaps with an existing type {} attribute: {}'.format(cls, event_kind))
except AttributeError:
pass
event_descriptor = EventSource(event_type)
event_descriptor._set_name(cls, event_kind)
setattr(cls, event_kind, event_descriptor)
def events(self):
"""Return a mapping of event_kinds to bound_events for all available events.
"""
events_map = {}
# We have to iterate over the class rather than instance to allow for properties which
# might call this method (e.g., event views), leading to infinite recursion.
for attr_name, attr_value in inspect.getmembers(type(self)):
if isinstance(attr_value, EventSource):
# We actually care about the bound_event, however, since it
# provides the most info for users of this method.
event_kind = attr_name
bound_event = getattr(self, event_kind)
events_map[event_kind] = bound_event
return events_map
def __getitem__(self, key):
return PrefixedEvents(self, key)
class PrefixedEvents:
def __init__(self, emitter, key):
self._emitter = emitter
self._prefix = key.replace("-", "_") + '_'
def __getattr__(self, name):
return getattr(self._emitter, self._prefix + name)
class PreCommitEvent(EventBase):
pass
class CommitEvent(EventBase):
pass
class FrameworkEvents(ObjectEvents):
pre_commit = EventSource(PreCommitEvent)
commit = EventSource(CommitEvent)
class NoSnapshotError(Exception):
def __init__(self, handle_path):
self.handle_path = handle_path
def __str__(self):
return 'no snapshot data found for {} object'.format(self.handle_path)
class NoTypeError(Exception):
def __init__(self, handle_path):
self.handle_path = handle_path
def __str__(self):
return "cannot restore {} since no class was registered for it".format(self.handle_path)
class SQLiteStorage:
DB_LOCK_TIMEOUT = timedelta(hours=1)
def __init__(self, filename):
# The isolation_level argument is set to None such that the implicit
# transaction management behavior of the sqlite3 module is disabled.
self._db = sqlite3.connect(str(filename),
isolation_level=None,
timeout=self.DB_LOCK_TIMEOUT.total_seconds())
self._setup()
def _setup(self):
# Make sure that the database is locked until the connection is closed,
# not until the transaction ends.
self._db.execute("PRAGMA locking_mode=EXCLUSIVE")
c = self._db.execute("BEGIN")
c.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='snapshot'")
if c.fetchone()[0] == 0:
# Keep in mind what might happen if the process dies somewhere below.
# The system must not be rendered permanently broken by that.
self._db.execute("CREATE TABLE snapshot (handle TEXT PRIMARY KEY, data BLOB)")
self._db.execute('''
CREATE TABLE notice (
sequence INTEGER PRIMARY KEY AUTOINCREMENT,
event_path TEXT,
observer_path TEXT,
method_name TEXT)
''')
self._db.commit()
def close(self):
self._db.close()
def commit(self):
self._db.commit()
# There's commit but no rollback. For abort to be supported, we'll need logic that
# can rollback decisions made by third-party code in terms of the internal state
# of objects that have been snapshotted, and hooks to let them know about it and
# take the needed actions to undo their logic until the last snapshot.
# This is doable but will increase significantly the chances for mistakes.
def save_snapshot(self, handle_path, snapshot_data):
self._db.execute("REPLACE INTO snapshot VALUES (?, ?)", (handle_path, snapshot_data))
def load_snapshot(self, handle_path):
c = self._db.cursor()
c.execute("SELECT data FROM snapshot WHERE handle=?", (handle_path,))
row = c.fetchone()
if row:
return row[0]
return None
def drop_snapshot(self, handle_path):
self._db.execute("DELETE FROM snapshot WHERE handle=?", (handle_path,))
def save_notice(self, event_path, observer_path, method_name):
self._db.execute('INSERT INTO notice VALUES (NULL, ?, ?, ?)',
(event_path, observer_path, method_name))
def drop_notice(self, event_path, observer_path, method_name):
self._db.execute('''
DELETE FROM notice
WHERE event_path=?
AND observer_path=?
AND method_name=?
''', (event_path, observer_path, method_name))
def notices(self, event_path):
if event_path:
c = self._db.execute('''
SELECT event_path, observer_path, method_name
FROM notice
WHERE event_path=?
ORDER BY sequence
''', (event_path,))
else:
c = self._db.execute('''
SELECT event_path, observer_path, method_name
FROM notice
ORDER BY sequence
''')
while True:
rows = c.fetchmany()
if not rows:
break
for row in rows:
yield tuple(row)
# the message to show to the user when a pdb breakpoint goes active
_BREAKPOINT_WELCOME_MESSAGE = """
Starting pdb to debug charm operator.
Run `h` for help, `c` to continue, or `exit`/CTRL-d to abort.
Future breakpoints may interrupt execution again.
More details at https://discourse.jujucharms.com/t/debugging-charm-hooks
"""
class Framework(Object):
on = FrameworkEvents()
# Override properties from Object so that we can set them in __init__.
model = None
meta = None
charm_dir = None
def __init__(self, data_path, charm_dir, meta, model):
super().__init__(self, None)
self._data_path = data_path
self.charm_dir = charm_dir
self.meta = meta
self.model = model
self._observers = [] # [(observer_path, method_name, parent_path, event_key)]
self._observer = weakref.WeakValueDictionary() # {observer_path: observer}
self._objects = weakref.WeakValueDictionary()
self._type_registry = {} # {(parent_path, kind): cls}
self._type_known = set() # {cls}
self._storage = SQLiteStorage(data_path)
# We can't use the higher-level StoredState because it relies on events.
self.register_type(StoredStateData, None, StoredStateData.handle_kind)
stored_handle = Handle(None, StoredStateData.handle_kind, '_stored')
try:
self._stored = self.load_snapshot(stored_handle)
except NoSnapshotError:
self._stored = StoredStateData(self, '_stored')
self._stored['event_count'] = 0
# Hook into builtin breakpoint, so if Python >= 3.7, devs will be able to just do
# breakpoint(); if Python < 3.7, this doesn't affect anything
sys.breakpointhook = self.breakpoint
# Flag to indicate that we already presented the welcome message in a debugger breakpoint
self._breakpoint_welcomed = False
# Parse once the env var, which may be used multiple times later
debug_at = os.environ.get('JUJU_DEBUG_AT')
self._juju_debug_at = debug_at.split(',') if debug_at else ()
def close(self):
self._storage.close()
def _track(self, obj):
"""Track object and ensure it is the only object created using its handle path."""
if obj is self:
# Framework objects don't track themselves
return
if obj.handle.path in self.framework._objects:
raise RuntimeError(
'two objects claiming to be {} have been created'.format(obj.handle.path))
self._objects[obj.handle.path] = obj
def _forget(self, obj):
"""Stop tracking the given object. See also _track."""
self._objects.pop(obj.handle.path, None)
def commit(self):
# Give a chance for objects to persist data they want to before a commit is made.
self.on.pre_commit.emit()
# Make sure snapshots are saved by instances of StoredStateData. Any possible state
# modifications in on_commit handlers of instances of other classes will not be persisted.
self.on.commit.emit()
# Save our event count after all events have been emitted.
self.save_snapshot(self._stored)
self._storage.commit()
def register_type(self, cls, parent, kind=None):
if parent and not isinstance(parent, Handle):
parent = parent.handle
if parent:
parent_path = parent.path
else:
parent_path = None
if not kind:
kind = cls.handle_kind
self._type_registry[(parent_path, kind)] = cls
self._type_known.add(cls)
def save_snapshot(self, value):
"""Save a persistent snapshot of the provided value.
The provided value must implement the following interface:
value.handle = Handle(...)
value.snapshot() => {...} # Simple builtin types only.
value.restore(snapshot) # Restore custom state from prior snapshot.
"""
if type(value) not in self._type_known:
raise RuntimeError(
'cannot save {} values before registering that type'.format(type(value).__name__))
data = value.snapshot()
# Use marshal as a validator, enforcing the use of simple types, as we later the
# information is really pickled, which is too error prone for future evolution of the
# stored data (e.g. if the developer stores a custom object and later changes its
# class name; when unpickling the original class will not be there and event
# data loading will fail).
try:
marshal.dumps(data)
except ValueError:
msg = "unable to save the data for {}, it must contain only simple types: {!r}"
raise ValueError(msg.format(value.__class__.__name__, data))
# Use pickle for serialization, so the value remains portable.
raw_data = pickle.dumps(data)
self._storage.save_snapshot(value.handle.path, raw_data)
def load_snapshot(self, handle):
parent_path = None
if handle.parent:
parent_path = handle.parent.path
cls = self._type_registry.get((parent_path, handle.kind))
if not cls:
raise NoTypeError(handle.path)
raw_data = self._storage.load_snapshot(handle.path)
if not raw_data:
raise NoSnapshotError(handle.path)
data = pickle.loads(raw_data)
obj = cls.__new__(cls)
obj.framework = self
obj.handle = handle
obj.restore(data)
self._track(obj)
return obj
def drop_snapshot(self, handle):
self._storage.drop_snapshot(handle.path)
def observe(self, bound_event, observer):
"""Register observer to be called when bound_event is emitted.
The bound_event is generally provided as an attribute of the object that emits
the event, and is created in this style:
class SomeObject:
something_happened = Event(SomethingHappened)
That event may be observed as:
framework.observe(someobj.something_happened, self.on_something_happened)
If the method to be called follows the name convention "on_<event name>", it
may be omitted from the observe call. That means the above is equivalent to:
framework.observe(someobj.something_happened, self)
"""
if not isinstance(bound_event, BoundEvent):
raise RuntimeError(
'Framework.observe requires a BoundEvent as second parameter, got {}'.format(
bound_event))
event_type = bound_event.event_type
event_kind = bound_event.event_kind
emitter = bound_event.emitter
self.register_type(event_type, emitter, event_kind)
if hasattr(emitter, "handle"):
emitter_path = emitter.handle.path
else:
raise RuntimeError(
'event emitter {} must have a "handle" attribute'.format(type(emitter).__name__))
method_name = None
if isinstance(observer, types.MethodType):
method_name = observer.__name__
observer = observer.__self__
else:
method_name = "on_" + event_kind
if not hasattr(observer, method_name):
raise RuntimeError(
'Observer method not provided explicitly'
' and {} type has no "{}" method'.format(type(observer).__name__,
method_name))
# Validate that the method has an acceptable call signature.
sig = inspect.signature(getattr(observer, method_name))
# Self isn't included in the params list, so the first arg will be the event.
extra_params = list(sig.parameters.values())[1:]
if not sig.parameters:
raise TypeError(
'{}.{} must accept event parameter'.format(type(observer).__name__, method_name))
elif any(param.default is inspect.Parameter.empty for param in extra_params):
# Allow for additional optional params, since there's no reason to exclude them, but
# required params will break.
raise TypeError(
'{}.{} has extra required parameter'.format(type(observer).__name__, method_name))
# TODO Prevent the exact same parameters from being registered more than once.
self._observer[observer.handle.path] = observer
self._observers.append((observer.handle.path, method_name, emitter_path, event_kind))
def _next_event_key(self):
"""Return the next event key that should be used, incrementing the internal counter."""
# Increment the count first; this means the keys will start at 1, and 0
# means no events have been emitted.
self._stored['event_count'] += 1
return str(self._stored['event_count'])
def _emit(self, event):
"""See BoundEvent.emit for the public way to call this."""
# Save the event for all known observers before the first notification
# takes place, so that either everyone interested sees it, or nobody does.
self.save_snapshot(event)
event_path = event.handle.path
event_kind = event.handle.kind
parent_path = event.handle.parent.path
# TODO Track observers by (parent_path, event_kind) rather than as a list of
# all observers. Avoiding linear search through all observers for every event
for observer_path, method_name, _parent_path, _event_kind in self._observers:
if _parent_path != parent_path:
continue
if _event_kind and _event_kind != event_kind:
continue
# Again, only commit this after all notices are saved.
self._storage.save_notice(event_path, observer_path, method_name)
self._reemit(event_path)
def reemit(self):
"""Reemit previously deferred events to the observers that deferred them.
Only the specific observers that have previously deferred the event will be
notified again. Observers that asked to be notified about events after it's
been first emitted won't be notified, as that would mean potentially observing
events out of order.
"""
self._reemit()
def _reemit(self, single_event_path=None):
last_event_path = None
deferred = True
for event_path, observer_path, method_name in self._storage.notices(single_event_path):
event_handle = Handle.from_path(event_path)
if last_event_path != event_path:
if not deferred:
self._storage.drop_snapshot(last_event_path)
last_event_path = event_path
deferred = False
try:
event = self.load_snapshot(event_handle)
except NoTypeError:
self._storage.drop_notice(event_path, observer_path, method_name)
continue
event.deferred = False
observer = self._observer.get(observer_path)
if observer:
custom_handler = getattr(observer, method_name, None)
if custom_handler:
event_is_from_juju = isinstance(event, charm.HookEvent)
event_is_action = isinstance(event, charm.ActionEvent)
if (event_is_from_juju or event_is_action) and 'hook' in self._juju_debug_at:
# Present the welcome message and run under PDB.
self._show_debug_code_message()
pdb.runcall(custom_handler, event)
else:
# Regular call to the registered method.
custom_handler(event)
if event.deferred:
deferred = True
else:
self._storage.drop_notice(event_path, observer_path, method_name)
# We intentionally consider this event to be dead and reload it from
# scratch in the next path.
self.framework._forget(event)
if not deferred:
self._storage.drop_snapshot(last_event_path)
def _show_debug_code_message(self):
"""Present the welcome message (only once!) when using debugger functionality."""
if not self._breakpoint_welcomed:
self._breakpoint_welcomed = True
print(_BREAKPOINT_WELCOME_MESSAGE, file=sys.stderr, end='')
def breakpoint(self, name=None):
"""Add breakpoint, optionally named, at the place where this method is called.
For the breakpoint to be activated the JUJU_DEBUG_AT environment variable
must be set to "all" or to the specific name parameter provided, if any. In every
other situation calling this method does nothing.
The framework also provides a standard breakpoint named "hook", that will
stop execution when a hook event is about to be handled.
For those reasons, the "all" and "hook" breakpoint names are reserved.
"""
# If given, validate the name comply with all the rules
if name is not None:
if not isinstance(name, str):
raise TypeError('breakpoint names must be strings')
if name in ('hook', 'all'):
raise ValueError('breakpoint names "all" and "hook" are reserved')
if not re.match(r'^[a-z0-9]([a-z0-9\-]*[a-z0-9])?$', name):
raise ValueError('breakpoint names must look like "foo" or "foo-bar"')
indicated_breakpoints = self._juju_debug_at
if 'all' in indicated_breakpoints or name in indicated_breakpoints:
self._show_debug_code_message()
# If we call set_trace() directly it will open the debugger *here*, so indicating
# it to use our caller's frame
code_frame = inspect.currentframe().f_back
pdb.Pdb().set_trace(code_frame)
class StoredStateData(Object):
def __init__(self, parent, attr_name):
super().__init__(parent, attr_name)
self._cache = {}
self.dirty = False
def __getitem__(self, key):
return self._cache.get(key)
def __setitem__(self, key, value):
self._cache[key] = value
self.dirty = True
def __contains__(self, key):
return key in self._cache
def snapshot(self):
return self._cache
def restore(self, snapshot):
self._cache = snapshot
self.dirty = False
def on_commit(self, event):
if self.dirty:
self.framework.save_snapshot(self)
self.dirty = False
class BoundStoredState:
def __init__(self, parent, attr_name):
parent.framework.register_type(StoredStateData, parent)
handle = Handle(parent, StoredStateData.handle_kind, attr_name)
try:
data = parent.framework.load_snapshot(handle)
except NoSnapshotError:
data = StoredStateData(parent, attr_name)
# __dict__ is used to avoid infinite recursion.
self.__dict__["_data"] = data
self.__dict__["_attr_name"] = attr_name
parent.framework.observe(parent.framework.on.commit, self._data)
def __getattr__(self, key):
# "on" is the only reserved key that can't be used in the data map.
if key == "on":
return self._data.on
if key not in self._data:
raise AttributeError("attribute '{}' is not stored".format(key))
return _wrap_stored(self._data, self._data[key])
def __setattr__(self, key, value):
if key == "on":
raise AttributeError("attribute 'on' is reserved and cannot be set")
value = _unwrap_stored(self._data, value)
if not isinstance(value, (type(None), int, float, str, bytes, list, dict, set)):
raise AttributeError(
'attribute {!r} cannot be a {}: must be int/float/dict/list/etc'.format(
key, type(value).__name__))
self._data[key] = _unwrap_stored(self._data, value)
def set_default(self, **kwargs):
""""Set the value of any given key if it has not already been set"""
for k, v in kwargs.items():
if k not in self._data:
self._data[k] = v
class StoredState:
"""A class used to store data the charm needs persisted across invocations.
Example::
class MyClass(Object):
_stored = StoredState()
Instances of `MyClass` can transparently save state between invocations by
setting attributes on `_stored`. Initial state should be set with
`set_default` on the bound object, that is::
class MyClass(Object):
_stored = StoredState()
def __init__(self, parent, key):
super().__init__(parent, key)
self._stored.set_default(seen=set())
self.framework.observe(self.on.seen, self._on_seen)
def _on_seen(self, event):
self._stored.seen.add(event.uuid)
"""
def __init__(self):
self.parent_type = None
self.attr_name = None
def __get__(self, parent, parent_type=None):
if self.parent_type is not None and self.parent_type not in parent_type.mro():
# the StoredState instance is being shared between two unrelated classes
# -> unclear what is exepcted of us -> bail out
raise RuntimeError(
'StoredState shared by {} and {}'.format(
self.parent_type.__name__, parent_type.__name__))
if parent is None:
# accessing via the class directly (e.g. MyClass.stored)
return self
bound = None
if self.attr_name is not None:
bound = parent.__dict__.get(self.attr_name)
if bound is not None:
# we already have the thing from a previous pass, huzzah
return bound
# need to find ourselves amongst the parent's bases
for cls in parent_type.mro():
for attr_name, attr_value in cls.__dict__.items():
if attr_value is not self:
continue
# we've found ourselves! is it the first time?
if bound is not None:
# the StoredState instance is being stored in two different
# attributes -> unclear what is expected of us -> bail out
raise RuntimeError("StoredState shared by {0}.{1} and {0}.{2}".format(
cls.__name__, self.attr_name, attr_name))
# we've found ourselves for the first time; save where, and bind the object
self.attr_name = attr_name
self.parent_type = cls
bound = BoundStoredState(parent, attr_name)
if bound is not None:
# cache the bound object to avoid the expensive lookup the next time
# (don't use setattr, to keep things symmetric with the fast-path lookup above)
parent.__dict__[self.attr_name] = bound
return bound
raise AttributeError(
'cannot find {} attribute in type {}'.format(
self.__class__.__name__, parent_type.__name__))
def _wrap_stored(parent_data, value):
t = type(value)
if t is dict:
return StoredDict(parent_data, value)
if t is list:
return StoredList(parent_data, value)
if t is set:
return StoredSet(parent_data, value)
return value
def _unwrap_stored(parent_data, value):
t = type(value)
if t is StoredDict or t is StoredList or t is StoredSet:
return value._under
return value
class StoredDict(collections.abc.MutableMapping):
def __init__(self, stored_data, under):
self._stored_data = stored_data
self._under = under
def __getitem__(self, key):
return _wrap_stored(self._stored_data, self._under[key])
def __setitem__(self, key, value):
self._under[key] = _unwrap_stored(self._stored_data, value)
self._stored_data.dirty = True
def __delitem__(self, key):
del self._under[key]
self._stored_data.dirty = True
def __iter__(self):
return self._under.__iter__()
def __len__(self):
return len(self._under)
def __eq__(self, other):
if isinstance(other, StoredDict):
return self._under == other._under
elif isinstance(other, collections.abc.Mapping):
return self._under == other
else:
return NotImplemented
class StoredList(collections.abc.MutableSequence):
def __init__(self, stored_data, under):
self._stored_data = stored_data
self._under = under
def __getitem__(self, index):
return _wrap_stored(self._stored_data, self._under[index])
def __setitem__(self, index, value):
self._under[index] = _unwrap_stored(self._stored_data, value)
self._stored_data.dirty = True
def __delitem__(self, index):
del self._under[index]
self._stored_data.dirty = True
def __len__(self):
return len(self._under)
def insert(self, index, value):
self._under.insert(index, value)
self._stored_data.dirty = True
def append(self, value):
self._under.append(value)
self._stored_data.dirty = True
def __eq__(self, other):
if isinstance(other, StoredList):
return self._under == other._under
elif isinstance(other, collections.abc.Sequence):
return self._under == other
else:
return NotImplemented
def __lt__(self, other):
if isinstance(other, StoredList):
return self._under < other._under
elif isinstance(other, collections.abc.Sequence):
return self._under < other
else:
return NotImplemented
def __le__(self, other):
if isinstance(other, StoredList):
return self._under <= other._under
elif isinstance(other, collections.abc.Sequence):
return self._under <= other
else:
return NotImplemented
def __gt__(self, other):
if isinstance(other, StoredList):
return self._under > other._under
elif isinstance(other, collections.abc.Sequence):
return self._under > other
else:
return NotImplemented
def __ge__(self, other):
if isinstance(other, StoredList):
return self._under >= other._under
elif isinstance(other, collections.abc.Sequence):
return self._under >= other
else:
return NotImplemented
class StoredSet(collections.abc.MutableSet):
def __init__(self, stored_data, under):
self._stored_data = stored_data
self._under = under
def add(self, key):
self._under.add(key)
self._stored_data.dirty = True
def discard(self, key):
self._under.discard(key)
self._stored_data.dirty = True
def __contains__(self, key):
return key in self._under
def __iter__(self):
return self._under.__iter__()
def __len__(self):
return len(self._under)
@classmethod
def _from_iterable(cls, it):
"""Construct an instance of the class from any iterable input.
Per https://docs.python.org/3/library/collections.abc.html
if the Set mixin is being used in a class with a different constructor signature,
you will need to override _from_iterable() with a classmethod that can construct
new instances from an iterable argument.
"""
return set(it)
def __le__(self, other):
if isinstance(other, StoredSet):
return self._under <= other._under
elif isinstance(other, collections.abc.Set):
return self._under <= other
else:
return NotImplemented
def __ge__(self, other):
if isinstance(other, StoredSet):
return self._under >= other._under
elif isinstance(other, collections.abc.Set):
return self._under >= other
else:
return NotImplemented
def __eq__(self, other):
if isinstance(other, StoredSet):
return self._under == other._under
elif isinstance(other, collections.abc.Set):
return self._under == other
else:
return NotImplemented
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from functools import total_ordering
@total_ordering
class JujuVersion:
PATTERN = r'''^
(?P<major>\d{1,9})\.(?P<minor>\d{1,9}) # <major> and <minor> numbers are always there
((?:\.|-(?P<tag>[a-z]+))(?P<patch>\d{1,9}))? # sometimes with .<patch> or -<tag><patch>
(\.(?P<build>\d{1,9}))?$ # and sometimes with a <build> number.
'''
def __init__(self, version):
m = re.match(self.PATTERN, version, re.VERBOSE)
if not m:
raise RuntimeError('"{}" is not a valid Juju version string'.format(version))
d = m.groupdict()
self.major = int(m.group('major'))
self.minor = int(m.group('minor'))
self.tag = d['tag'] or ''
self.patch = int(d['patch'] or 0)
self.build = int(d['build'] or 0)
def __repr__(self):
if self.tag:
s = '{}.{}-{}{}'.format(self.major, self.minor, self.tag, self.patch)
else:
s = '{}.{}.{}'.format(self.major, self.minor, self.patch)
if self.build > 0:
s += '.{}'.format(self.build)
return s
def __eq__(self, other):
if self is other:
return True
if isinstance(other, str):
other = type(self)(other)
elif not isinstance(other, JujuVersion):
raise RuntimeError('cannot compare Juju version "{}" with "{}"'.format(self, other))
return (
self.major == other.major
and self.minor == other.minor
and self.tag == other.tag
and self.build == other.build
and self.patch == other.patch)
def __lt__(self, other):
if self is other:
return False
if isinstance(other, str):
other = type(self)(other)
elif not isinstance(other, JujuVersion):
raise RuntimeError('cannot compare Juju version "{}" with "{}"'.format(self, other))
if self.major != other.major:
return self.major < other.major
elif self.minor != other.minor:
return self.minor < other.minor
elif self.tag != other.tag:
if not self.tag:
return False
elif not other.tag:
return True
return self.tag < other.tag
elif self.patch != other.patch:
return self.patch < other.patch
elif self.build != other.build:
return self.build < other.build
return False
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
class JujuLogHandler(logging.Handler):
"""A handler for sending logs to Juju via juju-log."""
def __init__(self, model_backend, level=logging.DEBUG):
super().__init__(level)
self.model_backend = model_backend
def emit(self, record):
self.model_backend.juju_log(record.levelname, self.format(record))
def setup_root_logging(model_backend, debug=False):
"""Setup python logging to forward messages to juju-log.
By default, logging is set to DEBUG level, and messages will be filtered by Juju.
Charmers can also set their own default log level with::
logging.getLogger().setLevel(logging.INFO)
model_backend -- a ModelBackend to use for juju-log
debug -- if True, write logs to stderr as well as to juju-log.
"""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(JujuLogHandler(model_backend))
if debug:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
#!/usr/bin/env python3
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import sys
from pathlib import Path
import yaml
import ops.charm
import ops.framework
import ops.model
import logging
from ops.log import setup_root_logging
CHARM_STATE_FILE = '.unit-state.db'
logger = logging.getLogger()
def _get_charm_dir():
charm_dir = os.environ.get("JUJU_CHARM_DIR")
if charm_dir is None:
# Assume $JUJU_CHARM_DIR/lib/op/main.py structure.
charm_dir = Path('{}/../../..'.format(__file__)).resolve()
else:
charm_dir = Path(charm_dir).resolve()
return charm_dir
def _load_metadata(charm_dir):
metadata = yaml.safe_load((charm_dir / 'metadata.yaml').read_text())
actions_meta = charm_dir / 'actions.yaml'
if actions_meta.exists():
actions_metadata = yaml.safe_load(actions_meta.read_text())
else:
actions_metadata = {}
return metadata, actions_metadata
def _create_event_link(charm, bound_event):
"""Create a symlink for a particular event.
charm -- A charm object.
bound_event -- An event for which to create a symlink.
"""
if issubclass(bound_event.event_type, ops.charm.HookEvent):
event_dir = charm.framework.charm_dir / 'hooks'
event_path = event_dir / bound_event.event_kind.replace('_', '-')
elif issubclass(bound_event.event_type, ops.charm.ActionEvent):
if not bound_event.event_kind.endswith("_action"):
raise RuntimeError(
'action event name {} needs _action suffix'.format(bound_event.event_kind))
event_dir = charm.framework.charm_dir / 'actions'
# The event_kind is suffixed with "_action" while the executable is not.
event_path = event_dir / bound_event.event_kind[:-len('_action')].replace('_', '-')
else:
raise RuntimeError(
'cannot create a symlink: unsupported event type {}'.format(bound_event.event_type))
event_dir.mkdir(exist_ok=True)
if not event_path.exists():
# CPython has different implementations for populating sys.argv[0] for Linux and Windows.
# For Windows it is always an absolute path (any symlinks are resolved)
# while for Linux it can be a relative path.
target_path = os.path.relpath(os.path.realpath(sys.argv[0]), str(event_dir))
# Ignore the non-symlink files or directories
# assuming the charm author knows what they are doing.
logger.debug(
'Creating a new relative symlink at %s pointing to %s',
event_path, target_path)
event_path.symlink_to(target_path)
def _setup_event_links(charm_dir, charm):
"""Set up links for supported events that originate from Juju.
Whether a charm can handle an event or not can be determined by
introspecting which events are defined on it.
Hooks or actions are created as symlinks to the charm code file
which is determined by inspecting symlinks provided by the charm
author at hooks/install or hooks/start.
charm_dir -- A root directory of the charm.
charm -- An instance of the Charm class.
"""
for bound_event in charm.on.events().values():
# Only events that originate from Juju need symlinks.
if issubclass(bound_event.event_type, (ops.charm.HookEvent, ops.charm.ActionEvent)):
_create_event_link(charm, bound_event)
def _emit_charm_event(charm, event_name):
"""Emits a charm event based on a Juju event name.
charm -- A charm instance to emit an event from.
event_name -- A Juju event name to emit on a charm.
"""
event_to_emit = None
try:
event_to_emit = getattr(charm.on, event_name)
except AttributeError:
logger.debug("event %s not defined for %s", event_name, charm)
# If the event is not supported by the charm implementation, do
# not error out or try to emit it. This is to support rollbacks.
if event_to_emit is not None:
args, kwargs = _get_event_args(charm, event_to_emit)
logger.debug('Emitting Juju event %s', event_name)
event_to_emit.emit(*args, **kwargs)
def _get_event_args(charm, bound_event):
event_type = bound_event.event_type
model = charm.framework.model
if issubclass(event_type, ops.charm.RelationEvent):
relation_name = os.environ['JUJU_RELATION']
relation_id = int(os.environ['JUJU_RELATION_ID'].split(':')[-1])
relation = model.get_relation(relation_name, relation_id)
else:
relation = None
remote_app_name = os.environ.get('JUJU_REMOTE_APP', '')
remote_unit_name = os.environ.get('JUJU_REMOTE_UNIT', '')
if remote_app_name or remote_unit_name:
if not remote_app_name:
if '/' not in remote_unit_name:
raise RuntimeError('invalid remote unit name: {}'.format(remote_unit_name))
remote_app_name = remote_unit_name.split('/')[0]
args = [relation, model.get_app(remote_app_name)]
if remote_unit_name:
args.append(model.get_unit(remote_unit_name))
return args, {}
elif relation:
return [relation], {}
return [], {}
def main(charm_class):
"""Setup the charm and dispatch the observed event.
The event name is based on the way this executable was called (argv[0]).
"""
charm_dir = _get_charm_dir()
model_backend = ops.model.ModelBackend()
debug = ('JUJU_DEBUG' in os.environ)
setup_root_logging(model_backend, debug=debug)
# Process the Juju event relevant to the current hook execution
# JUJU_HOOK_NAME, JUJU_FUNCTION_NAME, and JUJU_ACTION_NAME are not used
# in order to support simulation of events from debugging sessions.
#
# TODO: For Windows, when symlinks are used, this is not a valid
# method of getting an event name (see LP: #1854505).
juju_exec_path = Path(sys.argv[0])
has_dispatch = juju_exec_path.name == 'dispatch'
if has_dispatch:
# The executable was 'dispatch', which means the actual hook we want to
# run needs to be looked up in the JUJU_DISPATCH_PATH env var, where it
# should be a path relative to the charm directory (the directory that
# holds `dispatch`). If that path actually exists, we want to run that
# before continuing.
dispatch_path = juju_exec_path.parent / Path(os.environ['JUJU_DISPATCH_PATH'])
if dispatch_path.exists() and dispatch_path.resolve() != juju_exec_path.resolve():
argv = sys.argv.copy()
argv[0] = str(dispatch_path)
try:
subprocess.run(argv, check=True)
except subprocess.CalledProcessError as e:
logger.warning("hook %s exited with status %d", dispatch_path, e.returncode)
sys.exit(e.returncode)
juju_exec_path = dispatch_path
juju_event_name = juju_exec_path.name.replace('-', '_')
if juju_exec_path.parent.name == 'actions':
juju_event_name = '{}_action'.format(juju_event_name)
metadata, actions_metadata = _load_metadata(charm_dir)
meta = ops.charm.CharmMeta(metadata, actions_metadata)
unit_name = os.environ['JUJU_UNIT_NAME']
model = ops.model.Model(unit_name, meta, model_backend)
# TODO: If Juju unit agent crashes after exit(0) from the charm code
# the framework will commit the snapshot but Juju will not commit its
# operation.
charm_state_path = charm_dir / CHARM_STATE_FILE
framework = ops.framework.Framework(charm_state_path, charm_dir, meta, model)
try:
charm = charm_class(framework, None)
if not has_dispatch:
# When a charm is force-upgraded and a unit is in an error state Juju
# does not run upgrade-charm and instead runs the failed hook followed
# by config-changed. Given the nature of force-upgrading the hook setup
# code is not triggered on config-changed.
#
# 'start' event is included as Juju does not fire the install event for
# K8s charms (see LP: #1854635).
if (juju_event_name in ('install', 'start', 'upgrade_charm')
or juju_event_name.endswith('_storage_attached')):
_setup_event_links(charm_dir, charm)
# TODO: Remove the collect_metrics check below as soon as the relevant
# Juju changes are made.
#
# Skip reemission of deferred events for collect-metrics events because
# they do not have the full access to all hook tools.
if juju_event_name != 'collect_metrics':
framework.reemit()
_emit_charm_event(charm, juju_event_name)
framework.commit()
finally:
framework.close()
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import weakref
import os
import shutil
import tempfile
import time
import datetime
import re
import ipaddress
import decimal
from abc import ABC, abstractmethod
from collections.abc import Mapping, MutableMapping
from pathlib import Path
from subprocess import run, PIPE, CalledProcessError
class Model:
def __init__(self, unit_name, meta, backend):
self._cache = _ModelCache(backend)
self._backend = backend
self.unit = self.get_unit(unit_name)
self.app = self.unit.app
self.relations = RelationMapping(meta.relations, self.unit, self._backend, self._cache)
self.config = ConfigData(self._backend)
self.resources = Resources(list(meta.resources), self._backend)
self.pod = Pod(self._backend)
self.storages = StorageMapping(list(meta.storages), self._backend)
self._bindings = BindingMapping(self._backend)
def get_unit(self, unit_name):
return self._cache.get(Unit, unit_name)
def get_app(self, app_name):
return self._cache.get(Application, app_name)
def get_relation(self, relation_name, relation_id=None):
"""Get a specific Relation instance.
If relation_id is given, this will return that Relation instance.
If relation_id is not given, this will return the Relation instance if the
relation is established only once or None if it is not established. If this
same relation is established multiple times the error TooManyRelatedAppsError is raised.
"""
return self.relations._get_unique(relation_name, relation_id)
def get_binding(self, binding_key):
"""Get a network space binding.
binding_key -- The relation name or instance to obtain bindings for.
If binding_key is a relation name, the method returns the default binding for that
relation. If a relation instance is provided, the method first looks up a more specific
binding for that specific relation ID, and if none is found falls back to the default
binding for the relation name.
"""
return self._bindings.get(binding_key)
class _ModelCache:
def __init__(self, backend):
self._backend = backend
self._weakrefs = weakref.WeakValueDictionary()
def get(self, entity_type, *args):
key = (entity_type,) + args
entity = self._weakrefs.get(key)
if entity is None:
entity = entity_type(*args, backend=self._backend, cache=self)
self._weakrefs[key] = entity
return entity
class Application:
def __init__(self, name, backend, cache):
self.name = name
self._backend = backend
self._cache = cache
self._is_our_app = self.name == self._backend.app_name
self._status = None
@property
def status(self):
if not self._is_our_app:
return UnknownStatus()
if not self._backend.is_leader():
raise RuntimeError('cannot get application status as a non-leader unit')
if self._status:
return self._status
s = self._backend.status_get(is_app=True)
self._status = StatusBase.from_name(s['status'], s['message'])
return self._status
@status.setter
def status(self, value):
if not isinstance(value, StatusBase):
raise InvalidStatusError(
'invalid value provided for application {} status: {}'.format(self, value)
)
if not self._is_our_app:
raise RuntimeError('cannot to set status for a remote application {}'.format(self))
if not self._backend.is_leader():
raise RuntimeError('cannot set application status as a non-leader unit')
self._backend.status_set(value.name, value.message, is_app=True)
self._status = value
def __repr__(self):
return '<{}.{} {}>'.format(type(self).__module__, type(self).__name__, self.name)
class Unit:
def __init__(self, name, backend, cache):
self.name = name
app_name = name.split('/')[0]
self.app = cache.get(Application, app_name)
self._backend = backend
self._cache = cache
self._is_our_unit = self.name == self._backend.unit_name
self._status = None
@property
def status(self):
if not self._is_our_unit:
return UnknownStatus()
if self._status:
return self._status
s = self._backend.status_get(is_app=False)
self._status = StatusBase.from_name(s['status'], s['message'])
return self._status
@status.setter
def status(self, value):
if not isinstance(value, StatusBase):
raise InvalidStatusError(
'invalid value provided for unit {} status: {}'.format(self, value)
)
if not self._is_our_unit:
raise RuntimeError('cannot set status for a remote unit {}'.format(self))
self._backend.status_set(value.name, value.message, is_app=False)
self._status = value
def __repr__(self):
return '<{}.{} {}>'.format(type(self).__module__, type(self).__name__, self.name)
def is_leader(self):
if self._is_our_unit:
# This value is not cached as it is not guaranteed to persist for the whole duration
# of a hook execution.
return self._backend.is_leader()
else:
raise RuntimeError(
'cannot determine leadership status for remote applications: {}'.format(self)
)
def set_workload_version(self, version):
"""Record the version of the software running as the workload.
This shouldn't be confused with the revision of the charm. This is informative only;
shown in the output of 'juju status'.
"""
if not isinstance(version, str):
raise TypeError("workload version must be a str, not {}: {!r}".format(
type(version).__name__, version))
self._backend.application_version_set(version)
class LazyMapping(Mapping, ABC):
_lazy_data = None
@abstractmethod
def _load(self):
raise NotImplementedError()
@property
def _data(self):
data = self._lazy_data
if data is None:
data = self._lazy_data = self._load()
return data
def _invalidate(self):
self._lazy_data = None
def __contains__(self, key):
return key in self._data
def __len__(self):
return len(self._data)
def __iter__(self):
return iter(self._data)
def __getitem__(self, key):
return self._data[key]
class RelationMapping(Mapping):
"""Map of relation names to lists of Relation instances."""
def __init__(self, relations_meta, our_unit, backend, cache):
self._peers = set()
for name, relation_meta in relations_meta.items():
if relation_meta.role == 'peers':
self._peers.add(name)
self._our_unit = our_unit
self._backend = backend
self._cache = cache
self._data = {relation_name: None for relation_name in relations_meta}
def __contains__(self, key):
return key in self._data
def __len__(self):
return len(self._data)
def __iter__(self):
return iter(self._data)
def __getitem__(self, relation_name):
is_peer = relation_name in self._peers
relation_list = self._data[relation_name]
if relation_list is None:
relation_list = self._data[relation_name] = []
for rid in self._backend.relation_ids(relation_name):
relation = Relation(relation_name, rid, is_peer,
self._our_unit, self._backend, self._cache)
relation_list.append(relation)
return relation_list
def _invalidate(self, relation_name):
self._data[relation_name] = None
def _get_unique(self, relation_name, relation_id=None):
if relation_id is not None:
if not isinstance(relation_id, int):
raise ModelError('relation id {} must be int or None not {}'.format(
relation_id,
type(relation_id).__name__))
for relation in self[relation_name]:
if relation.id == relation_id:
return relation
else:
# The relation may be dead, but it is not forgotten.
is_peer = relation_name in self._peers
return Relation(relation_name, relation_id, is_peer,
self._our_unit, self._backend, self._cache)
num_related = len(self[relation_name])
if num_related == 0:
return None
elif num_related == 1:
return self[relation_name][0]
else:
# TODO: We need something in the framework to catch and gracefully handle
# errors, ideally integrating the error catching with Juju's mechanisms.
raise TooManyRelatedAppsError(relation_name, num_related, 1)
class BindingMapping:
def __init__(self, backend):
self._backend = backend
self._data = {}
def get(self, binding_key):
if isinstance(binding_key, Relation):
binding_name = binding_key.name
relation_id = binding_key.id
elif isinstance(binding_key, str):
binding_name = binding_key
relation_id = None
else:
raise ModelError('binding key must be str or relation instance, not {}'
''.format(type(binding_key).__name__))
binding = self._data.get(binding_key)
if binding is None:
binding = Binding(binding_name, relation_id, self._backend)
self._data[binding_key] = binding
return binding
class Binding:
"""Binding to a network space."""
def __init__(self, name, relation_id, backend):
self.name = name
self._relation_id = relation_id
self._backend = backend
self._network = None
@property
def network(self):
if self._network is None:
try:
self._network = Network(self._backend.network_get(self.name, self._relation_id))
except RelationNotFoundError:
if self._relation_id is None:
raise
# If a relation is dead, we can still get network info associated with an
# endpoint itself
self._network = Network(self._backend.network_get(self.name))
return self._network
class Network:
"""Network space details."""
def __init__(self, network_info):
self.interfaces = []
# Treat multiple addresses on an interface as multiple logical
# interfaces with the same name.
for interface_info in network_info['bind-addresses']:
interface_name = interface_info['interface-name']
for address_info in interface_info['addresses']:
self.interfaces.append(NetworkInterface(interface_name, address_info))
self.ingress_addresses = []
for address in network_info['ingress-addresses']:
self.ingress_addresses.append(ipaddress.ip_address(address))
self.egress_subnets = []
for subnet in network_info['egress-subnets']:
self.egress_subnets.append(ipaddress.ip_network(subnet))
@property
def bind_address(self):
return self.interfaces[0].address
@property
def ingress_address(self):
return self.ingress_addresses[0]
class NetworkInterface:
def __init__(self, name, address_info):
self.name = name
# TODO: expose a hardware address here, see LP: #1864070.
self.address = ipaddress.ip_address(address_info['value'])
cidr = address_info['cidr']
if not cidr:
# The cidr field may be empty, see LP: #1864102.
# In this case, make it a /32 or /128 IP network.
self.subnet = ipaddress.ip_network(address_info['value'])
else:
self.subnet = ipaddress.ip_network(cidr)
# TODO: expose a hostname/canonical name for the address here, see LP: #1864086.
class Relation:
def __init__(self, relation_name, relation_id, is_peer, our_unit, backend, cache):
self.name = relation_name
self.id = relation_id
self.app = None
self.units = set()
# For peer relations, both the remote and the local app are the same.
if is_peer:
self.app = our_unit.app
try:
for unit_name in backend.relation_list(self.id):
unit = cache.get(Unit, unit_name)
self.units.add(unit)
if self.app is None:
self.app = unit.app
except RelationNotFoundError:
# If the relation is dead, just treat it as if it has no remote units.
pass
self.data = RelationData(self, our_unit, backend)
def __repr__(self):
return '<{}.{} {}:{}>'.format(type(self).__module__,
type(self).__name__,
self.name,
self.id)
class RelationData(Mapping):
def __init__(self, relation, our_unit, backend):
self.relation = weakref.proxy(relation)
self._data = {
our_unit: RelationDataContent(self.relation, our_unit, backend),
our_unit.app: RelationDataContent(self.relation, our_unit.app, backend),
}
self._data.update({
unit: RelationDataContent(self.relation, unit, backend)
for unit in self.relation.units})
# The relation might be dead so avoid a None key here.
if self.relation.app is not None:
self._data.update({
self.relation.app: RelationDataContent(self.relation, self.relation.app, backend),
})
def __contains__(self, key):
return key in self._data
def __len__(self):
return len(self._data)
def __iter__(self):
return iter(self._data)
def __getitem__(self, key):
return self._data[key]
# We mix in MutableMapping here to get some convenience implementations, but whether it's actually
# mutable or not is controlled by the flag.
class RelationDataContent(LazyMapping, MutableMapping):
def __init__(self, relation, entity, backend):
self.relation = relation
self._entity = entity
self._backend = backend
self._is_app = isinstance(entity, Application)
def _load(self):
try:
return self._backend.relation_get(self.relation.id, self._entity.name, self._is_app)
except RelationNotFoundError:
# Dead relations tell no tales (and have no data).
return {}
def _is_mutable(self):
if self._is_app:
is_our_app = self._backend.app_name == self._entity.name
if not is_our_app:
return False
# Whether the application data bag is mutable or not depends on
# whether this unit is a leader or not, but this is not guaranteed
# to be always true during the same hook execution.
return self._backend.is_leader()
else:
is_our_unit = self._backend.unit_name == self._entity.name
if is_our_unit:
return True
return False
def __setitem__(self, key, value):
if not self._is_mutable():
raise RelationDataError('cannot set relation data for {}'.format(self._entity.name))
if not isinstance(value, str):
raise RelationDataError('relation data values must be strings')
self._backend.relation_set(self.relation.id, key, value, self._is_app)
# Don't load data unnecessarily if we're only updating.
if self._lazy_data is not None:
if value == '':
# Match the behavior of Juju, which is that setting the value to an
# empty string will remove the key entirely from the relation data.
del self._data[key]
else:
self._data[key] = value
def __delitem__(self, key):
# Match the behavior of Juju, which is that setting the value to an empty
# string will remove the key entirely from the relation data.
self.__setitem__(key, '')
class ConfigData(LazyMapping):
def __init__(self, backend):
self._backend = backend
def _load(self):
return self._backend.config_get()
class StatusBase:
"""Status values specific to applications and units."""
_statuses = {}
def __init__(self, message):
self.message = message
def __new__(cls, *args, **kwargs):
if cls is StatusBase:
raise TypeError("cannot instantiate a base class")
cls._statuses[cls.name] = cls
return super().__new__(cls)
@classmethod
def from_name(cls, name, message):
return cls._statuses[name](message)
class ActiveStatus(StatusBase):
"""The unit is ready.
The unit believes it is correctly offering all the services it has been asked to offer.
"""
name = 'active'
def __init__(self, message=None):
super().__init__(message or '')
class BlockedStatus(StatusBase):
"""The unit requires manual intervention.
An operator has to manually intervene to unblock the unit and let it proceed.
"""
name = 'blocked'
class MaintenanceStatus(StatusBase):
"""The unit is performing maintenance tasks.
The unit is not yet providing services, but is actively doing work in preparation
for providing those services. This is a "spinning" state, not an error state. It
reflects activity on the unit itself, not on peers or related units.
"""
name = 'maintenance'
class UnknownStatus(StatusBase):
"""The unit status is unknown.
A unit-agent has finished calling install, config-changed and start, but the
charm has not called status-set yet.
"""
name = 'unknown'
def __init__(self):
# Unknown status cannot be set and does not have a message associated with it.
super().__init__('')
class WaitingStatus(StatusBase):
"""A unit is unable to progress.
The unit is unable to progress to an active state because an application to which
it is related is not running.
"""
name = 'waiting'
class Resources:
"""Object representing resources for the charm.
"""
def __init__(self, names, backend):
self._backend = backend
self._paths = {name: None for name in names}
def fetch(self, name):
"""Fetch the resource from the controller or store.
If successfully fetched, this returns a Path object to where the resource is stored
on disk, otherwise it raises a ModelError.
"""
if name not in self._paths:
raise RuntimeError('invalid resource name: {}'.format(name))
if self._paths[name] is None:
self._paths[name] = Path(self._backend.resource_get(name))
return self._paths[name]
class Pod:
def __init__(self, backend):
self._backend = backend
def set_spec(self, spec, k8s_resources=None):
if not self._backend.is_leader():
raise ModelError('cannot set a pod spec as this unit is not a leader')
self._backend.pod_spec_set(spec, k8s_resources)
class StorageMapping(Mapping):
"""Map of storage names to lists of Storage instances."""
def __init__(self, storage_names, backend):
self._backend = backend
self._storage_map = {storage_name: None for storage_name in storage_names}
def __contains__(self, key):
return key in self._storage_map
def __len__(self):
return len(self._storage_map)
def __iter__(self):
return iter(self._storage_map)
def __getitem__(self, storage_name):
storage_list = self._storage_map[storage_name]
if storage_list is None:
storage_list = self._storage_map[storage_name] = []
for storage_id in self._backend.storage_list(storage_name):
storage_list.append(Storage(storage_name, storage_id, self._backend))
return storage_list
def request(self, storage_name, count=1):
"""Requests new storage instances of a given name.
Uses storage-add tool to request additional storage. Juju will notify the unit
via <storage-name>-storage-attached events when it becomes available.
"""
if storage_name not in self._storage_map:
raise ModelError(('cannot add storage {!r}:'
' it is not present in the charm metadata').format(storage_name))
self._backend.storage_add(storage_name, count)
class Storage:
def __init__(self, storage_name, storage_id, backend):
self.name = storage_name
self.id = storage_id
self._backend = backend
self._location = None
@property
def location(self):
if self._location is None:
raw = self._backend.storage_get('{}/{}'.format(self.name, self.id), "location")
self._location = Path(raw)
return self._location
class ModelError(Exception):
pass
class TooManyRelatedAppsError(ModelError):
def __init__(self, relation_name, num_related, max_supported):
super().__init__('Too many remote applications on {} ({} > {})'.format(
relation_name, num_related, max_supported))
self.relation_name = relation_name
self.num_related = num_related
self.max_supported = max_supported
class RelationDataError(ModelError):
pass
class RelationNotFoundError(ModelError):
pass
class InvalidStatusError(ModelError):
pass
class ModelBackend:
LEASE_RENEWAL_PERIOD = datetime.timedelta(seconds=30)
def __init__(self):
self.unit_name = os.environ['JUJU_UNIT_NAME']
self.app_name = self.unit_name.split('/')[0]
self._is_leader = None
self._leader_check_time = None
def _run(self, *args, return_output=False, use_json=False):
kwargs = dict(stdout=PIPE, stderr=PIPE)
if use_json:
args += ('--format=json',)
try:
result = run(args, check=True, **kwargs)
except CalledProcessError as e:
raise ModelError(e.stderr)
if return_output:
if result.stdout is None:
return ''
else:
text = result.stdout.decode('utf8')
if use_json:
return json.loads(text)
else:
return text
def relation_ids(self, relation_name):
relation_ids = self._run('relation-ids', relation_name, return_output=True, use_json=True)
return [int(relation_id.split(':')[-1]) for relation_id in relation_ids]
def relation_list(self, relation_id):
try:
return self._run('relation-list', '-r', str(relation_id),
return_output=True, use_json=True)
except ModelError as e:
if 'relation not found' in str(e):
raise RelationNotFoundError() from e
raise
def relation_get(self, relation_id, member_name, is_app):
if not isinstance(is_app, bool):
raise TypeError('is_app parameter to relation_get must be a boolean')
try:
return self._run('relation-get', '-r', str(relation_id),
'-', member_name, '--app={}'.format(is_app),
return_output=True, use_json=True)
except ModelError as e:
if 'relation not found' in str(e):
raise RelationNotFoundError() from e
raise
def relation_set(self, relation_id, key, value, is_app):
if not isinstance(is_app, bool):
raise TypeError('is_app parameter to relation_set must be a boolean')
try:
return self._run('relation-set', '-r', str(relation_id),
'{}={}'.format(key, value), '--app={}'.format(is_app))
except ModelError as e:
if 'relation not found' in str(e):
raise RelationNotFoundError() from e
raise
def config_get(self):
return self._run('config-get', return_output=True, use_json=True)
def is_leader(self):
"""Obtain the current leadership status for the unit the charm code is executing on.
The value is cached for the duration of a lease which is 30s in Juju.
"""
now = time.monotonic()
if self._leader_check_time is None:
check = True
else:
time_since_check = datetime.timedelta(seconds=now - self._leader_check_time)
check = (time_since_check > self.LEASE_RENEWAL_PERIOD or self._is_leader is None)
if check:
# Current time MUST be saved before running is-leader to ensure the cache
# is only used inside the window that is-leader itself asserts.
self._leader_check_time = now
self._is_leader = self._run('is-leader', return_output=True, use_json=True)
return self._is_leader
def resource_get(self, resource_name):
return self._run('resource-get', resource_name, return_output=True).strip()
def pod_spec_set(self, spec, k8s_resources):
tmpdir = Path(tempfile.mkdtemp('-pod-spec-set'))
try:
spec_path = tmpdir / 'spec.json'
spec_path.write_text(json.dumps(spec))
args = ['--file', str(spec_path)]
if k8s_resources:
k8s_res_path = tmpdir / 'k8s-resources.json'
k8s_res_path.write_text(json.dumps(k8s_resources))
args.extend(['--k8s-resources', str(k8s_res_path)])
self._run('pod-spec-set', *args)
finally:
shutil.rmtree(str(tmpdir))
def status_get(self, *, is_app=False):
"""Get a status of a unit or an application.
app -- A boolean indicating whether the status should be retrieved for a unit
or an application.
"""
return self._run('status-get', '--include-data', '--application={}'.format(is_app))
def status_set(self, status, message='', *, is_app=False):
"""Set a status of a unit or an application.
app -- A boolean indicating whether the status should be set for a unit or an
application.
"""
if not isinstance(is_app, bool):
raise TypeError('is_app parameter must be boolean')
return self._run('status-set', '--application={}'.format(is_app), status, message)
def storage_list(self, name):
return [int(s.split('/')[1]) for s in self._run('storage-list', name,
return_output=True, use_json=True)]
def storage_get(self, storage_name_id, attribute):
return self._run('storage-get', '-s', storage_name_id, attribute,
return_output=True, use_json=True)
def storage_add(self, name, count=1):
if not isinstance(count, int) or isinstance(count, bool):
raise TypeError('storage count must be integer, got: {} ({})'.format(count,
type(count)))
self._run('storage-add', '{}={}'.format(name, count))
def action_get(self):
return self._run('action-get', return_output=True, use_json=True)
def action_set(self, results):
self._run('action-set', *["{}={}".format(k, v) for k, v in results.items()])
def action_log(self, message):
self._run('action-log', message)
def action_fail(self, message=''):
self._run('action-fail', message)
def application_version_set(self, version):
self._run('application-version-set', '--', version)
def juju_log(self, level, message):
self._run('juju-log', '--log-level', level, message)
def network_get(self, binding_name, relation_id=None):
"""Return network info provided by network-get for a given binding.
binding_name -- A name of a binding (relation name or extra-binding name).
relation_id -- An optional relation id to get network info for.
"""
cmd = ['network-get', binding_name]
if relation_id is not None:
cmd.extend(['-r', str(relation_id)])
try:
return self._run(*cmd, return_output=True, use_json=True)
except ModelError as e:
if 'relation not found' in str(e):
raise RelationNotFoundError() from e
raise
def add_metrics(self, metrics, labels=None):
cmd = ['add-metric']
if labels:
label_args = []
for k, v in labels.items():
_ModelBackendValidator.validate_metric_label(k)
_ModelBackendValidator.validate_label_value(k, v)
label_args.append('{}={}'.format(k, v))
cmd.extend(['--labels', ','.join(label_args)])
metric_args = []
for k, v in metrics.items():
_ModelBackendValidator.validate_metric_key(k)
metric_value = _ModelBackendValidator.format_metric_value(v)
metric_args.append('{}={}'.format(k, metric_value))
cmd.extend(metric_args)
self._run(*cmd)
class _ModelBackendValidator:
"""Provides facilities for validating inputs and formatting them for model backends."""
METRIC_KEY_REGEX = re.compile(r'^[a-zA-Z](?:[a-zA-Z0-9-_]*[a-zA-Z0-9])?$')
@classmethod
def validate_metric_key(cls, key):
if cls.METRIC_KEY_REGEX.match(key) is None:
raise ModelError(
'invalid metric key {!r}: must match {}'.format(
key, cls.METRIC_KEY_REGEX.pattern))
@classmethod
def validate_metric_label(cls, label_name):
if cls.METRIC_KEY_REGEX.match(label_name) is None:
raise ModelError(
'invalid metric label name {!r}: must match {}'.format(
label_name, cls.METRIC_KEY_REGEX.pattern))
@classmethod
def format_metric_value(cls, value):
try:
decimal_value = decimal.Decimal.from_float(value)
except TypeError as e:
e2 = ModelError('invalid metric value {!r} provided:'
' must be a positive finite float'.format(value))
raise e2 from e
if decimal_value.is_nan() or decimal_value.is_infinite() or decimal_value < 0:
raise ModelError('invalid metric value {!r} provided:'
' must be a positive finite float'.format(value))
return str(decimal_value)
@classmethod
def validate_label_value(cls, label, value):
# Label values cannot be empty, contain commas or equal signs as those are
# used by add-metric as separators.
if not value:
raise ModelError(
'metric label {} has an empty value, which is not allowed'.format(label))
v = str(value)
if re.search('[,=]', v) is not None:
raise ModelError(
'metric label values must not contain "," or "=": {}={!r}'.format(label, value))
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import pathlib
from textwrap import dedent
import typing
from ops import charm, framework, model
# OptionalYAML is something like metadata.yaml or actions.yaml. You can
# pass in a file-like object or the string directly.
OptionalYAML = typing.Optional[typing.Union[str, typing.TextIO]]
# noinspection PyProtectedMember
class Harness:
"""This class represents a way to build up the model that will drive a test suite.
The model that is created is from the viewpoint of the charm that you are testing.
Example::
harness = Harness(MyCharm)
# Do initial setup here
relation_id = harness.add_relation('db', 'postgresql')
# Now instantiate the charm to see events as the model changes
harness.begin()
harness.add_relation_unit(relation_id, 'postgresql/0')
harness.update_relation_data(relation_id, 'postgresql/0', {'key': 'val'})
# Check that charm has properly handled the relation_joined event for postgresql/0
self.assertEqual(harness.charm. ...)
Args:
charm_cls: The Charm class that you'll be testing.
meta: charm.CharmBase is a A string or file-like object containing the contents of
metadata.yaml. If not supplied, we will look for a 'metadata.yaml' file in the
parent directory of the Charm, and if not found fall back to a trivial
'name: test-charm' metadata.
actions: A string or file-like object containing the contents of
actions.yaml. If not supplied, we will look for a 'actions.yaml' file in the
parent directory of the Charm.
"""
def __init__(
self,
charm_cls: typing.Type[charm.CharmBase],
*,
meta: OptionalYAML = None,
actions: OptionalYAML = None):
# TODO: jam 2020-03-05 We probably want to take config as a parameter as well, since
# it would define the default values of config that the charm would see.
self._charm_cls = charm_cls
self._charm = None
self._charm_dir = 'no-disk-path' # this may be updated by _create_meta
self._meta = self._create_meta(meta, actions)
self._unit_name = self._meta.name + '/0'
self._framework = None
self._hooks_enabled = True
self._relation_id_counter = 0
self._backend = _TestingModelBackend(self._unit_name, self._meta)
self._model = model.Model(self._unit_name, self._meta, self._backend)
self._framework = framework.Framework(":memory:", self._charm_dir, self._meta, self._model)
@property
def charm(self) -> charm.CharmBase:
"""Return the instance of the charm class that was passed to __init__.
Note that the Charm is not instantiated until you have called
:meth:`.begin()`.
"""
return self._charm
@property
def model(self) -> model.Model:
"""Return the :class:`~ops.model.Model` that is being driven by this Harness."""
return self._model
@property
def framework(self) -> framework.Framework:
"""Return the Framework that is being driven by this Harness."""
return self._framework
def begin(self) -> None:
"""Instantiate the Charm and start handling events.
Before calling begin(), there is no Charm instance, so changes to the Model won't emit
events. You must call begin before :attr:`.charm` is valid.
"""
if self._charm is not None:
raise RuntimeError('cannot call the begin method on the harness more than once')
# The Framework adds attributes to class objects for events, etc. As such, we can't re-use
# the original class against multiple Frameworks. So create a locally defined class
# and register it.
# TODO: jam 2020-03-16 We are looking to changes this to Instance attributes instead of
# Class attributes which should clean up this ugliness. The API can stay the same
class TestEvents(self._charm_cls.on.__class__):
pass
TestEvents.__name__ = self._charm_cls.on.__class__.__name__
class TestCharm(self._charm_cls):
on = TestEvents()
# Note: jam 2020-03-01 This is so that errors in testing say MyCharm has no attribute foo,
# rather than TestCharm has no attribute foo.
TestCharm.__name__ = self._charm_cls.__name__
self._charm = TestCharm(self._framework, self._framework.meta.name)
def _create_meta(self, charm_metadata, action_metadata):
"""Create a CharmMeta object.
Handle the cases where a user doesn't supply explicit metadata snippets.
"""
filename = inspect.getfile(self._charm_cls)
charm_dir = pathlib.Path(filename).parents[1]
if charm_metadata is None:
metadata_path = charm_dir / 'metadata.yaml'
if metadata_path.is_file():
charm_metadata = metadata_path.read_text()
self._charm_dir = charm_dir
else:
# The simplest of metadata that the framework can support
charm_metadata = 'name: test-charm'
elif isinstance(charm_metadata, str):
charm_metadata = dedent(charm_metadata)
if action_metadata is None:
actions_path = charm_dir / 'actions.yaml'
if actions_path.is_file():
action_metadata = actions_path.read_text()
self._charm_dir = charm_dir
elif isinstance(action_metadata, str):
action_metadata = dedent(action_metadata)
return charm.CharmMeta.from_yaml(charm_metadata, action_metadata)
def disable_hooks(self) -> None:
"""Stop emitting hook events when the model changes.
This can be used by developers to stop changes to the model from emitting events that
the charm will react to. Call :meth:`.enable_hooks`
to re-enable them.
"""
self._hooks_enabled = False
def enable_hooks(self) -> None:
"""Re-enable hook events from charm.on when the model is changed.
By default hook events are enabled once you call :meth:`.begin`,
but if you have used :meth:`.disable_hooks`, this can be used to
enable them again.
"""
self._hooks_enabled = True
def _next_relation_id(self):
rel_id = self._relation_id_counter
self._relation_id_counter += 1
return rel_id
def add_relation(self, relation_name: str, remote_app: str) -> int:
"""Declare that there is a new relation between this app and `remote_app`.
Args:
relation_name: The relation on Charm that is being related to
remote_app: The name of the application that is being related to
Return:
The relation_id created by this add_relation.
"""
rel_id = self._next_relation_id()
self._backend._relation_ids_map.setdefault(relation_name, []).append(rel_id)
self._backend._relation_names[rel_id] = relation_name
self._backend._relation_list_map[rel_id] = []
self._backend._relation_data[rel_id] = {
remote_app: {},
self._backend.unit_name: {},
self._backend.app_name: {},
}
# Reload the relation_ids list
if self._model is not None:
self._model.relations._invalidate(relation_name)
if self._charm is None or not self._hooks_enabled:
return rel_id
relation = self._model.get_relation(relation_name, rel_id)
app = self._model.get_app(remote_app)
self._charm.on[relation_name].relation_created.emit(
relation, app)
return rel_id
def add_relation_unit(self, relation_id: int, remote_unit_name: str) -> None:
"""Add a new unit to a relation.
Example::
rel_id = harness.add_relation('db', 'postgresql')
harness.add_relation_unit(rel_id, 'postgresql/0')
This will trigger a `relation_joined` event and a `relation_changed` event.
Args:
relation_id: The integer relation identifier (as returned by add_relation).
remote_unit_name: A string representing the remote unit that is being added.
Return:
None
"""
self._backend._relation_list_map[relation_id].append(remote_unit_name)
self._backend._relation_data[relation_id][remote_unit_name] = {}
relation_name = self._backend._relation_names[relation_id]
# Make sure that the Model reloads the relation_list for this relation_id, as well as
# reloading the relation data for this unit.
if self._model is not None:
self._model.relations._invalidate(relation_name)
remote_unit = self._model.get_unit(remote_unit_name)
relation = self._model.get_relation(relation_name, relation_id)
relation.data[remote_unit]._invalidate()
if self._charm is None or not self._hooks_enabled:
return
self._charm.on[relation_name].relation_joined.emit(
relation, remote_unit.app, remote_unit)
def get_relation_data(self, relation_id: int, app_or_unit: str) -> typing.Mapping:
"""Get the relation data bucket for a single app or unit in a given relation.
This ignores all of the safety checks of who can and can't see data in relations (eg,
non-leaders can't read their own application's relation data because there are no events
that keep that data up-to-date for the unit).
Args:
relation_id: The relation whose content we want to look at.
app_or_unit: The name of the application or unit whose data we want to read
Return:
a dict containing the relation data for `app_or_unit` or None.
Raises:
KeyError: if relation_id doesn't exist
"""
return self._backend._relation_data[relation_id].get(app_or_unit, None)
def get_workload_version(self) -> str:
"""Read the workload version that was set by the unit."""
return self._backend._workload_version
def update_relation_data(
self,
relation_id: int,
app_or_unit: str,
key_values: typing.Mapping,
) -> None:
"""Update the relation data for a given unit or application in a given relation.
This also triggers the `relation_changed` event for this relation_id.
Args:
relation_id: The integer relation_id representing this relation.
app_or_unit: The unit or application name that is being updated.
This can be the local or remote application.
key_values: Each key/value will be updated in the relation data.
"""
relation_name = self._backend._relation_names[relation_id]
relation = self._model.get_relation(relation_name, relation_id)
if '/' in app_or_unit:
entity = self._model.get_unit(app_or_unit)
else:
entity = self._model.get_app(app_or_unit)
rel_data = relation.data.get(entity, None)
if rel_data is not None:
# rel_data may have cached now-stale data, so _invalidate() it.
# Note, this won't cause the data to be loaded if it wasn't already.
rel_data._invalidate()
new_values = self._backend._relation_data[relation_id][app_or_unit].copy()
for k, v in key_values.items():
if v == '':
new_values.pop(k, None)
else:
new_values[k] = v
self._backend._relation_data[relation_id][app_or_unit] = new_values
if app_or_unit == self._model.unit.name:
# No events for our own unit
return
if app_or_unit == self._model.app.name:
# updating our own app only generates an event if it is a peer relation and we
# aren't the leader
is_peer = self._meta.relations[relation_name].role == 'peers'
if not is_peer:
return
if self._model.unit.is_leader():
return
self._emit_relation_changed(relation_id, app_or_unit)
def _emit_relation_changed(self, relation_id, app_or_unit):
if self._charm is None or not self._hooks_enabled:
return
rel_name = self._backend._relation_names[relation_id]
relation = self.model.get_relation(rel_name, relation_id)
if '/' in app_or_unit:
app_name = app_or_unit.split('/')[0]
unit_name = app_or_unit
app = self.model.get_app(app_name)
unit = self.model.get_unit(unit_name)
args = (relation, app, unit)
else:
app_name = app_or_unit
app = self.model.get_app(app_name)
args = (relation, app)
self._charm.on[rel_name].relation_changed.emit(*args)
def update_config(
self,
key_values: typing.Mapping[str, str] = None,
unset: typing.Iterable[str] = (),
) -> None:
"""Update the config as seen by the charm.
This will trigger a `config_changed` event.
Args:
key_values: A Mapping of key:value pairs to update in config.
unset: An iterable of keys to remove from Config. (Note that this does
not currently reset the config values to the default defined in config.yaml.)
"""
config = self._backend._config
if key_values is not None:
for key, value in key_values.items():
config[key] = value
for key in unset:
config.pop(key, None)
# NOTE: jam 2020-03-01 Note that this sort of works "by accident". Config
# is a LazyMapping, but its _load returns a dict and this method mutates
# the dict that Config is caching. Arguably we should be doing some sort
# of charm.framework.model.config._invalidate()
if self._charm is None or not self._hooks_enabled:
return
self._charm.on.config_changed.emit()
def set_leader(self, is_leader: bool = True) -> None:
"""Set whether this unit is the leader or not.
If this charm becomes a leader then `leader_elected` will be triggered.
Args:
is_leader: True/False as to whether this unit is the leader.
"""
was_leader = self._backend._is_leader
self._backend._is_leader = is_leader
# Note: jam 2020-03-01 currently is_leader is cached at the ModelBackend level, not in
# the Model objects, so this automatically gets noticed.
if is_leader and not was_leader and self._charm is not None and self._hooks_enabled:
self._charm.on.leader_elected.emit()
class _TestingModelBackend:
"""This conforms to the interface for ModelBackend but provides canned data.
DO NOT use this class directly, it is used by `Harness`_ to drive the model.
`Harness`_ is responsible for maintaining the internal consistency of the values here,
as the only public methods of this type are for implementing ModelBackend.
"""
def __init__(self, unit_name, meta):
self.unit_name = unit_name
self.app_name = self.unit_name.split('/')[0]
self._calls = []
self._meta = meta
self._is_leader = None
self._relation_ids_map = {} # relation name to [relation_ids,...]
self._relation_names = {} # reverse map from relation_id to relation_name
self._relation_list_map = {} # relation_id: [unit_name,...]
self._relation_data = {} # {relation_id: {name: data}}
self._config = {}
self._is_leader = False
self._resources_map = {}
self._pod_spec = None
self._app_status = None
self._unit_status = None
self._workload_version = None
def relation_ids(self, relation_name):
try:
return self._relation_ids_map[relation_name]
except KeyError as e:
if relation_name not in self._meta.relations:
raise model.ModelError('{} is not a known relation'.format(relation_name)) from e
return []
def relation_list(self, relation_id):
try:
return self._relation_list_map[relation_id]
except KeyError as e:
raise model.RelationNotFoundError from e
def relation_get(self, relation_id, member_name, is_app):
if is_app and '/' in member_name:
member_name = member_name.split('/')[0]
if relation_id not in self._relation_data:
raise model.RelationNotFoundError()
return self._relation_data[relation_id][member_name].copy()
def relation_set(self, relation_id, key, value, is_app):
relation = self._relation_data[relation_id]
if is_app:
bucket_key = self.app_name
else:
bucket_key = self.unit_name
if bucket_key not in relation:
relation[bucket_key] = {}
bucket = relation[bucket_key]
if value == '':
bucket.pop(key, None)
else:
bucket[key] = value
def config_get(self):
return self._config
def is_leader(self):
return self._is_leader
def application_version_set(self, version):
self._workload_version = version
def resource_get(self, resource_name):
return self._resources_map[resource_name]
def pod_spec_set(self, spec, k8s_resources):
self._pod_spec = (spec, k8s_resources)
def status_get(self, *, is_app=False):
if is_app:
return self._app_status
else:
return self._unit_status
def status_set(self, status, message='', *, is_app=False):
if is_app:
self._app_status = (status, message)
else:
self._unit_status = (status, message)
def storage_list(self, name):
raise NotImplementedError(self.storage_list)
def storage_get(self, storage_name_id, attribute):
raise NotImplementedError(self.storage_get)
def storage_add(self, name, count=1):
raise NotImplementedError(self.storage_add)
def action_get(self):
raise NotImplementedError(self.action_get)
def action_set(self, results):
raise NotImplementedError(self.action_set)
def action_log(self, message):
raise NotImplementedError(self.action_log)
def action_fail(self, message=''):
raise NotImplementedError(self.action_fail)
def network_get(self, endpoint_name, relation_id=None):
raise NotImplementedError(self.network_get)
name: squid
summary: Describe your charm here
maintainers:
- Dominik Fleischmann <dominik.fleischmann@canonical.com>
description: |
A nice long description of what the product does and any
high level information about how the charm provides the product,
on which platform, etc.
tags:
- misc
series:
- kubernetes
storage:
docker:
type: filesystem
location: /srv/docker/squid
spool:
type: filesystem
location: /var/spool/squid
# -- example relations, delete if unneeded
#requires:
# db:
# interface: mysql
#provides:
# website:
# interface: http
[flake8]
max-line-length = 99
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment