1 # Copyright 2022 Canonical Ltd.
2 # See LICENSE file for licensing details.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 This [library](https://juju.is/docs/sdk/libraries) implements both sides of the
20 `kafka` [interface](https://juju.is/docs/sdk/relations).
22 The *provider* side of this interface is implemented by the
23 [kafka-k8s Charmed Operator](https://charmhub.io/kafka-k8s).
25 Any Charmed Operator that *requires* Kafka for providing its
26 service should implement the *requirer* side of this interface.
In a nutshell, using this library to implement a Charmed Operator *requiring*
Kafka would look like this:
32 $ charmcraft fetch-lib charms.kafka_k8s.v0.kafka
47 from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
48 from ops.charm import CharmBase
51 class MyCharm(CharmBase):
55 def __init__(self, *args):
56 super().__init__(*args)
57 self.kafka = KafkaRequires(self)
58 self.framework.observe(
59 self.on.kafka_available,
60 self._on_kafka_available,
62 self.framework.observe(
63 self.on["kafka"].relation_broken,
64 self._on_kafka_broken,
67 def _on_kafka_available(self, event):
68 # Get Kafka host and port
69 host: str = self.kafka.host
70 port: int = self.kafka.port
74 def _on_kafka_broken(self, event):
77 self.unit.status = BlockedStatus("need kafka relation")
81 [here](https://github.com/charmed-osm/kafka-k8s-operator/issues)!
84 from typing
import Optional
86 from ops
.charm
import CharmBase
, CharmEvents
87 from ops
.framework
import EventBase
, EventSource
, Object
from ops.model import Relation

# The unique Charmhub library identifier, never change it
LIBID = "eacc8c85082347c9aae740e0220b8376"
94 # Increment this major API version when introducing breaking changes
97 # Increment this PATCH version before using `charmcraft publish-lib` or reset
98 # to 0 if you are raising the major API version
# Keys of the relation's application data under which the provider side stores
# the Kafka host and port, and which the requirer side reads back.
KAFKA_HOST_APP_KEY = "host"
KAFKA_PORT_APP_KEY = "port"
class _KafkaAvailableEvent(EventBase):
    """Event signalling that Kafka is available to the requiring charm."""
class KafkaEvents(CharmEvents):
    """Kafka events.

    Charm event collection extended with the events that Kafka can emit.

    Events:
        kafka_available (_KafkaAvailableEvent): emitted when Kafka is available.
    """

    kafka_available = EventSource(_KafkaAvailableEvent)
class KafkaRequires(Object):
    """Requires-side of the Kafka relation.

    Observes ``relation_changed`` on the configured endpoint and emits
    ``kafka_available`` on the owning charm once the remote application data
    contains both the host and the port keys.
    """

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Attach the requirer to ``charm`` and observe the relation endpoint.

        Args:
            charm: Charm owning this object. ``kafka_available`` is emitted
                through ``charm.on``, so it must expose that event.
            endpoint_name: Name of the relation endpoint to observe.
        """
        super().__init__(charm, endpoint_name)
        # Stored because _on_relation_changed emits the event through the charm.
        self.charm = charm
        self._endpoint_name = endpoint_name

        # Observe relation events
        event_observe_mapping = {
            charm.on[self._endpoint_name].relation_changed: self._on_relation_changed,
        }
        for event, observer in event_observe_mapping.items():
            self.framework.observe(event, observer)

    def _on_relation_changed(self, event) -> None:
        """Emit ``kafka_available`` when the remote app data has host and port."""
        remote_app = event.relation.app
        if not remote_app:
            return
        remote_data = event.relation.data[remote_app]
        if all(key in remote_data for key in (KAFKA_HOST_APP_KEY, KAFKA_PORT_APP_KEY)):
            self.charm.on.kafka_available.emit()

    def _remote_app_data(self):
        """Return the remote application's relation data, or None if unavailable."""
        relation: Optional[Relation] = self.model.get_relation(self._endpoint_name)
        if not relation or not relation.app:
            return None
        return relation.data[relation.app]

    @property
    def host(self) -> Optional[str]:
        """Get kafka hostname.

        Returns:
            The value stored under the host key, or None when there is no
            relation, no remote application, or no host has been set.
        """
        remote_data = self._remote_app_data()
        if remote_data is None:
            return None
        return remote_data.get(KAFKA_HOST_APP_KEY)

    @property
    def port(self) -> Optional[int]:
        """Get kafka port number.

        Returns:
            The value stored under the port key converted to ``int``, or None
            when there is no relation, no remote application, or no port has
            been set.

        Raises:
            ValueError: if the stored port is not a valid integer string.
        """
        remote_data = self._remote_app_data()
        if remote_data is None:
            return None
        raw_port = remote_data.get(KAFKA_PORT_APP_KEY)
        # A missing key must not reach int(): int(None) raises TypeError.
        return int(raw_port) if raw_port is not None else None
class KafkaProvides(Object):
    """Provides-side of the Kafka relation."""

    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
        """Attach the provider to ``charm`` for the given relation endpoint.

        Args:
            charm: Charm owning this object.
            endpoint_name: Name of the relation endpoint to publish on.
        """
        super().__init__(charm, endpoint_name)
        self._endpoint_name = endpoint_name

    def set_host_info(self, host: str, port: int, relation: Optional[Relation] = None) -> None:
        """Set Kafka host and port.

        This function writes in the application data of the relation, therefore,
        only the unit leader can call it.

        Args:
            host (str): Kafka hostname or IP address.
            port (int): Kafka port.
            relation (Optional[Relation]): Relation to update.
                If not specified, all relations will be updated.

        Raises:
            Exception: if a non-leader unit calls this function.
        """
        if not self.model.unit.is_leader():
            raise Exception("only the leader set host information.")

        # Update only the given relation when one is passed; otherwise update
        # every relation established on this endpoint.
        if relation is not None:
            targets = [relation]
        else:
            targets = self.model.relations[self._endpoint_name]
        for target in targets:
            self._update_relation_data(host, port, target)

    def _update_relation_data(self, host: str, port: int, relation: Relation) -> None:
        """Write host and port into this application's data of ``relation``.

        The port is stored as a string.
        """
        app_data = relation.data[self.model.app]
        app_data[KAFKA_HOST_APP_KEY] = host
        app_data[KAFKA_PORT_APP_KEY] = str(port)