blob: 5be34047e5efefec61a197810f07dc60e89818e4 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##
# pylint: disable=E0213
import logging
from typing import NoReturn
from ops.framework import EventBase
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
from opslib.osm.interfaces.zookeeper import ZookeeperClient
from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
from opslib.osm.validator import ModelValidator, validator
logger = logging.getLogger(__name__)

# Port the Kafka broker listens on (plaintext listener).
KAFKA_PORT = 9092
# Exported as the KAFKA_RESERVED_BROKER_MAX_ID env var; presumably maps to
# Kafka's reserved.broker.max.id broker setting — confirm against the image's
# entrypoint handling.
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"
class ConfigModel(ModelValidator):
    """Validated view of the charm configuration options."""

    num_partitions: int
    image_pull_policy: str
    security_context: bool

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        """Normalize ``image_pull_policy`` to its Kubernetes canonical form.

        Args:
            v (str): user-supplied value, matched case-insensitively.

        Returns:
            str: one of ``Always``, ``IfNotPresent`` or ``Never``.

        Raises:
            ValueError: if the value is not a recognized pull policy.
        """
        values = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        v = v.lower()
        # Membership test directly on the dict (no need for .keys()).
        if v not in values:
            raise ValueError("value must be always, ifnotpresent or never")
        return values[v]
class KafkaCharm(CharmedOsmBase):
    """Kafka Charm."""

    def __init__(self, *args) -> NoReturn:
        """Kafka Charm constructor.

        Sets up the ``cluster`` (peer), ``kafka`` (provides) and ``zookeeper``
        (requires) relation endpoints, and registers the event handlers that
        publish connection info or regenerate the pod spec.
        """
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

    @property
    def num_units(self):
        """Number of units in the Kafka peer (cluster) relation."""
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase):
        """Publishes Kafka information.

        Args:
            event (EventBase): Kafka relation event.
        """
        # Only the leader may write app-level relation data.
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

    def _check_missing_dependencies(self):
        """Raise RelationsMissing if zookeeper has not shared its data yet."""
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def build_pod_spec(self, image_info):
        """Build the Kubernetes pod spec (v3) for the Kafka container.

        Args:
            image_info: OCI image information resolved by the base charm.

        Returns:
            dict: the pod spec produced by :class:`PodSpecV3Builder`.

        Raises:
            ValueError: if the charm configuration fails validation.
            RelationsMissing: if the zookeeper relation data is missing.
        """
        # Validate config
        config = ConfigModel(**dict(self.config))
        # Check relations
        self._check_missing_dependencies()
        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder(
            enable_security_context=config.security_context
        )
        # Build Container
        container_builder = ContainerV3Builder(
            self.app.name,
            image_info,
            config.image_pull_policy,
            run_as_non_root=config.security_context,
        )
        container_builder.add_port(name="kafka", port=KAFKA_PORT)
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
            timeout_seconds=5,
            period_seconds=5,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
            timeout_seconds=10,
            period_seconds=5,
        )
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
        container_builder.add_command(
            [
                "sh",
                "-c",
                " ".join(
                    [
                        "exec kafka-server-start.sh /opt/kafka/config/server.properties",
                        # broker.id is taken from the pod hostname suffix
                        # (StatefulSet ordinal), giving each unit a stable id.
                        "--override broker.id=${HOSTNAME##*-}",
                        f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
                        f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
                        "--override log.dir=/var/lib/kafka",
                        "--override auto.create.topics.enable=true",
                        "--override auto.leader.rebalance.enable=true",
                        "--override background.threads=10",
                        "--override compression.type=producer",
                        "--override delete.topic.enable=false",
                        "--override leader.imbalance.check.interval.seconds=300",
                        "--override leader.imbalance.per.broker.percentage=10",
                        "--override log.flush.interval.messages=9223372036854775807",
                        "--override log.flush.offset.checkpoint.interval.ms=60000",
                        "--override log.flush.scheduler.interval.ms=9223372036854775807",
                        "--override log.retention.bytes=-1",
                        "--override log.retention.hours=168",
                        "--override log.roll.hours=168",
                        "--override log.roll.jitter.hours=0",
                        "--override log.segment.bytes=1073741824",
                        "--override log.segment.delete.delay.ms=60000",
                        "--override message.max.bytes=1000012",
                        "--override min.insync.replicas=1",
                        "--override num.io.threads=8",
                        f"--override num.network.threads={self.num_units}",
                        "--override num.recovery.threads.per.data.dir=1",
                        "--override num.replica.fetchers=1",
                        "--override offset.metadata.max.bytes=4096",
                        "--override offsets.commit.required.acks=-1",
                        "--override offsets.commit.timeout.ms=5000",
                        "--override offsets.load.buffer.size=5242880",
                        "--override offsets.retention.check.interval.ms=600000",
                        "--override offsets.retention.minutes=1440",
                        "--override offsets.topic.compression.codec=0",
                        "--override offsets.topic.num.partitions=50",
                        f"--override offsets.topic.replication.factor={self.num_units}",
                        "--override offsets.topic.segment.bytes=104857600",
                        "--override queued.max.requests=500",
                        "--override quota.consumer.default=9223372036854775807",
                        "--override quota.producer.default=9223372036854775807",
                        "--override replica.fetch.min.bytes=1",
                        "--override replica.fetch.wait.max.ms=500",
                        "--override replica.high.watermark.checkpoint.interval.ms=5000",
                        "--override replica.lag.time.max.ms=10000",
                        "--override replica.socket.receive.buffer.bytes=65536",
                        "--override replica.socket.timeout.ms=30000",
                        "--override request.timeout.ms=30000",
                        "--override socket.receive.buffer.bytes=102400",
                        "--override socket.request.max.bytes=104857600",
                        "--override socket.send.buffer.bytes=102400",
                        "--override unclean.leader.election.enable=true",
                        "--override zookeeper.session.timeout.ms=6000",
                        "--override zookeeper.set.acl=false",
                        "--override broker.id.generation.enable=true",
                        "--override connections.max.idle.ms=600000",
                        "--override controlled.shutdown.enable=true",
                        "--override controlled.shutdown.max.retries=3",
                        "--override controlled.shutdown.retry.backoff.ms=5000",
                        "--override controller.socket.timeout.ms=30000",
                        "--override default.replication.factor=1",
                        "--override fetch.purgatory.purge.interval.requests=1000",
                        "--override group.max.session.timeout.ms=300000",
                        "--override group.min.session.timeout.ms=6000",
                        "--override log.cleaner.backoff.ms=15000",
                        "--override log.cleaner.dedupe.buffer.size=134217728",
                        "--override log.cleaner.delete.retention.ms=86400000",
                        "--override log.cleaner.enable=true",
                        "--override log.cleaner.io.buffer.load.factor=0.9",
                        "--override log.cleaner.io.buffer.size=524288",
                        "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
                        "--override log.cleaner.min.cleanable.ratio=0.5",
                        "--override log.cleaner.min.compaction.lag.ms=0",
                        "--override log.cleaner.threads=1",
                        "--override log.cleanup.policy=delete",
                        "--override log.index.interval.bytes=4096",
                        "--override log.index.size.max.bytes=10485760",
                        "--override log.message.timestamp.difference.max.ms=9223372036854775807",
                        "--override log.message.timestamp.type=CreateTime",
                        "--override log.preallocate=false",
                        "--override log.retention.check.interval.ms=300000",
                        "--override max.connections.per.ip=2147483647",
                        f"--override num.partitions={config.num_partitions}",
                        "--override producer.purgatory.purge.interval.requests=1000",
                        "--override replica.fetch.backoff.ms=1000",
                        "--override replica.fetch.max.bytes=1048576",
                        "--override replica.fetch.response.max.bytes=10485760",
                        # Was hard-coded to 1000, contradicting the
                        # KAFKA_RESERVED_BROKER_MAX_ID env var set above;
                        # use the module constant so both agree.
                        f"--override reserved.broker.max.id={KAFKA_RESERVED_BROKER_MAX_ID}",
                    ]
                ),
            ]
        )
        container = container_builder.build()
        # Add container to pod spec
        pod_spec_builder.add_container(container)
        return pod_spec_builder.build()
if __name__ == "__main__":
    # Entry point: hand the charm class to the Operator framework dispatcher.
    main(KafkaCharm)