CharmHub and new kafka and zookeeper charms
[osm/devops.git] / installers / charm / pla / lib / charms / kafka_k8s / v0 / kafka.py
1 # Copyright 2022 Canonical Ltd.
2 # See LICENSE file for licensing details.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 """Kafka library.
18
19 This [library](https://juju.is/docs/sdk/libraries) implements both sides of the
20 `kafka` [interface](https://juju.is/docs/sdk/relations).
21
22 The *provider* side of this interface is implemented by the
23 [kafka-k8s Charmed Operator](https://charmhub.io/kafka-k8s).
24
25 Any Charmed Operator that *requires* Kafka for providing its
26 service should implement the *requirer* side of this interface.
27
28 In a nutshell, using this library to implement a Charmed Operator *requiring*
29 Kafka would look like this:
30
31 ```
32 $ charmcraft fetch-lib charms.kafka_k8s.v0.kafka
33 ```
34
35 `metadata.yaml`:
36
37 ```
38 requires:
39 kafka:
40 interface: kafka
41 limit: 1
42 ```
43
44 `src/charm.py`:
45
46 ```
47 from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
48 from ops.charm import CharmBase
49
50
51 class MyCharm(CharmBase):
52
53 on = KafkaEvents()
54
55 def __init__(self, *args):
56 super().__init__(*args)
57 self.kafka = KafkaRequires(self)
58 self.framework.observe(
59 self.on.kafka_available,
60 self._on_kafka_available,
61 )
62 self.framework.observe(
63 self.on.kafka_broken,
64 self._on_kafka_broken,
65 )
66
67 def _on_kafka_available(self, event):
68 # Get Kafka host and port
69 host: str = self.kafka.host
70 port: int = self.kafka.port
71 # host => "kafka-k8s"
72 # port => 9092
73
74 def _on_kafka_broken(self, event):
75 # Stop service
76 # ...
77 self.unit.status = BlockedStatus("need kafka relation")
78 ```
79
80 You can file bugs
81 [here](https://github.com/charmed-osm/kafka-k8s-operator/issues)!
82 """
83
84 from typing import Optional
85
86 from ops.charm import CharmBase, CharmEvents
87 from ops.framework import EventBase, EventSource, Object
88
89 # The unique Charmhub library identifier, never change it
90 from ops.model import Relation
91
# Charmhub identifies this library by LIBID; it must never be changed.
LIBID = "eacc8c85082347c9aae740e0220b8376"

# Major API version: bump it whenever a breaking change is introduced.
LIBAPI = 0

# Patch version: bump it before running `charmcraft publish-lib`, or reset it
# to 0 when the major API version is raised.
LIBPATCH = 3


# Keys used in the application data of the relation for the Kafka address.
KAFKA_HOST_APP_KEY = "host"
KAFKA_PORT_APP_KEY = "port"
104
105
class _KafkaAvailableEvent(EventBase):
    """Event signalling that Kafka is available to the requirer."""
108
109
class _KafkaBrokenEvent(EventBase):
    """Event signalling that the relation with Kafka has been broken."""
112
113
class KafkaEvents(CharmEvents):
    """Charm events related to the Kafka relation.

    A charm exposes these events by assigning an instance of this class to
    its `on` attribute.

    Events:
        kafka_available (_KafkaAvailableEvent): Kafka is available.
        kafka_broken (_KafkaBrokenEvent): the Kafka relation is broken.
    """

    kafka_available = EventSource(_KafkaAvailableEvent)
    kafka_broken = EventSource(_KafkaBrokenEvent)
125
126
class KafkaRequires(Object):
    """Requires-side of the Kafka relation.

    Observes the relation events of the given endpoint and re-emits them on
    the charm as `kafka_available` / `kafka_broken`. The charm is therefore
    expected to expose those events through its `on` attribute (see
    `KafkaEvents`).
    """

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Register the relation observers.

        Args:
            charm: Charm that requires Kafka.
            endpoint_name: Name of the relation endpoint to observe.
        """
        super().__init__(charm, endpoint_name)
        self.charm = charm
        self._endpoint_name = endpoint_name

        # Observe relation events
        relation_events = charm.on[self._endpoint_name]
        self.framework.observe(relation_events.relation_changed, self._on_relation_changed)
        self.framework.observe(relation_events.relation_broken, self._on_relation_broken)

    def _on_relation_changed(self, event) -> None:
        """Emit `kafka_available` once the remote app has set host and port."""
        remote_app = event.relation.app
        if not remote_app:
            return
        remote_data = event.relation.data[remote_app]
        if all(key in remote_data for key in (KAFKA_HOST_APP_KEY, KAFKA_PORT_APP_KEY)):
            self.charm.on.kafka_available.emit()

    def _on_relation_broken(self, _) -> None:
        """Emit `kafka_broken` when the relation is broken."""
        self.charm.on.kafka_broken.emit()

    def _get_remote_value(self, key: str) -> Optional[str]:
        """Read `key` from the remote application data of the relation.

        Returns:
            The stored value, or None if there is no relation, the relation
            has no remote application, or the key is not set.
        """
        relation: Optional[Relation] = self.model.get_relation(self._endpoint_name)
        if not relation or not relation.app:
            return None
        return relation.data[relation.app].get(key)

    @property
    def host(self) -> Optional[str]:
        """Kafka host read from the relation, or None if it is not available."""
        return self._get_remote_value(KAFKA_HOST_APP_KEY)

    @property
    def port(self) -> Optional[int]:
        """Kafka port read from the relation, or None if it is not available.

        Raises:
            ValueError: if the value stored in the relation is not an integer.
        """
        raw_port = self._get_remote_value(KAFKA_PORT_APP_KEY)
        # The port may not be set yet even though the relation exists;
        # int(None) would raise a TypeError, so report it as unavailable.
        if not raw_port:
            return None
        return int(raw_port)
170
171
class KafkaProvides(Object):
    """Provides-side of the Kafka relation."""

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Store the name of the endpoint this object manages.

        Args:
            charm: Charm that provides Kafka.
            endpoint_name: Name of the relation endpoint.
        """
        super().__init__(charm, endpoint_name)
        self._endpoint_name = endpoint_name

    def set_host_info(self, host: str, port: int, relation: Optional[Relation] = None) -> None:
        """Set Kafka host and port.

        The values are written to the application data of the relation, so
        only the leader unit is allowed to call this function.

        Args:
            host (str): Kafka hostname or IP address.
            port (int): Kafka port.
            relation (Optional[Relation]): Relation to update.
                If not specified, all relations will be updated.

        Raises:
            Exception: if a non-leader unit calls this function.
        """
        if not self.model.unit.is_leader():
            raise Exception("only the leader set host information.")

        # Either the single relation given, or every relation of the endpoint.
        targets = [relation] if relation else self.model.relations[self._endpoint_name]
        for target in targets:
            self._update_relation_data(host, port, target)

    def _update_relation_data(self, host: str, port: int, relation: Relation) -> None:
        """Write host and port to this application's data in the relation."""
        app_data = relation.data[self.model.app]
        app_data[KAFKA_HOST_APP_KEY] = host
        # The port is written as a string.
        app_data[KAFKA_PORT_APP_KEY] = str(port)