+ @validator("mongodb_uri")
+ def validate_mongodb_uri(cls, v):
+ if v and not v.startswith("mongodb://"):
+ raise ValueError("mongodb_uri is not properly formed")
+ return v
+
+ @validator("image_pull_policy")
+ def validate_image_pull_policy(cls, v):
+ values = {
+ "always": "Always",
+ "ifnotpresent": "IfNotPresent",
+ "never": "Never",
+ }
+ v = v.lower()
+ if v not in values.keys():
+ raise ValueError("value must be always, ifnotpresent or never")
+ return values[v]
+
+ @staticmethod
+ def _validate_tcpsocket_probe(probe_str) -> dict:
+ valid_attributes = (
+ "initial_delay_seconds",
+ "timeout_seconds",
+ "period_seconds",
+ "success_threshold",
+ "failure_threshold",
+ )
+ if probe_str:
+ probe_dict = json.loads(probe_str)
+ if all(attribute in valid_attributes for attribute in probe_dict):
+ return probe_dict
+ raise ValueError(
+ "One or more attributes are not accepted by the tcpsocket probe configuration"
+ )
+ return {}
+
+ @validator("tcpsocket_readiness_probe")
+ def validate_tcpsocket_readiness_probe(cls, v):
+ try:
+ return ConfigModel._validate_tcpsocket_probe(v)
+ except Exception as e:
+ raise ValueError(f"tcpsocket_readiness_probe configuration error: {e}")
+
+ @validator("tcpsocket_liveness_probe")
+ def validate_tcpsocket_liveness_probe(cls, v):
+ try:
+ return ConfigModel._validate_tcpsocket_probe(v)
+ except Exception as e:
+ raise ValueError(f"tcpsocket_liveness_probe configuration error: {e}")
+