Add Kafka and Zookeeper charms in operator framework
[osm/devops.git] installers/charm/kafka/src/charm.py
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##

# pylint: disable=E0213

import logging
from typing import NoReturn

from ops.framework import EventBase
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.kafka import KafkaCluster, KafkaServer
from opslib.osm.interfaces.zookeeper import ZookeeperClient
from opslib.osm.pod import ContainerV3Builder, PodSpecV3Builder
from opslib.osm.validator import ModelValidator, validator

logger = logging.getLogger(__name__)

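# Port the broker listens on, and the value exported below as the
# KAFKA_RESERVED_BROKER_MAX_ID environment variable (Kafka's
# reserved.broker.max.id, the highest broker id that may be assigned manually).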
KAFKA_PORT = 9092
KAFKA_RESERVED_BROKER_MAX_ID = "999999999"


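# Charm configuration schema: requires an integer num_partitions and
# normalizes image_pull_policy (case-insensitive) to its Kubernetes spelling.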
class ConfigModel(ModelValidator):
    num_partitions: int
    image_pull_policy: str

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        values = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        v = v.lower()
        if v not in values.keys():
            raise ValueError("value must be always, ifnotpresent or never")
        return values[v]


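# Relations wired up in __init__ (interfaces provided by opslib.osm):
#   "cluster"   - peer relation between Kafka units, used to count units
#   "kafka"     - publishes the broker's host name and port to related charms
#   "zookeeper" - consumes the Zookeeper connection string the broker needs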
class KafkaCharm(CharmedOsmBase):
    """Kafka Charm."""

    def __init__(self, *args) -> NoReturn:
        """Kafka Charm constructor."""
        super().__init__(*args, oci_image="image")
        self.kafka_cluster = KafkaCluster(self, "cluster")
        self.kafka_server = KafkaServer(self, "kafka")
        self.zookeeper_client = ZookeeperClient(self, "zookeeper")
        event_observer_mapping = {
            self.on["cluster"].relation_changed: self.configure_pod,
            self.on["kafka"].relation_joined: self._publish_info,
            self.on["zookeeper"].relation_changed: self.configure_pod,
            self.on["zookeeper"].relation_broken: self.configure_pod,
        }
        for event, observer in event_observer_mapping.items():
            self.framework.observe(event, observer)

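    # Number of Kafka units in the peer ("cluster") relation; used below to
    # size num.network.threads and offsets.topic.replication.factor.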
    @property
    def num_units(self):
        return self.kafka_cluster.num_units

    def _publish_info(self, event: EventBase):
        """Publishes Kafka information.

        Args:
            event (EventBase): Kafka relation event.
        """
        if self.unit.is_leader():
            self.kafka_server.publish_info(self.app.name, KAFKA_PORT)

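    # Guard used by build_pod_spec: refuse to build the pod spec until the
    # Zookeeper relation has published its connection data.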
    def _check_missing_dependencies(self):
        if self.zookeeper_client.is_missing_data_in_app():
            raise RelationsMissing(["zookeeper"])

    def build_pod_spec(self, image_info):
        # Validate config
        config = ConfigModel(**dict(self.config))

        # Check relations
        self._check_missing_dependencies()

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder()

        # Build Container
        container_builder = ContainerV3Builder(
            self.app.name, image_info, config.image_pull_policy
        )

        container_builder.add_port(name="kafka", port=KAFKA_PORT)
        container_builder.add_tcpsocket_readiness_probe(
            KAFKA_PORT,
            initial_delay_seconds=10,
            timeout_seconds=5,
            period_seconds=5,
        )
        container_builder.add_tcpsocket_liveness_probe(
            KAFKA_PORT,
            initial_delay_seconds=60,
            timeout_seconds=10,
            period_seconds=5,
        )
        container_builder.add_envs(
            {
                "ENABLE_AUTO_EXTEND": "true",
                "KAFKA_ADVERTISED_HOST_NAME": self.app.name,
                "KAFKA_ADVERTISED_PORT": KAFKA_PORT,
                "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
                "KAFKA_RESERVED_BROKER_MAX_ID": KAFKA_RESERVED_BROKER_MAX_ID,
            }
        )
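        # Launch the broker with exec so Kafka runs as PID 1 in the container.
        # ${HOSTNAME##*-} strips everything up to the last "-" from the pod
        # hostname; Kubernetes stateful set pods are named <app>-<ordinal>, so
        # each unit gets its ordinal as a stable, unique broker.id.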
        container_builder.add_command(
            [
                "sh",
                "-c",
                " ".join(
                    [
                        "exec kafka-server-start.sh /opt/kafka/config/server.properties",
                        "--override broker.id=${HOSTNAME##*-}",
                        f"--override listeners=PLAINTEXT://:{KAFKA_PORT}",
                        f"--override zookeeper.connect={self.zookeeper_client.zookeeper_uri}",
                        "--override log.dir=/var/lib/kafka",
                        "--override auto.create.topics.enable=true",
                        "--override auto.leader.rebalance.enable=true",
                        "--override background.threads=10",
                        "--override compression.type=producer",
                        "--override delete.topic.enable=false",
                        "--override leader.imbalance.check.interval.seconds=300",
                        "--override leader.imbalance.per.broker.percentage=10",
                        "--override log.flush.interval.messages=9223372036854775807",
                        "--override log.flush.offset.checkpoint.interval.ms=60000",
                        "--override log.flush.scheduler.interval.ms=9223372036854775807",
                        "--override log.retention.bytes=-1",
                        "--override log.retention.hours=168",
                        "--override log.roll.hours=168",
                        "--override log.roll.jitter.hours=0",
                        "--override log.segment.bytes=1073741824",
                        "--override log.segment.delete.delay.ms=60000",
                        "--override message.max.bytes=1000012",
                        "--override min.insync.replicas=1",
                        "--override num.io.threads=8",
                        f"--override num.network.threads={self.num_units}",
                        "--override num.recovery.threads.per.data.dir=1",
                        "--override num.replica.fetchers=1",
                        "--override offset.metadata.max.bytes=4096",
                        "--override offsets.commit.required.acks=-1",
                        "--override offsets.commit.timeout.ms=5000",
                        "--override offsets.load.buffer.size=5242880",
                        "--override offsets.retention.check.interval.ms=600000",
                        "--override offsets.retention.minutes=1440",
                        "--override offsets.topic.compression.codec=0",
                        "--override offsets.topic.num.partitions=50",
                        f"--override offsets.topic.replication.factor={self.num_units}",
                        "--override offsets.topic.segment.bytes=104857600",
                        "--override queued.max.requests=500",
                        "--override quota.consumer.default=9223372036854775807",
                        "--override quota.producer.default=9223372036854775807",
                        "--override replica.fetch.min.bytes=1",
                        "--override replica.fetch.wait.max.ms=500",
                        "--override replica.high.watermark.checkpoint.interval.ms=5000",
                        "--override replica.lag.time.max.ms=10000",
                        "--override replica.socket.receive.buffer.bytes=65536",
                        "--override replica.socket.timeout.ms=30000",
                        "--override request.timeout.ms=30000",
                        "--override socket.receive.buffer.bytes=102400",
                        "--override socket.request.max.bytes=104857600",
                        "--override socket.send.buffer.bytes=102400",
                        "--override unclean.leader.election.enable=true",
                        "--override zookeeper.session.timeout.ms=6000",
                        "--override zookeeper.set.acl=false",
                        "--override broker.id.generation.enable=true",
                        "--override connections.max.idle.ms=600000",
                        "--override controlled.shutdown.enable=true",
                        "--override controlled.shutdown.max.retries=3",
                        "--override controlled.shutdown.retry.backoff.ms=5000",
                        "--override controller.socket.timeout.ms=30000",
                        "--override default.replication.factor=1",
                        "--override fetch.purgatory.purge.interval.requests=1000",
                        "--override group.max.session.timeout.ms=300000",
                        "--override group.min.session.timeout.ms=6000",
                        "--override log.cleaner.backoff.ms=15000",
                        "--override log.cleaner.dedupe.buffer.size=134217728",
                        "--override log.cleaner.delete.retention.ms=86400000",
                        "--override log.cleaner.enable=true",
                        "--override log.cleaner.io.buffer.load.factor=0.9",
                        "--override log.cleaner.io.buffer.size=524288",
                        "--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308",
                        "--override log.cleaner.min.cleanable.ratio=0.5",
                        "--override log.cleaner.min.compaction.lag.ms=0",
                        "--override log.cleaner.threads=1",
                        "--override log.cleanup.policy=delete",
                        "--override log.index.interval.bytes=4096",
                        "--override log.index.size.max.bytes=10485760",
                        "--override log.message.timestamp.difference.max.ms=9223372036854775807",
                        "--override log.message.timestamp.type=CreateTime",
                        "--override log.preallocate=false",
                        "--override log.retention.check.interval.ms=300000",
                        "--override max.connections.per.ip=2147483647",
                        f"--override num.partitions={config.num_partitions}",
                        "--override producer.purgatory.purge.interval.requests=1000",
                        "--override replica.fetch.backoff.ms=1000",
                        "--override replica.fetch.max.bytes=1048576",
                        "--override replica.fetch.response.max.bytes=10485760",
                        "--override reserved.broker.max.id=1000",
                    ]
                ),
            ]
        )

        container = container_builder.build()

        # Add container to pod spec
        pod_spec_builder.add_container(container)

        return pod_spec_builder.build()


if __name__ == "__main__":
    main(KafkaCharm)
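
# Rough usage sketch (not part of the charm): once built, the application is
# deployed and related to a Zookeeper charm with the standard Juju CLI. The
# artifact and application names below are illustrative placeholders, and the
# OCI resource name "image" is assumed from oci_image="image" above:
#
#   juju deploy ./kafka.charm kafka --resource image=<kafka-oci-image>
#   juju relate kafka:zookeeper zookeeper
#
# Client charms then consume the broker address over the "kafka" endpoint,
# which _publish_info() populates with the application name and KAFKA_PORT.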