blob: 5c9182a5761a6af3c32756b0f62dbdde56ee9c47 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##
import logging
from pydantic import BaseModel, constr, PositiveInt
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
class ConfigData(BaseModel):
"""Configuration data model."""
openstack_default_granularity: PositiveInt
global_request_timeout: PositiveInt
log_level: constr(regex=r"^(INFO|DEBUG)$")
collector_interval: PositiveInt
evaluator_interval: PositiveInt
database_commonkey: constr(min_length=1)
vca_host: constr(min_length=1)
vca_user: constr(min_length=1)
vca_password: constr(min_length=1)
vca_cacert: str
class RelationData(BaseModel):
"""Relation data model."""
message_host: constr(min_length=1)
message_port: PositiveInt
database_uri: constr(regex=r"^(mongodb://)")
prometheus_host: constr(min_length=1)
prometheus_port: PositiveInt
def _make_pod_ports(port: int) -> List[Dict[str, Any]]:
"""Generate pod ports details.
Args:
port (int): port to expose.
Returns:
List[Dict[str, Any]]: pod port details.
"""
return [{"name": "mon", "containerPort": port, "protocol": "TCP"}]
def _make_pod_envconfig(
config: Dict[str, Any], relation_state: Dict[str, Any]
) -> Dict[str, Any]:
"""Generate pod environment configuration.
Args:
config (Dict[str, Any]): configuration information.
relation_state (Dict[str, Any]): relation state information.
Returns:
Dict[str, Any]: pod environment configuration.
"""
envconfig = {
# General configuration
"ALLOW_ANONYMOUS_LOGIN": "yes",
"OSMMON_OPENSTACK_DEFAULT_GRANULARITY": config["openstack_default_granularity"],
"OSMMON_GLOBAL_REQUEST_TIMEOUT": config["global_request_timeout"],
"OSMMON_GLOBAL_LOGLEVEL": config["log_level"],
"OSMMON_COLLECTOR_INTERVAL": config["collector_interval"],
"OSMMON_EVALUATOR_INTERVAL": config["evaluator_interval"],
# Kafka configuration
"OSMMON_MESSAGE_DRIVER": "kafka",
"OSMMON_MESSAGE_HOST": relation_state["message_host"],
"OSMMON_MESSAGE_PORT": relation_state["message_port"],
# Database configuration
"OSMMON_DATABASE_DRIVER": "mongo",
"OSMMON_DATABASE_URI": relation_state["database_uri"],
"OSMMON_DATABASE_COMMONKEY": config["database_commonkey"],
# Prometheus configuration
"OSMMON_PROMETHEUS_URL": f"http://{relation_state['prometheus_host']}:{relation_state['prometheus_port']}",
# VCA configuration
"OSMMON_VCA_HOST": config["vca_host"],
"OSMMON_VCA_USER": config["vca_user"],
"OSMMON_VCA_SECRET": config["vca_password"],
"OSMMON_VCA_CACERT": config["vca_cacert"],
}
return envconfig
def _make_startup_probe() -> Dict[str, Any]:
"""Generate startup probe.
Returns:
Dict[str, Any]: startup probe.
"""
return {
"exec": {"command": ["/usr/bin/pgrep python3"]},
"initialDelaySeconds": 60,
"timeoutSeconds": 5,
}
def _make_readiness_probe(port: int) -> Dict[str, Any]:
"""Generate readiness probe.
Args:
port (int): [description]
Returns:
Dict[str, Any]: readiness probe.
"""
return {
"tcpSocket": {
"port": port,
},
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
}
def _make_liveness_probe(port: int) -> Dict[str, Any]:
"""Generate liveness probe.
Args:
port (int): [description]
Returns:
Dict[str, Any]: liveness probe.
"""
return {
"tcpSocket": {
"port": port,
},
"initialDelaySeconds": 45,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
}
def make_pod_spec(
image_info: Dict[str, str],
config: Dict[str, Any],
relation_state: Dict[str, Any],
app_name: str = "mon",
port: int = 8000,
) -> Dict[str, Any]:
"""Generate the pod spec information.
Args:
image_info (Dict[str, str]): Object provided by
OCIImageResource("image").fetch().
config (Dict[str, Any]): Configuration information.
relation_state (Dict[str, Any]): Relation state information.
app_name (str, optional): Application name. Defaults to "mon".
port (int, optional): Port for the container. Defaults to 8000.
Returns:
Dict[str, Any]: Pod spec dictionary for the charm.
"""
if not image_info:
return None
ConfigData(**(config))
RelationData(**(relation_state))
ports = _make_pod_ports(port)
env_config = _make_pod_envconfig(config, relation_state)
return {
"version": 3,
"containers": [
{
"name": app_name,
"imageDetails": image_info,
"imagePullPolicy": "Always",
"ports": ports,
"envConfig": env_config,
}
],
"kubernetesResources": {
"ingressResources": [],
},
}