1 # Copyright 2022 Canonical Ltd.
2 # See LICENSE file for licensing details.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 This [library](https://juju.is/docs/sdk/libraries) implements both sides of the
20 `kafka` [interface](https://juju.is/docs/sdk/relations).
22 The *provider* side of this interface is implemented by the
23 [kafka-k8s Charmed Operator](https://charmhub.io/kafka-k8s).
25 Any Charmed Operator that *requires* Kafka for providing its
26 service should implement the *requirer* side of this interface.
In a nutshell, using this library to implement a Charmed Operator *requiring*
Kafka looks like this:
32 $ charmcraft fetch-lib charms.kafka_k8s.v0.kafka
47 from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
48 from ops.charm import CharmBase
51 class MyCharm(CharmBase):
55 def __init__(self, *args):
56 super().__init__(*args)
57 self.kafka = KafkaRequires(self)
58 self.framework.observe(
59 self.on.kafka_available,
60 self._on_kafka_available,
62 self.framework.observe(
64 self._on_kafka_broken,
67 def _on_kafka_available(self, event):
68 # Get Kafka host and port
69 host: str = self.kafka.host
70 port: int = self.kafka.port
74 def _on_kafka_broken(self, event):
77 self.unit.status = BlockedStatus("need kafka relation")
Issues can be reported
[here](https://github.com/charmed-osm/kafka-k8s-operator/issues)!
84 from typing
import Optional
86 from ops
.charm
import CharmBase
, CharmEvents
87 from ops
.framework
import EventBase
, EventSource
, Object
89 # The unique Charmhub library identifier, never change it
90 from ops
.model
import Relation
# Charmhub identifier of this library; the value must never change.
LIBID = "eacc8c85082347c9aae740e0220b8376"
94 # Increment this major API version when introducing breaking changes
97 # Increment this PATCH version before using `charmcraft publish-lib` or reset
98 # to 0 if you are raising the major API version
# Keys of the relation's application databag under which the provider side
# writes, and the requirer side reads, the Kafka host and port.
KAFKA_HOST_APP_KEY = "host"
KAFKA_PORT_APP_KEY = "port"
class _KafkaAvailableEvent(EventBase):
    """Signal that Kafka connection details are available.

    Emitted by the requirer side once the remote application's relation data
    contains both the host and the port keys.
    """
class _KafkaBrokenEvent(EventBase):
    """Signal that the Kafka relation has been broken.

    Emitted by the requirer side when it observes `relation_broken` on its
    endpoint.
    """
class KafkaEvents(CharmEvents):
    """Charm-level events related to Kafka.

    Assign an instance of this class to the `on` attribute of a charm that
    uses `KafkaRequires`, so the events below can be observed and emitted.

    Events:
        kafka_available (_KafkaAvailableEvent): Kafka host and port are set.
        kafka_broken (_KafkaBrokenEvent): the Kafka relation is broken.
    """

    kafka_available = EventSource(_KafkaAvailableEvent)
    kafka_broken = EventSource(_KafkaBrokenEvent)
class KafkaRequires(Object):
    """Requires-side of the Kafka relation.

    Observes `relation_changed` and `relation_broken` on the given endpoint
    and emits `kafka_available` / `kafka_broken` through the charm's `on`
    attribute, which therefore has to define those events (see `KafkaEvents`).
    """

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Register the relation observers.

        Args:
            charm (CharmBase): charm that requires Kafka.
            endpoint_name (str): name of the relation endpoint to watch.
        """
        super().__init__(charm, endpoint_name)
        # The relation handlers emit the Kafka events through `self.charm.on`.
        self.charm = charm
        self._endpoint_name = endpoint_name

        # Observe relation events
        relation_events = charm.on[self._endpoint_name]
        self.framework.observe(relation_events.relation_changed, self._on_relation_changed)
        self.framework.observe(relation_events.relation_broken, self._on_relation_broken)

    def _on_relation_changed(self, event) -> None:
        """Emit `kafka_available` once the remote app published host and port."""
        remote_app = event.relation.app
        if not remote_app:
            return
        app_data = event.relation.data[remote_app]
        if all(key in app_data for key in (KAFKA_HOST_APP_KEY, KAFKA_PORT_APP_KEY)):
            self.charm.on.kafka_available.emit()

    def _on_relation_broken(self, _) -> None:
        """Emit `kafka_broken` when the relation goes away."""
        self.charm.on.kafka_broken.emit()

    def _remote_app_value(self, key: str) -> Optional[str]:
        """Return `key` from the remote application's relation data, if any.

        Returns:
            Optional[str]: the stored value, or None when there is no
            relation, no remote application, or the key has not been set.
        """
        relation: Optional[Relation] = self.model.get_relation(self._endpoint_name)
        if not relation or not relation.app:
            return None
        return relation.data[relation.app].get(key)

    @property
    def host(self) -> Optional[str]:
        """Kafka host published by the provider, or None if not available."""
        return self._remote_app_value(KAFKA_HOST_APP_KEY)

    @property
    def port(self) -> Optional[int]:
        """Kafka port published by the provider, or None if not available."""
        raw_port = self._remote_app_value(KAFKA_PORT_APP_KEY)
        # The relation can exist before the provider has written the port;
        # int(None) would raise TypeError, so report "not available" instead.
        return int(raw_port) if raw_port is not None else None
class KafkaProvides(Object):
    """Provides-side of the Kafka relation."""

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Remember the endpoint whose relations carry the Kafka host data.

        Args:
            charm (CharmBase): charm that provides Kafka.
            endpoint_name (str): name of the relation endpoint to publish on.
        """
        super().__init__(charm, endpoint_name)
        self._endpoint_name = endpoint_name

    def set_host_info(self, host: str, port: int, relation: Optional[Relation] = None) -> None:
        """Publish the Kafka host and port to related applications.

        The values are written to the application data of the relation, which
        is why only the leader unit is allowed to call this.

        Args:
            host (str): Kafka hostname or IP address.
            port (int): Kafka port.
            relation (Optional[Relation]): the single relation to update.
                When omitted, every relation on the endpoint is updated.

        Raises:
            Exception: if a non-leader unit calls this function.
        """
        if not self.model.unit.is_leader():
            raise Exception("only the leader set host information.")

        # Either the one relation the caller asked for, or all of them.
        targets = [relation] if relation else self.model.relations[self._endpoint_name]
        for target in targets:
            self._update_relation_data(host, port, target)

    def _update_relation_data(self, host: str, port: int, relation: Relation) -> None:
        """Write host and port into this application's data on `relation`."""
        app_databag = relation.data[self.model.app]
        app_databag[KAFKA_HOST_APP_KEY] = host
        # The port is stored in its string form.
        app_databag[KAFKA_PORT_APP_KEY] = str(port)