+#!/usr/bin/env python3
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: legal@canonical.com
+#
+# To get in touch with the maintainers, please contact:
+# osm-charmers@lists.launchpad.net
+##
+
+# pylint: disable=E0213
+
+import logging
+from typing import NoReturn
+
+
+from ops.framework import EventBase
+from ops.main import main
+from opslib.osm.charm import CharmedOsmBase, RelationsMissing
+from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
+from opslib.osm.interfaces.zookeeper import ZookeeperClient
+from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
+from opslib.osm.validator import ModelValidator, validator
+
+logger = logging.getLogger(__name__)
+
+KAFKA_PORT = 9092
+KAFKA_RESERVED_BROKER_MAX_ID = "999999999"
+
+
+class ConfigModel(ModelValidator):
+ num_partitions: int
+ image_pull_policy: str
+
+ @validator("image_pull_policy")
+ def validate_image_pull_policy(cls, v):
+ values = {
+ "always": "Always",
+ "ifnotpresent": "IfNotPresent",
+ "never": "Never",
+ }
+ v = v.lower()
+ if v not in values.keys():
+ raise ValueError("value must be always, ifnotpresent or never")
+ return values[v]
+
+
+class KafkaCharm(CharmedOsmBase):
+ """Kafka Charm."""
+
+ def __init__(self, *args) -> NoReturn:
+ """Kafka Charm constructor."""
+ super().__init__(*args, oci_image="image")
+ self.kafka_cluster = KafkaCluster(self, "cluster")
+ self.kafka_server = KafkaServer(self, "kafka")
+ self.zookeeper_client = ZookeeperClient(self, "zookeeper")
+ event_observer_mapping = {
+ self.on["cluster"].relation_changed: self.configure_pod,
+ self.on["kafka"].relation_joined: self._publish_info,
+ self.on["zookeeper"].relation_changed: self.configure_pod,
+ self.on["zookeeper"].relation_broken: self.configure_pod,
+ }
+ for event, observer in event_observer_mapping.items():
+ self.framework.observe(event, observer)
+
+ @property
+ def num_units(self):
+ return self.kafka_cluster.num_units
+
+ def _publish_info(self, event: EventBase):
+ """Publishes Kafka information.
+
+ Args:
+ event (EventBase): Kafka relation event.
+ """
+ if self.unit.is_leader():
+ self.kafka_server.publish_info(self.app.name, KAFKA_PORT)
+
+ def _check_missing_dependencies(self):
+ if self.zookeeper_client.is_missing_data_in_app():
+ raise RelationsMissing(["zookeeper"])
+
+ def build_pod_spec(self, image_info):
+ # Validate config
+ config = ConfigModel(**dict(self.config))
+
+ # Check relations
+ self._check_missing_dependencies()
+
+ # Create Builder for the PodSpec
+ pod_spec_builder = PodSpecV3Builder()
+
+ # Build Container
+ container_builder = ContainerV3Builder(
+ self.app.name, image_info, config.image_pull_policy
+ )
+
+ container_builder.add_port(name="kafka", port=KAFKA_PORT)
+ container_builder.add_tcpsocket_readiness_probe(
+ KAFKA_PORT,
+ initial_delay_seconds=10,
+ timeout_seconds=5,
+ period_seconds=5,
+ )
+ container_builder.add_tcpsocket_liveness_probe(
+ KAFKA_PORT,
+ initial_delay_seconds=60,
+ timeout_seconds=10,
+ period_seconds=5,
+ )
+ container_builder.add_envs(
+ {
+ "ENABLE_AUTO_EXTEND": "true",
+ "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
+ "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
+ "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
+ "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
+ }
+ )
+ container_builder.add_command(
+ [
+ "sh",
+ "-c",
+ " ".join(
+ [
+ "exec kafka-server-start.sh /opt/kafka/config/server.properties",
+ "--override broker.id=${HOSTNAME##*-}",
+ f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
+ f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
+ "--override log.dir=/var/lib/kafka",
+ "--override auto.create.topics.enable=true",
+ "--override auto.leader.rebalance.enable=true",
+ "--override background.threads=10",
+ "--override compression.type=producer",
+ "--override delete.topic.enable=false",
+ "--override leader.imbalance.check.interval.seconds=300",
+ "--override leader.imbalance.per.broker.percentage=10",
+ "--override log.flush.interval.messages=9223372036854775807",
+ "--override log.flush.offset.checkpoint.interval.ms=60000",
+ "--override log.flush.scheduler.interval.ms=9223372036854775807",
+ "--override log.retention.bytes=-1",
+ "--override log.retention.hours=168",
+ "--override log.roll.hours=168",
+ "--override log.roll.jitter.hours=0",
+ "--override log.segment.bytes=1073741824",
+ "--override log.segment.delete.delay.ms=60000",
+ "--override message.max.bytes=1000012",
+ "--override min.insync.replicas=1",
+ "--override num.io.threads=8",
+ f"--override num.network.threads={self.num_units}",
+ "--override num.recovery.threads.per.data.dir=1",
+ "--override num.replica.fetchers=1",
+ "--override offset.metadata.max.bytes=4096",
+ "--override offsets.commit.required.acks=-1",
+ "--override offsets.commit.timeout.ms=5000",
+ "--override offsets.load.buffer.size=5242880",
+ "--override offsets.retention.check.interval.ms=600000",
+ "--override offsets.retention.minutes=1440",
+ "--override offsets.topic.compression.codec=0",
+ "--override offsets.topic.num.partitions=50",
+ f"--override offsets.topic.replication.factor={self.num_units}",
+ "--override offsets.topic.segment.bytes=104857600",
+ "--override queued.max.requests=500",
+ "--override quota.consumer.default=9223372036854775807",
+ "--override quota.producer.default=9223372036854775807",
+ "--override replica.fetch.min.bytes=1",
+ "--override replica.fetch.wait.max.ms=500",
+ "--override replica.high.watermark.checkpoint.interval.ms=5000",
+ "--override replica.lag.time.max.ms=10000",
+ "--override replica.socket.receive.buffer.bytes=65536",
+ "--override replica.socket.timeout.ms=30000",
+ "--override request.timeout.ms=30000",
+ "--override socket.receive.buffer.bytes=102400",
+ "--override socket.request.max.bytes=104857600",
+ "--override socket.send.buffer.bytes=102400",
+ "--override unclean.leader.election.enable=true",
+ "--override zookeeper.session.timeout.ms=6000",
+ "--override zookeeper.set.acl=false",
+ "--override broker.id.generation.enable=true",
+ "--override connections.max.idle.ms=600000",
+ "--override controlled.shutdown.enable=true",
+ "--override controlled.shutdown.max.retries=3",
+ "--override controlled.shutdown.retry.backoff.ms=5000",
+ "--override controller.socket.timeout.ms=30000",
+ "--override default.replication.factor=1",
+ "--override fetch.purgatory.purge.interval.requests=1000",
+ "--override group.max.session.timeout.ms=300000",
+ "--override group.min.session.timeout.ms=6000",
+ "--override log.cleaner.backoff.ms=15000",
+ "--override log.cleaner.dedupe.buffer.size=134217728",
+ "--override log.cleaner.delete.retention.ms=86400000",
+ "--override log.cleaner.enable=true",
+ "--override log.cleaner.io.buffer.load.factor=0.9",
+ "--override log.cleaner.io.buffer.size=524288",
+ "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
+ "--override log.cleaner.min.cleanable.ratio=0.5",
+ "--override log.cleaner.min.compaction.lag.ms=0",
+ "--override log.cleaner.threads=1",
+ "--override log.cleanup.policy=delete",
+ "--override log.index.interval.bytes=4096",
+ "--override log.index.size.max.bytes=10485760",
+ "--override log.message.timestamp.difference.max.ms=9223372036854775807",
+ "--override log.message.timestamp.type=CreateTime",
+ "--override log.preallocate=false",
+ "--override log.retention.check.interval.ms=300000",
+ "--override max.connections.per.ip=2147483647",
+ f"--override num.partitions={config.num_partitions}",
+ "--override producer.purgatory.purge.interval.requests=1000",
+ "--override replica.fetch.backoff.ms=1000",
+ "--override replica.fetch.max.bytes=1048576",
+ "--override replica.fetch.response.max.bytes=10485760",
+ "--override reserved.broker.max.id=1000",
+ ]
+ ),
+ ]
+ )
+
+ container = container_builder.build()
+
+ # Add container to pod spec
+ pod_spec_builder.add_container(container)
+
+ return pod_spec_builder.build()
+
+
+if __name__ == "__main__":
+ main(KafkaCharm)