| David Garcia | 36c8772 | 2021-08-30 18:01:22 +0200 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | # Copyright 2021 Canonical Ltd. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 5 | # not use this file except in compliance with the License. You may obtain |
| 6 | # a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 12 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 13 | # License for the specific language governing permissions and limitations |
| 14 | # under the License. |
| 15 | # |
| 16 | # For those usages not covered by the Apache License, Version 2.0 please |
| 17 | # contact: legal@canonical.com |
| 18 | # |
| 19 | # To get in touch with the maintainers, please contact: |
| 20 | # osm-charmers@lists.launchpad.net |
| 21 | ## |
| 22 | |
| 23 | # pylint: disable=E0213 |
| 24 | |
| 25 | import logging |
| 26 | from typing import NoReturn |
| 27 | |
| 28 | |
| 29 | from ops.framework import EventBase |
| 30 | from ops.main import main |
| 31 | from opslib.osm.charm import CharmedOsmBase, RelationsMissing |
| 32 | from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer |
| 33 | from opslib.osm.interfaces.zookeeper import ZookeeperClient |
| 34 | from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder |
| 35 | from opslib.osm.validator import ModelValidator, validator |
| 36 | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# TCP port used for the container port, both probes, the advertised port
# and the PLAINTEXT listener configured by the charm.
KAFKA_PORT = 9092
# Exported to the container as the KAFKA_RESERVED_BROKER_MAX_ID environment
# variable; kept as a string because it is passed through verbatim.
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"
| 41 | |
| 42 | |
class ConfigModel(ModelValidator):
    """Validated view of the charm configuration options."""

    # Substituted into the ``num.partitions`` broker override.
    num_partitions: int
    # Normalised by the validator below before being handed to the container.
    image_pull_policy: str
    # Toggles the pod security context and running the container as non-root.
    security_context: bool

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        """Map a case-insensitive pull policy onto its canonical spelling.

        Args:
            v: Configured value; compared after lower-casing.

        Returns:
            One of ``"Always"``, ``"IfNotPresent"`` or ``"Never"``.

        Raises:
            ValueError: If the lower-cased value is not a known policy.
        """
        canonical_by_key = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        canonical = canonical_by_key.get(v.lower())
        if canonical is None:
            raise ValueError("value must be always, ifnotpresent or never")
        return canonical
| 59 | |
| 60 | |
class KafkaCharm(CharmedOsmBase):
    """Kafka Charm.

    Registers the ``cluster``, ``kafka`` and ``zookeeper`` relation helpers and
    builds the pod spec whose single container starts the Kafka broker.
    """

    def __init__(self, *args) -> None:
        """Kafka Charm constructor.

        Args:
            *args: Positional arguments forwarded unchanged to the base class.
        """
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
        # ``configure_pod`` is not defined in this file; presumably it is
        # inherited from CharmedOsmBase -- verify against the base class.
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

    @property
    def num_units(self):
        """Number of units as reported by the ``cluster`` relation helper."""
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase):
        """Publishes Kafka information.

        Only the leader unit publishes; other units do nothing.

        Args:
            event (EventBase): Kafka relation event.
        """
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

    def _check_missing_dependencies(self):
        """Ensure the relations required to build the pod spec are ready.

        Raises:
            RelationsMissing: If the zookeeper client reports missing app data.
        """
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def _build_start_command(self, config):
        """Return the container command that launches the Kafka broker.

        Args:
            config: Validated configuration; only ``num_partitions`` is read.

        Returns:
            A three-item list ``["sh", "-c", <script>]`` where ``<script>`` is
            the ``kafka-server-start.sh`` invocation followed by all
            ``--override`` options joined with single spaces.
        """
        start_script_parts = [
            # ``exec`` makes the broker replace the wrapping shell process.
            "exec kafka-server-start.sh /opt/kafka/config/server.properties",
            # Shell expansion: keeps what follows the last "-" in $HOSTNAME
            # (presumably the pod ordinal -- confirm against the deployment).
            "--override broker.id=${HOSTNAME##*-}",
            f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
            f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
            "--override log.dir=/var/lib/kafka",
            "--override auto.create.topics.enable=true",
            "--override auto.leader.rebalance.enable=true",
            "--override background.threads=10",
            "--override compression.type=producer",
            "--override delete.topic.enable=false",
            "--override leader.imbalance.check.interval.seconds=300",
            "--override leader.imbalance.per.broker.percentage=10",
            "--override log.flush.interval.messages=9223372036854775807",
            "--override log.flush.offset.checkpoint.interval.ms=60000",
            "--override log.flush.scheduler.interval.ms=9223372036854775807",
            "--override log.retention.bytes=-1",
            "--override log.retention.hours=168",
            "--override log.roll.hours=168",
            "--override log.roll.jitter.hours=0",
            "--override log.segment.bytes=1073741824",
            "--override log.segment.delete.delay.ms=60000",
            "--override message.max.bytes=1000012",
            "--override min.insync.replicas=1",
            "--override num.io.threads=8",
            f"--override num.network.threads={self.num_units}",
            "--override num.recovery.threads.per.data.dir=1",
            "--override num.replica.fetchers=1",
            "--override offset.metadata.max.bytes=4096",
            "--override offsets.commit.required.acks=-1",
            "--override offsets.commit.timeout.ms=5000",
            "--override offsets.load.buffer.size=5242880",
            "--override offsets.retention.check.interval.ms=600000",
            "--override offsets.retention.minutes=1440",
            "--override offsets.topic.compression.codec=0",
            "--override offsets.topic.num.partitions=50",
            f"--override offsets.topic.replication.factor={self.num_units}",
            "--override offsets.topic.segment.bytes=104857600",
            "--override queued.max.requests=500",
            "--override quota.consumer.default=9223372036854775807",
            "--override quota.producer.default=9223372036854775807",
            "--override replica.fetch.min.bytes=1",
            "--override replica.fetch.wait.max.ms=500",
            "--override replica.high.watermark.checkpoint.interval.ms=5000",
            "--override replica.lag.time.max.ms=10000",
            "--override replica.socket.receive.buffer.bytes=65536",
            "--override replica.socket.timeout.ms=30000",
            "--override request.timeout.ms=30000",
            "--override socket.receive.buffer.bytes=102400",
            "--override socket.request.max.bytes=104857600",
            "--override socket.send.buffer.bytes=102400",
            "--override unclean.leader.election.enable=true",
            "--override zookeeper.session.timeout.ms=6000",
            "--override zookeeper.set.acl=false",
            "--override broker.id.generation.enable=true",
            "--override connections.max.idle.ms=600000",
            "--override controlled.shutdown.enable=true",
            "--override controlled.shutdown.max.retries=3",
            "--override controlled.shutdown.retry.backoff.ms=5000",
            "--override controller.socket.timeout.ms=30000",
            "--override default.replication.factor=1",
            "--override fetch.purgatory.purge.interval.requests=1000",
            "--override group.max.session.timeout.ms=300000",
            "--override group.min.session.timeout.ms=6000",
            "--override log.cleaner.backoff.ms=15000",
            "--override log.cleaner.dedupe.buffer.size=134217728",
            "--override log.cleaner.delete.retention.ms=86400000",
            "--override log.cleaner.enable=true",
            "--override log.cleaner.io.buffer.load.factor=0.9",
            "--override log.cleaner.io.buffer.size=524288",
            "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
            "--override log.cleaner.min.cleanable.ratio=0.5",
            "--override log.cleaner.min.compaction.lag.ms=0",
            "--override log.cleaner.threads=1",
            "--override log.cleanup.policy=delete",
            "--override log.index.interval.bytes=4096",
            "--override log.index.size.max.bytes=10485760",
            "--override log.message.timestamp.difference.max.ms=9223372036854775807",
            "--override log.message.timestamp.type=CreateTime",
            "--override log.preallocate=false",
            "--override log.retention.check.interval.ms=300000",
            "--override max.connections.per.ip=2147483647",
            f"--override num.partitions={config.num_partitions}",
            "--override producer.purgatory.purge.interval.requests=1000",
            "--override replica.fetch.backoff.ms=1000",
            "--override replica.fetch.max.bytes=1048576",
            "--override replica.fetch.response.max.bytes=10485760",
            # NOTE(review): this differs from the 999999999 exported through
            # the KAFKA_RESERVED_BROKER_MAX_ID env var -- confirm which value
            # is intended to take effect.
            "--override reserved.broker.max.id=1000",
        ]
        return ["sh", "-c", " ".join(start_script_parts)]

    def build_pod_spec(self, image_info):
        """Build the pod spec for the Kafka workload.

        Args:
            image_info: Image details passed straight to the container builder.

        Returns:
            The object produced by ``PodSpecV3Builder.build()``.

        Raises:
            RelationsMissing: If the zookeeper relation data is not available.
        """
        # Validate config
        config = ConfigModel(**dict(self.config))

        # Check relations
        self._check_missing_dependencies()

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder(
            enable_security_context=config.security_context
        )

        # Build Container
        container_builder = ContainerV3Builder(
            self.app.name,
            image_info,
            config.image_pull_policy,
            run_as_non_root=config.security_context,
        )

        container_builder.add_port(name="kafka", port=KAFKA_PORT)
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
            timeout_seconds=5,
            period_seconds=5,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
            timeout_seconds=10,
            period_seconds=5,
        )
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
        container_builder.add_command(self._build_start_command(config))

        container = container_builder.build()

        # Add container to pod spec
        pod_spec_builder.add_container(container)

        return pod_spec_builder.build()
| 242 | |
| 243 | |
# Script entry point: hand the charm class to the operator framework's
# ``main`` when this file is executed directly rather than imported.
if __name__ == "__main__":
    main(KafkaCharm)