CharmHub and new kafka and zookeeper charms
- Charmed installer uses bundles published in CharmHub
- Use new zookeeper and kafka sidecar-charm
- Changes in the charms to integrate with the new Kafka
Change-Id: Ie59fe1c7c72774b317abe2433fafb28a11472b72
Signed-off-by: David Garcia <david.garcia@canonical.com>
diff --git a/installers/charm/pol/lib/charms/kafka_k8s/v0/kafka.py b/installers/charm/pol/lib/charms/kafka_k8s/v0/kafka.py
new file mode 100644
index 0000000..1baf9a8
--- /dev/null
+++ b/installers/charm/pol/lib/charms/kafka_k8s/v0/kafka.py
@@ -0,0 +1,207 @@
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Kafka library.
+
+This [library](https://juju.is/docs/sdk/libraries) implements both sides of the
+`kafka` [interface](https://juju.is/docs/sdk/relations).
+
+The *provider* side of this interface is implemented by the
+[kafka-k8s Charmed Operator](https://charmhub.io/kafka-k8s).
+
+Any Charmed Operator that *requires* Kafka for providing its
+service should implement the *requirer* side of this interface.
+
+In a nutshell, using this library to implement a Charmed Operator *requiring*
+Kafka would look like this:
+
+```
+$ charmcraft fetch-lib charms.kafka_k8s.v0.kafka
+```
+
+`metadata.yaml`:
+
+```
+requires:
+  kafka:
+    interface: kafka
+    limit: 1
+```
+
+`src/charm.py`:
+
+```
+from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
+from ops.charm import CharmBase
+from ops.model import BlockedStatus
+
+
+class MyCharm(CharmBase):
+    on = KafkaEvents()
+
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.kafka = KafkaRequires(self)
+        self.framework.observe(
+            self.on.kafka_available,
+            self._on_kafka_available,
+        )
+        self.framework.observe(
+            self.on.kafka_broken,
+            self._on_kafka_broken,
+        )
+
+    def _on_kafka_available(self, event):
+        # Get Kafka host and port
+        host: str = self.kafka.host
+        port: int = self.kafka.port
+        # host => "kafka-k8s"
+        # port => 9092
+
+    def _on_kafka_broken(self, event):
+        # Stop service
+        # ...
+        self.unit.status = BlockedStatus("need kafka relation")
+```
+
+You can file bugs
+[here](https://github.com/charmed-osm/kafka-k8s-operator/issues)!
+"""
+
+from typing import Optional
+
+from ops.charm import CharmBase, CharmEvents
+from ops.framework import EventBase, EventSource, Object
+from ops.model import Relation
+
+# The unique Charmhub library identifier, never change it
+LIBID = "eacc8c85082347c9aae740e0220b8376"
+
+# Increment this major API version when introducing breaking changes
+LIBAPI = 0
+
+# Increment this PATCH version before using `charmcraft publish-lib` or reset
+# to 0 if you are raising the major API version
+LIBPATCH = 3
+
+
+# Keys of the relation application data in which the provider publishes host and port
+KAFKA_HOST_APP_KEY = "host"
+KAFKA_PORT_APP_KEY = "port"
+
+
+class _KafkaAvailableEvent(EventBase):
+    """Event emitted when the Kafka host and port are available in the relation."""
+
+
+class _KafkaBrokenEvent(EventBase):
+    """Event emitted when the relation with Kafka is broken."""
+
+
+class KafkaEvents(CharmEvents):
+    """Kafka events that `KafkaRequires` emits on the charm.
+
+    Events:
+        kafka_available (_KafkaAvailableEvent): the relation application data
+            contains both the Kafka host and port.
+        kafka_broken (_KafkaBrokenEvent): the Kafka relation was broken.
+    """
+
+    kafka_available = EventSource(_KafkaAvailableEvent)
+    kafka_broken = EventSource(_KafkaBrokenEvent)
+
+
+class KafkaRequires(Object):
+    """Requires-side of the Kafka relation.
+
+    Emits `kafka_available` / `kafka_broken` on the charm and exposes the host
+    and port that the provider published in the relation application data.
+    """
+
+    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
+        """Observe the changed and broken events of the `endpoint_name` relation."""
+        super().__init__(charm, endpoint_name)
+        self.charm = charm
+        self._endpoint_name = endpoint_name
+        relation_events = charm.on[endpoint_name]
+        self.framework.observe(relation_events.relation_changed, self._on_relation_changed)
+        self.framework.observe(relation_events.relation_broken, self._on_relation_broken)
+
+    def _on_relation_changed(self, event) -> None:
+        """Emit `kafka_available` once both host and port are in the app data."""
+        if event.relation.app and all(
+            key in event.relation.data[event.relation.app]
+            for key in (KAFKA_HOST_APP_KEY, KAFKA_PORT_APP_KEY)
+        ):
+            self.charm.on.kafka_available.emit()
+
+    def _on_relation_broken(self, _) -> None:
+        """Emit `kafka_broken` so the charm can react to the lost relation."""
+        self.charm.on.kafka_broken.emit()
+
+    def _app_data(self, key: str) -> Optional[str]:
+        """Return `key` from the remote app data, or None if it is unavailable."""
+        relation: Optional[Relation] = self.model.get_relation(self._endpoint_name)
+        return relation.data[relation.app].get(key) if relation and relation.app else None
+
+    @property
+    def host(self) -> Optional[str]:
+        """Kafka host, or None while the provider has not published it."""
+        return self._app_data(KAFKA_HOST_APP_KEY)
+
+    @property
+    def port(self) -> Optional[int]:
+        """Kafka port, or None (instead of a TypeError) while it is not published."""
+        port = self._app_data(KAFKA_PORT_APP_KEY)
+        return int(port) if port else None
+
+
+class KafkaProvides(Object):
+    """Provides-side of the Kafka relation."""
+
+    def __init__(self, charm: CharmBase, endpoint_name: str = "kafka") -> None:
+        """Register this object under `endpoint_name` and remember the endpoint."""
+        super().__init__(charm, endpoint_name)
+        self._endpoint_name = endpoint_name
+
+    def set_host_info(self, host: str, port: int, relation: Optional[Relation] = None) -> None:
+        """Publish the Kafka host and port to the requirer charms.
+
+        The values are written to the application data of the relation, which
+        is why only the leader unit can call this function.
+
+        Args:
+            host (str): Kafka hostname or IP address.
+            port (int): Kafka port.
+            relation (Optional[Relation]): single relation to update. When
+                omitted, every relation of this endpoint is updated.
+
+        Raises:
+            Exception: if the calling unit is not the leader.
+        """
+        if not self.model.unit.is_leader():
+            raise Exception("only the leader set host information.")
+
+        # A given relation narrows the update; otherwise fan out to all of them.
+        targets = [relation] if relation else self.model.relations[self._endpoint_name]
+        for target in targets:
+            self._update_relation_data(host, port, target)
+
+    def _update_relation_data(self, host: str, port: int, relation: Relation) -> None:
+        """Write host and port (as a string) into this app's relation data."""
+        app_data = relation.data[self.model.app]
+        app_data[KAFKA_HOST_APP_KEY] = host
+        app_data[KAFKA_PORT_APP_KEY] = str(port)
diff --git a/installers/charm/pol/src/charm.py b/installers/charm/pol/src/charm.py
index e2fcdb3..7b92b45 100755
--- a/installers/charm/pol/src/charm.py
+++ b/installers/charm/pol/src/charm.py
@@ -27,9 +27,9 @@
import re
from typing import NoReturn, Optional
+from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
-from opslib.osm.interfaces.kafka import KafkaClient
from opslib.osm.interfaces.mongo import MongoClient
from opslib.osm.interfaces.mysql import MysqlClient
from opslib.osm.pod import (
@@ -87,6 +87,9 @@
class PolCharm(CharmedOsmBase):
+
+ on = KafkaEvents()
+
def __init__(self, *args) -> NoReturn:
super().__init__(
*args,
@@ -107,9 +110,9 @@
},
},
)
- self.kafka_client = KafkaClient(self, "kafka")
- self.framework.observe(self.on["kafka"].relation_changed, self.configure_pod)
- self.framework.observe(self.on["kafka"].relation_broken, self.configure_pod)
+ self.kafka = KafkaRequires(self)
+ self.framework.observe(self.on.kafka_available, self.configure_pod)
+ self.framework.observe(self.on.kafka_broken, self.configure_pod)
self.mongodb_client = MongoClient(self, "mongodb")
self.framework.observe(self.on["mongodb"].relation_changed, self.configure_pod)
@@ -122,10 +125,7 @@
def _check_missing_dependencies(self, config: ConfigModel):
missing_relations = []
- if (
- self.kafka_client.is_missing_data_in_unit()
- and self.kafka_client.is_missing_data_in_app()
- ):
+ if not self.kafka.host or not self.kafka.port:
missing_relations.append("kafka")
if not config.mongodb_uri and self.mongodb_client.is_missing_data_in_unit():
missing_relations.append("mongodb")
@@ -185,8 +185,8 @@
"OSMPOL_GLOBAL_LOGLEVEL": config.log_level,
# Kafka configuration
"OSMPOL_MESSAGE_DRIVER": "kafka",
- "OSMPOL_MESSAGE_HOST": self.kafka_client.host,
- "OSMPOL_MESSAGE_PORT": self.kafka_client.port,
+ "OSMPOL_MESSAGE_HOST": self.kafka.host,
+ "OSMPOL_MESSAGE_PORT": self.kafka.port,
# Database configuration
"OSMPOL_DATABASE_DRIVER": "mongo",
}
diff --git a/installers/charm/pol/tests/test_charm.py b/installers/charm/pol/tests/test_charm.py
index 330ecee..6cf435d 100644
--- a/installers/charm/pol/tests/test_charm.py
+++ b/installers/charm/pol/tests/test_charm.py
@@ -132,7 +132,7 @@
kafka_relation_id = self.harness.add_relation("kafka", "kafka")
self.harness.add_relation_unit(kafka_relation_id, "kafka/0")
self.harness.update_relation_data(
- kafka_relation_id, "kafka/0", {"host": "kafka", "port": 9092}
+ kafka_relation_id, "kafka", {"host": "kafka", "port": 9092}
)
def initialize_mongo_config(self):
diff --git a/installers/charm/pol/tox.ini b/installers/charm/pol/tox.ini
index 8e3318a..f3c9144 100644
--- a/installers/charm/pol/tox.ini
+++ b/installers/charm/pol/tox.ini
@@ -29,8 +29,10 @@
[testenv]
basepython = python3.8
-setenv = VIRTUAL_ENV={envdir}
- PYTHONDONTWRITEBYTECODE = 1
+setenv =
+ VIRTUAL_ENV={envdir}
+ PYTHONPATH = {toxinidir}:{toxinidir}/lib:{toxinidir}/src
+ PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
@@ -99,7 +101,7 @@
charmcraft
sh
commands =
- charmcraft build
+ charmcraft pack
sh -c 'ubuntu_version=20.04; \
architectures="amd64-aarch64-arm64"; \
charm_name=`cat metadata.yaml | grep -E "^name: " | cut -f 2 -d " "`; \