| #!/usr/bin/env python3 |
| # Copyright 2021 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact: legal@canonical.com |
| # |
| # To get in touch with the maintainers, please contact: |
| # osm-charmers@lists.launchpad.net |
| ## |
| |
| # pylint: disable=E0213 |
| |
| import logging |
| from typing import NoReturn |
| |
| |
| from ops.framework import EventBase |
| from ops.main import main |
| from opslib.osm.charm import CharmedOsmBase, RelationsMissing |
| from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer |
| from opslib.osm.interfaces.zookeeper import ZookeeperClient |
| from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder |
| from opslib.osm.validator import ModelValidator, validator |
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TCP port used throughout this file: the container port, the readiness and
# liveness probes, the PLAINTEXT listener override, the advertised-port
# environment variable and the value published on the "kafka" relation.
KAFKA_PORT = 9092
# Passed to the container as the environment variable of the same name.
# NOTE(review): the start command also passes
# `--override reserved.broker.max.id=1000`; which of the two values wins
# depends on the container image - confirm.
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"
| |
| |
class ConfigModel(ModelValidator):
    """Typed view of the charm configuration options used by this charm.

    The fields are populated from the charm config when the pod spec is
    built; validation behaviour comes from ``ModelValidator``.
    """

    # Substituted into the broker's `num.partitions` override.
    num_partitions: int
    # Handed to the container builder after normalisation by the validator.
    image_pull_policy: str
    # Forwarded to both the pod spec builder and the container builder.
    security_context: bool

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        """Map a case-insensitive pull policy onto its canonical spelling.

        Args:
            v: Configured pull policy, in any letter case.

        Returns:
            ``"Always"``, ``"IfNotPresent"`` or ``"Never"``.

        Raises:
            ValueError: If ``v`` is not one of the three accepted policies.
        """
        canonical_by_lowercase = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        # Single lookup: a miss yields None, which is never a valid policy.
        policy = canonical_by_lowercase.get(v.lower())
        if policy is None:
            raise ValueError("value must be always, ifnotpresent or never")
        return policy
| |
| |
class KafkaCharm(CharmedOsmBase):
    """Kafka Charm."""

    def __init__(self, *args) -> None:
        """Kafka Charm constructor.

        Creates the relation helpers for the "cluster", "kafka" and
        "zookeeper" endpoints and registers the relation event observers.

        Args:
            *args: Positional arguments forwarded unchanged to the base charm.
        """
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
        # `configure_pod` is not defined in this class; presumably it is
        # inherited from CharmedOsmBase - confirm.
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

    @property
    def num_units(self):
        """Unit count as reported by the "cluster" relation helper."""
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase) -> None:
        """Publishes Kafka information.

        Only the leader unit publishes; it sends the application name and
        ``KAFKA_PORT`` through the "kafka" relation helper.

        Args:
            event (EventBase): Kafka relation event.
        """
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

    def _check_missing_dependencies(self) -> None:
        """Ensure the data required from the zookeeper relation is present.

        Raises:
            RelationsMissing: If the zookeeper client reports missing
                application data.
        """
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def _kafka_start_command(self, config) -> str:
        """Return the shell command line that starts the Kafka broker.

        The command runs ``kafka-server-start.sh`` with a fixed list of
        ``--override`` settings. Only four values are computed at runtime:
        the listener port, the zookeeper URI, the unit count and the
        configured partition count.

        Args:
            config: Validated configuration; only ``num_partitions`` is read.

        Returns:
            The command fragments joined with single spaces.
        """
        return " ".join(
            [
                "exec kafka-server-start.sh /opt/kafka/config/server.properties",
                # The shell expansion keeps what follows the last "-" in
                # HOSTNAME; presumably the pod ordinal - confirm.
                "--override broker.id=${HOSTNAME##*-}",
                f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
                f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
                "--override log.dir=/var/lib/kafka",
                "--override auto.create.topics.enable=true",
                "--override auto.leader.rebalance.enable=true",
                "--override background.threads=10",
                "--override compression.type=producer",
                "--override delete.topic.enable=false",
                "--override leader.imbalance.check.interval.seconds=300",
                "--override leader.imbalance.per.broker.percentage=10",
                "--override log.flush.interval.messages=9223372036854775807",
                "--override log.flush.offset.checkpoint.interval.ms=60000",
                "--override log.flush.scheduler.interval.ms=9223372036854775807",
                "--override log.retention.bytes=-1",
                "--override log.retention.hours=168",
                "--override log.roll.hours=168",
                "--override log.roll.jitter.hours=0",
                "--override log.segment.bytes=1073741824",
                "--override log.segment.delete.delay.ms=60000",
                "--override message.max.bytes=1000012",
                "--override min.insync.replicas=1",
                "--override num.io.threads=8",
                f"--override num.network.threads={self.num_units}",
                "--override num.recovery.threads.per.data.dir=1",
                "--override num.replica.fetchers=1",
                "--override offset.metadata.max.bytes=4096",
                "--override offsets.commit.required.acks=-1",
                "--override offsets.commit.timeout.ms=5000",
                "--override offsets.load.buffer.size=5242880",
                "--override offsets.retention.check.interval.ms=600000",
                "--override offsets.retention.minutes=1440",
                "--override offsets.topic.compression.codec=0",
                "--override offsets.topic.num.partitions=50",
                f"--override offsets.topic.replication.factor={self.num_units}",
                "--override offsets.topic.segment.bytes=104857600",
                "--override queued.max.requests=500",
                "--override quota.consumer.default=9223372036854775807",
                "--override quota.producer.default=9223372036854775807",
                "--override replica.fetch.min.bytes=1",
                "--override replica.fetch.wait.max.ms=500",
                "--override replica.high.watermark.checkpoint.interval.ms=5000",
                "--override replica.lag.time.max.ms=10000",
                "--override replica.socket.receive.buffer.bytes=65536",
                "--override replica.socket.timeout.ms=30000",
                "--override request.timeout.ms=30000",
                "--override socket.receive.buffer.bytes=102400",
                "--override socket.request.max.bytes=104857600",
                "--override socket.send.buffer.bytes=102400",
                "--override unclean.leader.election.enable=true",
                "--override zookeeper.session.timeout.ms=6000",
                "--override zookeeper.set.acl=false",
                "--override broker.id.generation.enable=true",
                "--override connections.max.idle.ms=600000",
                "--override controlled.shutdown.enable=true",
                "--override controlled.shutdown.max.retries=3",
                "--override controlled.shutdown.retry.backoff.ms=5000",
                "--override controller.socket.timeout.ms=30000",
                "--override default.replication.factor=1",
                "--override fetch.purgatory.purge.interval.requests=1000",
                "--override group.max.session.timeout.ms=300000",
                "--override group.min.session.timeout.ms=6000",
                "--override log.cleaner.backoff.ms=15000",
                "--override log.cleaner.dedupe.buffer.size=134217728",
                "--override log.cleaner.delete.retention.ms=86400000",
                "--override log.cleaner.enable=true",
                "--override log.cleaner.io.buffer.load.factor=0.9",
                "--override log.cleaner.io.buffer.size=524288",
                "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
                "--override log.cleaner.min.cleanable.ratio=0.5",
                "--override log.cleaner.min.compaction.lag.ms=0",
                "--override log.cleaner.threads=1",
                "--override log.cleanup.policy=delete",
                "--override log.index.interval.bytes=4096",
                "--override log.index.size.max.bytes=10485760",
                "--override log.message.timestamp.difference.max.ms=9223372036854775807",
                "--override log.message.timestamp.type=CreateTime",
                "--override log.preallocate=false",
                "--override log.retention.check.interval.ms=300000",
                "--override max.connections.per.ip=2147483647",
                f"--override num.partitions={config.num_partitions}",
                "--override producer.purgatory.purge.interval.requests=1000",
                "--override replica.fetch.backoff.ms=1000",
                "--override replica.fetch.max.bytes=1048576",
                "--override replica.fetch.response.max.bytes=10485760",
                # NOTE(review): the KAFKA_RESERVED_BROKER_MAX_ID environment
                # variable carries a different value; confirm which one the
                # image honours.
                "--override reserved.broker.max.id=1000",
            ]
        )

    def build_pod_spec(self, image_info):
        """Build the pod spec for the Kafka workload.

        Validates the charm config, checks that the zookeeper relation data
        is available, then assembles a single container with its port,
        probes, environment and start command.

        Args:
            image_info: Image details handed to the container builder as-is.

        Returns:
            Whatever ``PodSpecV3Builder.build()`` produces.

        Raises:
            RelationsMissing: If the zookeeper relation data is missing.
        """
        # Validate config; errors raised by ConfigModel propagate to the caller.
        config = ConfigModel(**dict(self.config))

        # Check relations
        self._check_missing_dependencies()

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder(
            enable_security_context=config.security_context
        )

        # Build Container
        container_builder = ContainerV3Builder(
            self.app.name,
            image_info,
            config.image_pull_policy,
            run_as_non_root=config.security_context,
        )

        container_builder.add_port(name="kafka", port=KAFKA_PORT)
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
            timeout_seconds=5,
            period_seconds=5,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
            timeout_seconds=10,
            period_seconds=5,
        )
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
        # Run through `sh -c` so the ${HOSTNAME##*-} expansion in the command
        # is evaluated by the shell inside the container.
        container_builder.add_command(
            ["sh", "-c", self._kafka_start_command(config)]
        )

        container = container_builder.build()

        # Add container to pod spec
        pod_spec_builder.add_container(container)

        return pod_spec_builder.build()
| |
| |
# Script entry point: hand the charm class to the imported `main` from ops.
if __name__ == "__main__":
    main(KafkaCharm)