Prepare installer and pods for Rel TWELVE
[osm/devops.git] / installers / charm / kafka / src / charm.py
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##

# pylint: disable=E0213

import logging

from ops.framework import EventBase
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
from opslib.osm.interfaces.zookeeper import ZookeeperClient
from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
from opslib.osm.validator import ModelValidator, validator

logger = logging.getLogger(__name__)

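# Broker listener port, and the largest broker.id that may be assigned
# manually (Kafka auto-generates ids above reserved.broker.max.id).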
KAFKA_PORT = 9092
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"


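# Charm configuration (from config.yaml), validated before the pod spec is
# built; image_pull_policy is normalized to the Kubernetes spelling.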
class ConfigModel(ModelValidator):
    num_partitions: int
    image_pull_policy: str
    security_context: bool

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        values = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        v = v.lower()
        if v not in values:
            raise ValueError("value must be always, ifnotpresent or never")
        return values[v]


class KafkaCharm(CharmedOsmBase):
    """Kafka Charm."""

    def __init__(self, *args) -> None:
        """Kafka Charm constructor."""
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
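        # Wire charm events to handlers: peer-cluster and zookeeper changes
        # rebuild the pod spec, and a unit joining the "kafka" relation gets
        # the connection details published to it.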
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

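    # Number of Kafka units in the peer ("cluster") relation; used below to
    # size replication factors and network threads.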
    @property
    def num_units(self):
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase):
        """Publishes Kafka information.

        Args:
            event (EventBase): Kafka relation event.
        """
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

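    # Kafka cannot start without zookeeper.connect, so raise RelationsMissing
    # (which the opslib base charm surfaces as a blocked status) until
    # Zookeeper has published its connection data.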
    def _check_missing_dependencies(self):
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def build_pod_spec(self, image_info):
        # Validate config
        config = ConfigModel(**dict(self.config))

        # Check relations
        self._check_missing_dependencies()

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder(
            enable_security_context=config.security_context
        )

        # Build Container
        container_builder = ContainerV3Builder(
            self.app.name,
            image_info,
            config.image_pull_policy,
            run_as_non_root=config.security_context,
        )

        container_builder.add_port(name="kafka", port=KAFKA_PORT)
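        # Plain TCP probes on the broker port: readiness gates traffic until
        # the socket accepts connections, liveness restarts the container if
        # it stops responding.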
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
            timeout_seconds=5,
            period_seconds=5,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
            timeout_seconds=10,
            period_seconds=5,
        )
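        # Environment for the Kafka container; the command below also passes
        # the effective broker settings explicitly as --override flags.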
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": str(KAFKA_PORT),
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
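        # Launch the broker directly: broker.id is derived from the
        # StatefulSet ordinal at the end of the pod hostname
        # (${HOSTNAME##*-}), and zookeeper.connect comes from the relation.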
        container_builder.add_command(
            [
                "sh",
                "-c",
                " ".join(
                    [
                        "exec kafka-server-start.sh /opt/kafka/config/server.properties",
                        "--override broker.id=${HOSTNAME##*-}",
                        f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
                        f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
                        "--override log.dir=/var/lib/kafka",
                        "--override auto.create.topics.enable=true",
                        "--override auto.leader.rebalance.enable=true",
                        "--override background.threads=10",
                        "--override compression.type=producer",
                        "--override delete.topic.enable=false",
                        "--override leader.imbalance.check.interval.seconds=300",
                        "--override leader.imbalance.per.broker.percentage=10",
                        "--override log.flush.interval.messages=9223372036854775807",
                        "--override log.flush.offset.checkpoint.interval.ms=60000",
                        "--override log.flush.scheduler.interval.ms=9223372036854775807",
                        "--override log.retention.bytes=-1",
                        "--override log.retention.hours=168",
                        "--override log.roll.hours=168",
                        "--override log.roll.jitter.hours=0",
                        "--override log.segment.bytes=1073741824",
                        "--override log.segment.delete.delay.ms=60000",
                        "--override message.max.bytes=1000012",
                        "--override min.insync.replicas=1",
                        "--override num.io.threads=8",
                        f"--override num.network.threads={self.num_units}",
                        "--override num.recovery.threads.per.data.dir=1",
                        "--override num.replica.fetchers=1",
                        "--override offset.metadata.max.bytes=4096",
                        "--override offsets.commit.required.acks=-1",
                        "--override offsets.commit.timeout.ms=5000",
                        "--override offsets.load.buffer.size=5242880",
                        "--override offsets.retention.check.interval.ms=600000",
                        "--override offsets.retention.minutes=1440",
                        "--override offsets.topic.compression.codec=0",
                        "--override offsets.topic.num.partitions=50",
                        f"--override offsets.topic.replication.factor={self.num_units}",
                        "--override offsets.topic.segment.bytes=104857600",
                        "--override queued.max.requests=500",
                        "--override quota.consumer.default=9223372036854775807",
                        "--override quota.producer.default=9223372036854775807",
                        "--override replica.fetch.min.bytes=1",
                        "--override replica.fetch.wait.max.ms=500",
                        "--override replica.high.watermark.checkpoint.interval.ms=5000",
                        "--override replica.lag.time.max.ms=10000",
                        "--override replica.socket.receive.buffer.bytes=65536",
                        "--override replica.socket.timeout.ms=30000",
                        "--override request.timeout.ms=30000",
                        "--override socket.receive.buffer.bytes=102400",
                        "--override socket.request.max.bytes=104857600",
                        "--override socket.send.buffer.bytes=102400",
                        "--override unclean.leader.election.enable=true",
                        "--override zookeeper.session.timeout.ms=6000",
                        "--override zookeeper.set.acl=false",
                        "--override broker.id.generation.enable=true",
                        "--override connections.max.idle.ms=600000",
                        "--override controlled.shutdown.enable=true",
                        "--override controlled.shutdown.max.retries=3",
                        "--override controlled.shutdown.retry.backoff.ms=5000",
                        "--override controller.socket.timeout.ms=30000",
                        "--override default.replication.factor=1",
                        "--override fetch.purgatory.purge.interval.requests=1000",
                        "--override group.max.session.timeout.ms=300000",
                        "--override group.min.session.timeout.ms=6000",
                        "--override log.cleaner.backoff.ms=15000",
                        "--override log.cleaner.dedupe.buffer.size=134217728",
                        "--override log.cleaner.delete.retention.ms=86400000",
                        "--override log.cleaner.enable=true",
                        "--override log.cleaner.io.buffer.load.factor=0.9",
                        "--override log.cleaner.io.buffer.size=524288",
                        "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
                        "--override log.cleaner.min.cleanable.ratio=0.5",
                        "--override log.cleaner.min.compaction.lag.ms=0",
                        "--override log.cleaner.threads=1",
                        "--override log.cleanup.policy=delete",
                        "--override log.index.interval.bytes=4096",
                        "--override log.index.size.max.bytes=10485760",
                        "--override log.message.timestamp.difference.max.ms=9223372036854775807",
                        "--override log.message.timestamp.type=CreateTime",
                        "--override log.preallocate=false",
                        "--override log.retention.check.interval.ms=300000",
                        "--override max.connections.per.ip=2147483647",
                        f"--override num.partitions={config.num_partitions}",
                        "--override producer.purgatory.purge.interval.requests=1000",
                        "--override replica.fetch.backoff.ms=1000",
                        "--override replica.fetch.max.bytes=1048576",
                        "--override replica.fetch.response.max.bytes=10485760",
                        f"--override reserved.broker.max.id={KAFKA_RESERVED_BROKER_MAX_ID}",
                    ]
                ),
            ]
        )

        container = container_builder.build()

        # Add container to pod spec
        pod_spec_builder.add_container(container)

        return pod_spec_builder.build()


if __name__ == "__main__":
    main(KafkaCharm)