# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net

# pylint: disable=E0213
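
"""Kafka charm for OSM.

This charm deploys a Kafka broker on Kubernetes: it validates the charm
configuration, reads the Zookeeper connection string from the "zookeeper"
relation, publishes the broker endpoint on the "kafka" relation, and builds
the pod spec that runs kafka-server-start.sh with the broker overrides below.
"""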

import logging
from typing import NoReturn

from ops.framework import EventBase
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
from opslib.osm.interfaces.zookeeper import ZookeeperClient
from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
from opslib.osm.validator import ModelValidator, validator

logger = logging.getLogger(__name__)

KAFKA_PORT = 9092  # assumed value: the standard Kafka client port
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"


class ConfigModel(ModelValidator):
    num_partitions: int
    image_pull_policy: str

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        values = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        v = v.lower()
        if v not in values.keys():
            raise ValueError("value must be always, ifnotpresent or never")
        return values[v]


class KafkaCharm(CharmedOsmBase):
    """Kafka Charm."""

    def __init__(self, *args) -> NoReturn:
        """Kafka Charm constructor."""
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

    @property
    def num_units(self):
        """Number of Kafka units in the cluster peer relation."""
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase):
        """Publishes Kafka information.

        Args:
            event (EventBase): Kafka relation event.
        """
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

    def _check_missing_dependencies(self):
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def build_pod_spec(self, image_info):
        # Validate the charm configuration
        config = ConfigModel(**dict(self.config))

        # Check that the required relations provide their data
        self._check_missing_dependencies()

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder()

        # Build the Kafka container
        container_builder = ContainerV3Builder(
            self.app.name, image_info, config.image_pull_policy
        )
        container_builder.add_port(name="kafka", port=KAFKA_PORT)
        # TCP socket probes against the Kafka port (port argument assumed)
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
        )
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
        # Run the broker through a shell so ${HOSTNAME##*-} expands to the pod
        # ordinal used as broker.id (the "sh -c"/join wrapper is assumed).
        container_builder.add_command(
            [
                "sh",
                "-c",
                " ".join(
                    [
138 "exec kafka-server-start.sh /opt/kafka/config/server.properties",
139 "--override broker.id=${HOSTNAME##*-}",
140 f
"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
141 f
"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
142 "--override log.dir=/var/lib/kafka",
143 "--override auto.create.topics.enable=true",
144 "--override auto.leader.rebalance.enable=true",
145 "--override background.threads=10",
146 "--override compression.type=producer",
147 "--override delete.topic.enable=false",
148 "--override leader.imbalance.check.interval.seconds=300",
149 "--override leader.imbalance.per.broker.percentage=10",
150 "--override log.flush.interval.messages=9223372036854775807",
151 "--override log.flush.offset.checkpoint.interval.ms=60000",
152 "--override log.flush.scheduler.interval.ms=9223372036854775807",
153 "--override log.retention.bytes=-1",
154 "--override log.retention.hours=168",
155 "--override log.roll.hours=168",
156 "--override log.roll.jitter.hours=0",
157 "--override log.segment.bytes=1073741824",
158 "--override log.segment.delete.delay.ms=60000",
159 "--override message.max.bytes=1000012",
160 "--override min.insync.replicas=1",
161 "--override num.io.threads=8",
162 f
"--override num.network.threads={self.num_units}",
163 "--override num.recovery.threads.per.data.dir=1",
164 "--override num.replica.fetchers=1",
165 "--override offset.metadata.max.bytes=4096",
166 "--override offsets.commit.required.acks=-1",
167 "--override offsets.commit.timeout.ms=5000",
168 "--override offsets.load.buffer.size=5242880",
169 "--override offsets.retention.check.interval.ms=600000",
170 "--override offsets.retention.minutes=1440",
171 "--override offsets.topic.compression.codec=0",
172 "--override offsets.topic.num.partitions=50",
173 f
"--override offsets.topic.replication.factor={self.num_units}",
174 "--override offsets.topic.segment.bytes=104857600",
175 "--override queued.max.requests=500",
176 "--override quota.consumer.default=9223372036854775807",
177 "--override quota.producer.default=9223372036854775807",
178 "--override replica.fetch.min.bytes=1",
179 "--override replica.fetch.wait.max.ms=500",
180 "--override replica.high.watermark.checkpoint.interval.ms=5000",
181 "--override replica.lag.time.max.ms=10000",
182 "--override replica.socket.receive.buffer.bytes=65536",
183 "--override replica.socket.timeout.ms=30000",
184 "--override request.timeout.ms=30000",
185 "--override socket.receive.buffer.bytes=102400",
186 "--override socket.request.max.bytes=104857600",
187 "--override socket.send.buffer.bytes=102400",
188 "--override unclean.leader.election.enable=true",
189 "--override zookeeper.session.timeout.ms=6000",
190 "--override zookeeper.set.acl=false",
191 "--override broker.id.generation.enable=true",
192 "--override connections.max.idle.ms=600000",
193 "--override controlled.shutdown.enable=true",
194 "--override controlled.shutdown.max.retries=3",
195 "--override controlled.shutdown.retry.backoff.ms=5000",
196 "--override controller.socket.timeout.ms=30000",
197 "--override default.replication.factor=1",
198 "--override fetch.purgatory.purge.interval.requests=1000",
199 "--override group.max.session.timeout.ms=300000",
200 "--override group.min.session.timeout.ms=6000",
201 "--override log.cleaner.backoff.ms=15000",
202 "--override log.cleaner.dedupe.buffer.size=134217728",
203 "--override log.cleaner.delete.retention.ms=86400000",
204 "--override log.cleaner.enable=true",
205 "--override log.cleaner.io.buffer.load.factor=0.9",
206 "--override log.cleaner.io.buffer.size=524288",
207 "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
208 "--override log.cleaner.min.cleanable.ratio=0.5",
209 "--override log.cleaner.min.compaction.lag.ms=0",
210 "--override log.cleaner.threads=1",
211 "--override log.cleanup.policy=delete",
212 "--override log.index.interval.bytes=4096",
213 "--override log.index.size.max.bytes=10485760",
214 "--override log.message.timestamp.difference.max.ms=9223372036854775807",
215 "--override log.message.timestamp.type=CreateTime",
216 "--override log.preallocate=false",
217 "--override log.retention.check.interval.ms=300000",
218 "--override max.connections.per.ip=2147483647",
219 f
"--override num.partitions={config.num_partitions}",
220 "--override producer.purgatory.purge.interval.requests=1000",
221 "--override replica.fetch.backoff.ms=1000",
222 "--override replica.fetch.max.bytes=1048576",
223 "--override replica.fetch.response.max.bytes=10485760",
224 "--override reserved.broker.max.id=1000",
                    ]
                ),
            ]
        )

        container = container_builder.build()

        # Add container to pod spec
        pod_spec_builder.add_container(container)

        return pod_spec_builder.build()


if __name__ == "__main__":
    main(KafkaCharm)