#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##

"""Kafka Exporter charm.

Builds a pod spec from the charm config and the ``kafka`` relation data, and
publishes scrape and dashboard information on the provided relations.
"""

import logging
from pathlib import Path
from typing import Dict, List, NoReturn  # noqa: F401  (NoReturn kept for compatibility)
from urllib.parse import urlparse

from ops.charm import CharmBase
from ops.framework import EventBase, StoredState
from ops.main import main
from ops.model import ActiveStatus, Application, BlockedStatus, MaintenanceStatus, Unit
from oci_image import OCIImageResource, OCIImageResourceError

from pod_spec import make_pod_spec

logger = logging.getLogger(__name__)

KAFKA_EXPORTER_PORT = 9308


class RelationsMissing(Exception):
    """Raised when one or more required relations have not provided their data.

    Attributes:
        message: Text naming the missing relations. It is empty when
            ``missing_relations`` is empty or is not a list.
    """

    def __init__(self, missing_relations: List):
        message = ""
        if missing_relations and isinstance(missing_relations, list):
            plural = "s" if len(missing_relations) > 1 else ""
            message = f'Waiting for {", ".join(missing_relations)} relation{plural}'
        # Hand the message to Exception so str(exc) and logged tracebacks show
        # it instead of the repr of the raw constructor argument.
        super().__init__(message)
        self.message = message


class RelationDefinition:
    """Describe a required relation and the keys expected in its data.

    Args:
        relation_name: Name of the relation.
        keys: Keys that must all be present in the remote data bag.
        source_type: ``ops.model.Application`` or ``ops.model.Unit``; selects
            which kind of data bag the keys are read from.

    Raises:
        TypeError: If ``source_type`` is neither ``Application`` nor ``Unit``.
    """

    def __init__(self, relation_name: str, keys: List, source_type):
        if source_type not in (Application, Unit):
            raise TypeError(
                "source_type should be ops.model.Application or ops.model.Unit"
            )
        self.relation_name = relation_name
        self.keys = keys
        self.source_type = source_type


def check_missing_relation_data(
    data: Dict,
    expected_relations_data: List[RelationDefinition],
):
    """Verify that ``data`` holds every key of every expected relation.

    Keys are looked up in ``data`` as ``"<relation_name>_<key>"``.

    Args:
        data: Flattened relation data, as produced by ``get_relation_data``.
        expected_relations_data: Definitions of the relations to check.

    Raises:
        RelationsMissing: If at least one relation lacks one of its keys.
    """
    missing_relations = [
        definition.relation_name
        for definition in expected_relations_data
        if not all(
            f"{definition.relation_name}_{key}" in data for key in definition.keys
        )
    ]
    if missing_relations:
        raise RelationsMissing(missing_relations)


def get_relation_data(
    charm: CharmBase,
    relation_data: RelationDefinition,
) -> Dict:
    """Collect the expected keys of one relation from the first complete source.

    Iterates over the relation's data bags, skipping this charm's own bag and
    any bag whose owner is not of ``relation_data.source_type``. The first bag
    that contains all the expected keys wins.

    Args:
        charm: Charm whose model is queried.
        relation_data: Definition of the relation to read.

    Returns:
        A dict mapping ``"<relation_name>_<key>"`` to the value read, or an
        empty dict when the relation does not exist or no bag is complete.
    """
    relation = charm.model.get_relation(relation_data.relation_name)
    if not relation:
        return {}

    expected_type = relation_data.source_type
    self_app_unit = charm.app if expected_type == Application else charm.unit

    for app_unit in relation.data:
        if app_unit != self_app_unit and isinstance(app_unit, expected_type):
            remote_data = relation.data[app_unit]
            if all(key in remote_data for key in relation_data.keys):
                return {
                    f"{relation_data.relation_name}_{key}": remote_data.get(key)
                    for key in relation_data.keys
                }
    return {}


class KafkaExporterCharm(CharmBase):
    """Kafka Exporter Charm."""

    state = StoredState()

    def __init__(self, *args) -> None:
        """Kafka Exporter Charm constructor."""
        super().__init__(*args)

        # Internal state initialization
        self.state.set_default(pod_spec=None)

        self.port = KAFKA_EXPORTER_PORT
        self.image = OCIImageResource(self, "image")

        # Registering regular events
        self.framework.observe(self.on.start, self.configure_pod)
        self.framework.observe(self.on.config_changed, self.configure_pod)

        # Registering required relation events
        self.framework.observe(self.on.kafka_relation_changed, self.configure_pod)

        # Registering required relation departed events
        self.framework.observe(self.on.kafka_relation_departed, self.configure_pod)

        # Registering provided relation events
        self.framework.observe(
            self.on.prometheus_scrape_relation_joined, self._publish_scrape_info
        )
        self.framework.observe(
            self.on.grafana_dashboard_relation_joined, self._publish_dashboard_info
        )

    def _publish_scrape_info(self, event: EventBase) -> None:
        """Publishes scrape information.

        When the ``site_url`` config option yields a hostname, that hostname is
        published with port ``"80"``; otherwise the application name and the
        exporter port are published.

        Args:
            event (EventBase): Exporter relation event.
        """
        # Read the option once; .get() tolerates the option being unset.
        site_url = self.model.config.get("site_url")
        # urlparse() returns hostname=None for URLs without a scheme; treat
        # that like "no site_url" instead of publishing None as the hostname.
        hostname = urlparse(site_url).hostname if site_url else None
        if hostname:
            # NOTE(review): port 80 is assumed regardless of the URL's scheme
            # or explicit port - confirm against the ingress setup.
            port = "80"
        else:
            hostname = self.model.app.name
            port = str(KAFKA_EXPORTER_PORT)

        rel_data = {
            "hostname": hostname,
            "port": port,
            "metrics_path": "/metrics",
            "scrape_interval": "30s",
            "scrape_timeout": "15s",
        }
        for k, v in rel_data.items():
            event.relation.data[self.unit][k] = v

    def _publish_dashboard_info(self, event: EventBase) -> None:
        """Publishes dashboard information.

        Args:
            event (EventBase): Exporter relation event.
        """
        rel_data = {
            "name": "osm-kafka",
            # Path is relative to the current working directory of the hook.
            "dashboard": Path("files/kafka_exporter_dashboard.json").read_text(),
        }
        for k, v in rel_data.items():
            event.relation.data[self.unit][k] = v

    @property
    def relations_requirements(self):
        """Relations, with their expected keys, that must be available."""
        return [RelationDefinition("kafka", ["host", "port"], Unit)]

    def get_relation_state(self):
        """Gather the data of every required relation into a single dict.

        Returns:
            Dict keyed by ``"<relation_name>_<key>"``.

        Raises:
            RelationsMissing: If any required relation lacks expected keys.
        """
        relation_state = {}
        for relation_requirements in self.relations_requirements:
            relation_state.update(get_relation_data(self, relation_requirements))
        check_missing_relation_data(relation_state, self.relations_requirements)
        return relation_state

    def configure_pod(self, _=None) -> None:
        """Assemble the pod spec and apply it, if possible.

        Non-leader units only report ``ActiveStatus``. On the leader, a missing
        relation, an image fetch error or a ``ValueError`` from
        ``make_pod_spec`` leaves the unit in ``BlockedStatus``.

        Args:
            _ (EventBase): Hook or Relation event that started the function;
                unused.
        """
        if not self.unit.is_leader():
            self.unit.status = ActiveStatus("ready")
            return

        try:
            relation_state = self.get_relation_state()
        except RelationsMissing as exc:
            logger.exception("Relation missing error")
            self.unit.status = BlockedStatus(exc.message)
            return

        # Fetch image information
        self.unit.status = MaintenanceStatus("Fetching image information")
        try:
            image_info = self.image.fetch()
        except OCIImageResourceError:
            self.unit.status = BlockedStatus("Error fetching image information")
            return

        self.unit.status = MaintenanceStatus("Assembling pod spec")
        try:
            pod_spec = make_pod_spec(
                image_info,
                self.model.config,
                relation_state,
                self.model.app.name,
                self.port,
            )
        except ValueError as exc:
            logger.exception("Config/Relation data validation error")
            self.unit.status = BlockedStatus(str(exc))
            return

        # Only push the spec when it differs from the last one stored.
        if self.state.pod_spec != pod_spec:
            self.model.pod.set_spec(pod_spec)
            self.state.pod_spec = pod_spec

        self.unit.status = ActiveStatus("ready")


if __name__ == "__main__":
    main(KafkaExporterCharm)