# osm-charmers@lists.launchpad.net
##
-import logging
-from pydantic import (
- BaseModel,
- conint,
- constr,
- IPvAnyNetwork,
- PositiveInt,
- validator,
-)
-from typing import Any, Dict, List, Optional
+from ipaddress import ip_network
+from typing import Any, Callable, Dict, List, NoReturn
from urllib.parse import urlparse
-logger = logging.getLogger(__name__)
+def _validate_max_file_size(max_file_size: int, site_url: str) -> bool:
+ """Validate max_file_size.
-class ConfigData(BaseModel):
- """Configuration data model."""
-
- enable_test: bool
- database_commonkey: constr(min_length=1)
- log_level: constr(regex=r"^(INFO|DEBUG)$")
- auth_backend: constr(regex=r"^(internal|keystone)$")
- site_url: Optional[str]
- max_file_size: Optional[conint(ge=0)]
- ingress_whitelist_source_range: Optional[IPvAnyNetwork]
- tls_secret_name: Optional[str]
-
- @validator("max_file_size", pre=True, always=True)
- def validate_max_file_size(cls, value, values, **kwargs):
- site_url = values.get("site_url")
-
- if not site_url:
- return value
-
- parsed = urlparse(site_url)
-
- if not parsed.scheme.startswith("http"):
- return value
-
- if value is None:
- raise ValueError("max_file_size needs to be defined if site_url is defined")
-
- return value
-
- @validator("ingress_whitelist_source_range", pre=True, always=True)
- def validate_ingress_whitelist_source_range(cls, value, values, **kwargs):
- if not value:
- return None
-
- return value
-
-
-class RelationData(BaseModel):
- """Relation data model."""
-
- message_host: str
- message_port: PositiveInt
- database_uri: constr(regex=r"^(mongodb://)")
- prometheus_host: str
- prometheus_port: PositiveInt
- keystone: bool
- keystone_host: Optional[constr(min_length=1)]
- keystone_port: Optional[PositiveInt]
- keystone_user_domain_name: Optional[constr(min_length=1)]
- keystone_project_domain_name: Optional[constr(min_length=1)]
- keystone_username: Optional[constr(min_length=1)]
- keystone_password: Optional[constr(min_length=1)]
- keystone_service: Optional[constr(min_length=1)]
-
- @validator("keystone_host", pre=True, always=True)
- def validate_keystone_host(cls, value, values, **kwargs):
- keystone = values.get("keystone")
-
- if not keystone:
- return value
-
- if value is None:
- raise ValueError(
- "keystone_host needs to be defined if keystone is configured"
- )
-
- return value
+ Args:
+ max_file_size (int): maximum file size allowed.
+ site_url (str): endpoint url.
- @validator("keystone_port", pre=True, always=True)
- def validate_keystone_port(cls, value, values, **kwargs):
- keystone = values.get("keystone")
+ Returns:
+ bool: True if valid, false otherwise.
+ """
+ if not site_url:
+ return True
- if not keystone:
- return value
+ parsed = urlparse(site_url)
- if value is None:
- raise ValueError(
- "keystone_port needs to be defined if keystone is configured"
- )
+ if not parsed.scheme.startswith("http"):
+ return True
- return value
+ if max_file_size is None:
+ return False
- @validator("keystone_user_domain_name", pre=True, always=True)
- def validate_keystone_user_domain_name(cls, value, values, **kwargs):
- keystone = values.get("keystone")
+ return max_file_size >= 0
- if not keystone:
- return value
- if value is None:
- raise ValueError(
- "keystone_user_domain_name needs to be defined if keystone is configured"
- )
+def _validate_ip_network(network: str) -> bool:
+ """Validate IP network.
- return value
+ Args:
+ network (str): IP network range.
- @validator("keystone_project_domain_name", pre=True, always=True)
- def validate_keystone_project_domain_name(cls, value, values, **kwargs):
- keystone = values.get("keystone")
+ Returns:
+ bool: True if valid, false otherwise.
+ """
+ if not network:
+ return True
- if not keystone:
- return value
+ try:
+ ip_network(network)
+ except ValueError:
+ return False
- if value is None:
- raise ValueError(
- "keystone_project_domain_name needs to be defined if keystone is configured"
- )
+ return True
- return value
- @validator("keystone_username", pre=True, always=True)
- def validate_keystone_username(cls, value, values, **kwargs):
- keystone = values.get("keystone")
+def _validate_keystone_config(keystone: bool, value: Any, validator: Callable) -> bool:
+ """Validate keystone configurations.
- if not keystone:
- return value
+ Args:
+ keystone (bool): is keystone enabled, true if so, false otherwise.
+ value (Any): value to be validated.
+ validator (Callable): function to validate configuration.
- if value is None:
- raise ValueError(
- "keystone_username needs to be defined if keystone is configured"
- )
+ Returns:
+ bool: true if valid, false otherwise.
+ """
+ if not keystone:
+ return True
- return value
+ return validator(value)
- @validator("keystone_password", pre=True, always=True)
- def validate_keystone_password(cls, value, values, **kwargs):
- keystone = values.get("keystone")
- if not keystone:
- return value
+def _validate_data(
+ config_data: Dict[str, Any], relation_data: Dict[str, Any], keystone: bool
+) -> NoReturn:
+ """Validate input data.
- if value is None:
- raise ValueError(
- "keystone_password needs to be defined if keystone is configured"
- )
+ Args:
+ config_data (Dict[str, Any]): configuration data.
+ relation_data (Dict[str, Any]): relation data.
+ keystone (bool): is keystone to be used.
+ """
+ config_validators = {
+ "enable_test": lambda value, _: isinstance(value, bool),
+ "database_commonkey": lambda value, _: isinstance(value, str)
+ and len(value) > 1,
+ "log_level": lambda value, _: isinstance(value, str)
+ and (value == "INFO" or value == "DEBUG"),
+ "auth_backend": lambda value, _: isinstance(value, str)
+ and (value == "internal" or value == "keystone"),
+ "site_url": lambda value, _: isinstance(value, str)
+ if value is not None
+ else True,
+ "max_file_size": lambda value, values: _validate_max_file_size(
+ value, values.get("site_url")
+ ),
+ "ingress_whitelist_source_range": lambda value, _: _validate_ip_network(value),
+ "tls_secret_name": lambda value, _: isinstance(value, str)
+ if value is not None
+ else True,
+ }
+ relation_validators = {
+ "message_host": lambda value, _: isinstance(value, str),
+ "message_port": lambda value, _: isinstance(value, int) and value > 0,
+ "database_uri": lambda value, _: isinstance(value, str)
+ and value.startswith("mongodb://"),
+ "prometheus_host": lambda value, _: isinstance(value, str),
+ "prometheus_port": lambda value, _: isinstance(value, int) and value > 0,
+ "keystone_host": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ "keystone_port": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, int) and x > 0
+ ),
+ "keystone_user_domain_name": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ "keystone_project_domain_name": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ "keystone_username": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ "keystone_password": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ "keystone_service": lambda value, _: _validate_keystone_config(
+ keystone, value, lambda x: isinstance(x, str) and len(x) > 0
+ ),
+ }
+ problems = []
- return value
+ for key, validator in config_validators.items():
+ valid = validator(config_data.get(key), config_data)
- @validator("keystone_service", pre=True, always=True)
- def validate_keystone_service(cls, value, values, **kwargs):
- keystone = values.get("keystone")
+ if not valid:
+ problems.append(key)
- if not keystone:
- return value
+ for key, validator in relation_validators.items():
+ valid = validator(relation_data.get(key), relation_data)
- if value is None:
- raise ValueError(
- "keystone_service needs to be defined if keystone is configured"
- )
+ if not valid:
+ problems.append(key)
- return value
+ if len(problems) > 0:
+ raise ValueError("Errors found in: {}".format(", ".join(problems)))
def _make_pod_ports(port: int) -> List[Dict[str, Any]]:
if not image_info:
return None
- ConfigData(**(config))
- RelationData(
- **(relation_state),
- keystone=True if config.get("auth_backend") == "keystone" else False,
- )
+ _validate_data(config, relation_state, config.get("auth_backend") == "keystone")
ports = _make_pod_ports(port)
env_config = _make_pod_envconfig(config, relation_state)