# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
#
# pylint: disable=E0213
import logging

from typing import NoReturn

from ops.framework import EventBase
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
from opslib.osm.interfaces.zookeeper import ZookeeperClient
from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
from opslib.osm.validator import ModelValidator, validator
37 logger
= logging
.getLogger(__name__
)
40 KAFKA_RESERVED_BROKER_MAX_ID
= "999999999"
43 class ConfigModel(ModelValidator
):
45 image_pull_policy
: str
46 security_context
: bool
48 @validator("image_pull_policy")
49 def validate_image_pull_policy(cls
, v
):
52 "ifnotpresent": "IfNotPresent",
56 if v
not in values
.keys():
57 raise ValueError("value must be always, ifnotpresent or never")
61 class KafkaCharm(CharmedOsmBase
):
64 def __init__(self
, *args
) -> NoReturn
:
65 """Kafka Charm constructor."""
66 super().__init
__(*args
, oci_image
="image")
67 self
.kafka_cluster
= KafkaCluster(self
, "cluster")
68 self
.kafka_server
= KafkaServer(self
, "kafka")
69 self
.zookeeper_client
= ZookeeperClient(self
, "zookeeper")
70 event_observer_mapping
= {
71 self
.on
["cluster"].relation_changed
: self
.configure_pod
,
72 self
.on
["kafka"].relation_joined
: self
._publish
_info
,
73 self
.on
["zookeeper"].relation_changed
: self
.configure_pod
,
74 self
.on
["zookeeper"].relation_broken
: self
.configure_pod
,
76 for event
, observer
in event_observer_mapping
.items():
77 self
.framework
.observe(event
, observer
)
81 return self
.kafka_cluster
.num_units
83 def _publish_info(self
, event
: EventBase
):
84 """Publishes Kafka information.
87 event (EventBase): Kafka relation event.
89 if self
.unit
.is_leader():
90 self
.kafka_server
.publish_info(self
.app
.name
, KAFKA_PORT
)
92 def _check_missing_dependencies(self
):
93 if self
.zookeeper_client
.is_missing_data_in_app():
94 raise RelationsMissing(["zookeeper"])
96 def build_pod_spec(self
, image_info
):
98 config
= ConfigModel(**dict(self
.config
))
101 self
._check
_missing
_dependencies
()
103 # Create Builder for the PodSpec
104 pod_spec_builder
= PodSpecV3Builder(
105 enable_security_context
=config
.security_context
109 container_builder
= ContainerV3Builder(
112 config
.image_pull_policy
,
113 run_as_non_root
=config
.security_context
,
116 container_builder
.add_port(name
="kafka", port
=KAFKA_PORT
)
117 container_builder
.add_tcpsocket_readiness_probe(
119 initial_delay_seconds
=10,
123 container_builder
.add_tcpsocket_liveness_probe(
125 initial_delay_seconds
=60,
129 container_builder
.add_envs(
131 "ENABLE_AUTO_EXTEND": "true",
132 "KAFKA_ADVERTISED_HOST_NAME": self
.app
.name
,
133 "KAFKA_ADVERTISED_PORT": KAFKA_PORT
,
134 "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
135 "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID
,
138 container_builder
.add_command(
144 "exec kafka-server-start.sh /opt/kafka/config/server.properties",
145 "--override broker.id=${HOSTNAME##*-}",
146 f
"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
147 f
"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
148 "--override log.dir=/var/lib/kafka",
149 "--override auto.create.topics.enable=true",
150 "--override auto.leader.rebalance.enable=true",
151 "--override background.threads=10",
152 "--override compression.type=producer",
153 "--override delete.topic.enable=false",
154 "--override leader.imbalance.check.interval.seconds=300",
155 "--override leader.imbalance.per.broker.percentage=10",
156 "--override log.flush.interval.messages=9223372036854775807",
157 "--override log.flush.offset.checkpoint.interval.ms=60000",
158 "--override log.flush.scheduler.interval.ms=9223372036854775807",
159 "--override log.retention.bytes=-1",
160 "--override log.retention.hours=168",
161 "--override log.roll.hours=168",
162 "--override log.roll.jitter.hours=0",
163 "--override log.segment.bytes=1073741824",
164 "--override log.segment.delete.delay.ms=60000",
165 "--override message.max.bytes=1000012",
166 "--override min.insync.replicas=1",
167 "--override num.io.threads=8",
168 f
"--override num.network.threads={self.num_units}",
169 "--override num.recovery.threads.per.data.dir=1",
170 "--override num.replica.fetchers=1",
171 "--override offset.metadata.max.bytes=4096",
172 "--override offsets.commit.required.acks=-1",
173 "--override offsets.commit.timeout.ms=5000",
174 "--override offsets.load.buffer.size=5242880",
175 "--override offsets.retention.check.interval.ms=600000",
176 "--override offsets.retention.minutes=1440",
177 "--override offsets.topic.compression.codec=0",
178 "--override offsets.topic.num.partitions=50",
179 f
"--override offsets.topic.replication.factor={self.num_units}",
180 "--override offsets.topic.segment.bytes=104857600",
181 "--override queued.max.requests=500",
182 "--override quota.consumer.default=9223372036854775807",
183 "--override quota.producer.default=9223372036854775807",
184 "--override replica.fetch.min.bytes=1",
185 "--override replica.fetch.wait.max.ms=500",
186 "--override replica.high.watermark.checkpoint.interval.ms=5000",
187 "--override replica.lag.time.max.ms=10000",
188 "--override replica.socket.receive.buffer.bytes=65536",
189 "--override replica.socket.timeout.ms=30000",
190 "--override request.timeout.ms=30000",
191 "--override socket.receive.buffer.bytes=102400",
192 "--override socket.request.max.bytes=104857600",
193 "--override socket.send.buffer.bytes=102400",
194 "--override unclean.leader.election.enable=true",
195 "--override zookeeper.session.timeout.ms=6000",
196 "--override zookeeper.set.acl=false",
197 "--override broker.id.generation.enable=true",
198 "--override connections.max.idle.ms=600000",
199 "--override controlled.shutdown.enable=true",
200 "--override controlled.shutdown.max.retries=3",
201 "--override controlled.shutdown.retry.backoff.ms=5000",
202 "--override controller.socket.timeout.ms=30000",
203 "--override default.replication.factor=1",
204 "--override fetch.purgatory.purge.interval.requests=1000",
205 "--override group.max.session.timeout.ms=300000",
206 "--override group.min.session.timeout.ms=6000",
207 "--override log.cleaner.backoff.ms=15000",
208 "--override log.cleaner.dedupe.buffer.size=134217728",
209 "--override log.cleaner.delete.retention.ms=86400000",
210 "--override log.cleaner.enable=true",
211 "--override log.cleaner.io.buffer.load.factor=0.9",
212 "--override log.cleaner.io.buffer.size=524288",
213 "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
214 "--override log.cleaner.min.cleanable.ratio=0.5",
215 "--override log.cleaner.min.compaction.lag.ms=0",
216 "--override log.cleaner.threads=1",
217 "--override log.cleanup.policy=delete",
218 "--override log.index.interval.bytes=4096",
219 "--override log.index.size.max.bytes=10485760",
220 "--override log.message.timestamp.difference.max.ms=9223372036854775807",
221 "--override log.message.timestamp.type=CreateTime",
222 "--override log.preallocate=false",
223 "--override log.retention.check.interval.ms=300000",
224 "--override max.connections.per.ip=2147483647",
225 f
"--override num.partitions={config.num_partitions}",
226 "--override producer.purgatory.purge.interval.requests=1000",
227 "--override replica.fetch.backoff.ms=1000",
228 "--override replica.fetch.max.bytes=1048576",
229 "--override replica.fetch.response.max.bytes=10485760",
230 "--override reserved.broker.max.id=1000",
236 container
= container_builder
.build()
238 # Add container to pod spec
239 pod_spec_builder
.add_container(container
)
241 return pod_spec_builder
.build()
244 if __name__
== "__main__":