##
ops
-pydantic
git+https://github.com/juju-solutions/resource-oci-image/@c5778285d332edf3d9a538f9d0c06154b7ec1b0b#egg=oci-image
##
import logging
-from pydantic import ValidationError
from typing import Any, Dict, NoReturn
from ops.charm import CharmBase, CharmEvents
Args:
event (EventBase): Kafka relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- message_host = event.relation.data[data_loc].get("host")
- message_port = event.relation.data[data_loc].get("port")
+ message_host = event.relation.data[event.unit].get("host")
+ message_port = event.relation.data[event.unit].get("port")
if (
message_host
Args:
event (EventBase): DB relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- database_uri = event.relation.data[data_loc].get("connection_string")
+ database_uri = event.relation.data[event.unit].get("connection_string")
if database_uri and self.state.database_uri != database_uri:
self.state.database_uri = database_uri
Args:
event (EventBase): Keystone relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- ro_host = event.relation.data[data_loc].get("host")
- ro_port = event.relation.data[data_loc].get("port")
+ ro_host = event.relation.data[event.unit].get("host")
+ ro_port = event.relation.data[event.unit].get("port")
if (
ro_host
self.model.app.name,
self.port,
)
- except ValidationError as exc:
+ except ValueError as exc:
logger.exception("Config/Relation data validation error")
self.unit.status = BlockedStatus(str(exc))
return
##
import logging
-from pydantic import BaseModel, constr, PositiveInt, validator
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, NoReturn
logger = logging.getLogger(__name__)
-class ConfigData(BaseModel):
- """Configuration data model."""
+def _validate_data(
+ config_data: Dict[str, Any], relation_data: Dict[str, Any]
+) -> NoReturn:
+ """Validate input data.
- database_commonkey: constr(min_length=1)
- log_level: constr(regex=r"^(INFO|DEBUG)$")
- vca_host: constr(min_length=1)
- vca_port: PositiveInt
- vca_user: constr(min_length=1)
- vca_pubkey: constr(min_length=1)
- vca_password: constr(min_length=1)
- vca_cacert: str
- vca_cloud: constr(min_length=1)
- vca_k8s_cloud: constr(min_length=1)
- vca_apiproxy: Optional[constr(min_length=1)]
+ Args:
+ config_data (Dict[str, Any]): configuration data.
+ relation_data (Dict[str, Any]): relation data.
+ """
+ config_validators = {
+ "database_commonkey": lambda value, _: isinstance(value, str)
+ and len(value) > 1,
+ "log_level": lambda value, _: isinstance(value, str)
+ and value in ("INFO", "DEBUG"),
+ "vca_host": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_port": lambda value, _: isinstance(value, int) and value > 0,
+ "vca_user": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_pubkey": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_password": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_cacert": lambda value, _: isinstance(value, str),
+ "vca_cloud": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_k8s_cloud": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "vca_apiproxy": lambda value, _: (isinstance(value, str) and len(value) > 1)
+ if value
+ else True,
+ }
+ relation_validators = {
+ "ro_host": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "ro_port": lambda value, _: isinstance(value, int) and value > 0,
+ "message_host": lambda value, _: isinstance(value, str) and len(value) > 1,
+ "message_port": lambda value, _: isinstance(value, int) and value > 0,
+ "database_uri": lambda value, _: isinstance(value, str) and len(value) > 1,
+ }
+ problems = []
- @validator("vca_apiproxy", pre=True, always=True)
- def validate_vca_apiproxy(cls, value, values, **kwargs):
- if not value:
- return None
+ for key, validator in config_validators.items():
+ valid = validator(config_data.get(key), config_data)
- return value
+ if not valid:
+ problems.append(key)
+ for key, validator in relation_validators.items():
+ valid = validator(relation_data.get(key), relation_data)
-class RelationData(BaseModel):
- """Relation data model."""
+ if not valid:
+ problems.append(key)
- ro_host: constr(min_length=1)
- ro_port: PositiveInt
- message_host: constr(min_length=1)
- message_port: PositiveInt
- database_uri: constr(regex=r"^(mongodb://)")
+ if len(problems) > 0:
+ raise ValueError("Errors found in: {}".format(", ".join(problems)))
def _make_pod_ports(port: int) -> List[Dict[str, Any]]:
if not image_info:
return None
- ConfigData(**(config))
- RelationData(**(relation_state))
+ _validate_data(config, relation_state)
ports = _make_pod_ports(port)
env_config = _make_pod_envconfig(config, relation_state)
ro_relation_id = self.harness.add_relation("ro", "ro")
self.harness.add_relation_unit(ro_relation_id, "ro/0")
self.harness.update_relation_data(
- ro_relation_id, "ro", {"host": "ro", "port": 9090}
+ ro_relation_id, "ro/0", {"host": "ro", "port": 9090}
)
# Checking if kafka data is stored
self.assertDictEqual(expected_result, pod_spec)
- def test_on_kafka_relation_app_changed(self) -> NoReturn:
- """Test to see if kafka relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.message_host)
- self.assertIsNone(self.harness.charm.state.message_port)
-
- relation_id = self.harness.add_relation("kafka", "kafka")
- self.harness.add_relation_unit(relation_id, "kafka/0")
- self.harness.update_relation_data(
- relation_id, "kafka", {"host": "kafka", "port": 9092}
- )
-
- self.assertEqual(self.harness.charm.state.message_host, "kafka")
- self.assertEqual(self.harness.charm.state.message_port, 9092)
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertNotIn("kafka", self.harness.charm.unit.status.message)
- self.assertIn("mongodb", self.harness.charm.unit.status.message)
- self.assertIn("ro", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_kafka_relation_unit_changed(self) -> NoReturn:
"""Test to see if kafka relation is updated."""
self.harness.charm.on.start.emit()
self.assertIn("ro", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_mongodb_app_relation_changed(self) -> NoReturn:
- """Test to see if mongodb relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.database_uri)
-
- relation_id = self.harness.add_relation("mongodb", "mongodb")
- self.harness.add_relation_unit(relation_id, "mongodb/0")
- self.harness.update_relation_data(
- relation_id, "mongodb", {"connection_string": "mongodb://mongo:27017"}
- )
-
- self.assertEqual(self.harness.charm.state.database_uri, "mongodb://mongo:27017")
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertIn("kafka", self.harness.charm.unit.status.message)
- self.assertNotIn("mongodb", self.harness.charm.unit.status.message)
- self.assertIn("ro", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_mongodb_unit_relation_changed(self) -> NoReturn:
"""Test to see if mongodb relation is updated."""
self.harness.charm.on.start.emit()
self.assertIn("ro", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_ro_app_relation_changed(self) -> NoReturn:
- """Test to see if RO relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.ro_host)
- self.assertIsNone(self.harness.charm.state.ro_port)
-
- relation_id = self.harness.add_relation("ro", "ro")
- self.harness.add_relation_unit(relation_id, "ro/0")
- self.harness.update_relation_data(
- relation_id, "ro", {"host": "ro", "port": 9090}
- )
-
- self.assertEqual(self.harness.charm.state.ro_host, "ro")
- self.assertEqual(self.harness.charm.state.ro_port, 9090)
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertIn("kafka", self.harness.charm.unit.status.message)
- self.assertIn("mongodb", self.harness.charm.unit.status.message)
- self.assertNotIn("ro", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_ro_unit_relation_changed(self) -> NoReturn:
"""Test to see if RO relation is updated."""
self.harness.charm.on.start.emit()
# osm-charmers@lists.launchpad.net
##
-from pydantic import ValidationError
from typing import NoReturn
import unittest
app_name = "lcm"
port = 9999
- with self.assertRaises(ValidationError):
+ with self.assertRaises(ValueError):
pod_spec.make_pod_spec(image_info, config, relation_state, app_name, port)
def test_make_pod_spec_without_relation_state(self) -> NoReturn:
app_name = "lcm"
port = 9999
- with self.assertRaises(ValidationError):
+ with self.assertRaises(ValueError):
pod_spec.make_pod_spec(image_info, config, relation_state, app_name, port)
stestr
mock
ops
- pydantic
setenv =
{[testenv]setenv}
PYTHON=coverage run