Skip to content
Snippets Groups Projects
charm.py 8.67 KiB
Newer Older
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##

# pylint: disable=E0213

from ipaddress import ip_network
import logging
from pathlib import Path
from typing import NoReturn, Optional
from urllib.parse import urlparse
from charms.kafka_k8s.v0.kafka import KafkaEvents, KafkaRequires
from ops.main import main
from opslib.osm.charm import CharmedOsmBase, RelationsMissing
from opslib.osm.interfaces.grafana import GrafanaDashboardTarget
from opslib.osm.interfaces.prometheus import PrometheusScrapeTarget
from opslib.osm.pod import (
    ContainerV3Builder,
    IngressResourceV3Builder,
    PodSpecV3Builder,
)
from opslib.osm.validator import ModelValidator, validator


# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Port exposed by the exporter container; also used for its health probes,
# the ingress rule and as the default Prometheus scrape port.
PORT = 9308
class ConfigModel(ModelValidator):
    """Validated representation of the charm configuration options."""

    site_url: Optional[str]
    cluster_issuer: Optional[str]
    ingress_class: Optional[str]
    ingress_whitelist_source_range: Optional[str]
    tls_secret_name: Optional[str]
    image_pull_policy: str
    security_context: bool
    kafka_endpoint: Optional[str]

    @validator("site_url")
    def validate_site_url(cls, v):
        """Require the URL scheme of a non-empty site URL to begin with ``http``.

        Raises:
            ValueError: if the parsed scheme does not start with ``http``.
        """
        if not v:
            return v
        scheme = urlparse(v).scheme
        if scheme.startswith("http"):
            return v
        raise ValueError("value must start with http")

    @validator("ingress_whitelist_source_range")
    def validate_ingress_whitelist_source_range(cls, v):
        """Check that a non-empty source range parses as an IP network.

        ``ip_network`` raises ``ValueError`` itself when the value is invalid.
        """
        if not v:
            return v
        ip_network(v)
        return v

    @validator("image_pull_policy")
    def validate_image_pull_policy(cls, v):
        """Map a case-insensitive pull policy onto its canonical spelling.

        Raises:
            ValueError: if the value is not always, ifnotpresent or never.
        """
        canonical = {
            "always": "Always",
            "ifnotpresent": "IfNotPresent",
            "never": "Never",
        }
        policy = canonical.get(v.lower())
        if policy is None:
            raise ValueError("value must be always, ifnotpresent or never")
        return policy

    @validator("kafka_endpoint")
    def validate_kafka_endpoint(cls, v):
        """Require a non-empty endpoint to contain exactly one ``:`` separator.

        Raises:
            ValueError: if the value is not of the form ``<host>:<port>``.
        """
        if not v:
            return v
        # Exactly one colon means splitting on ":" yields two parts.
        if v.count(":") != 1:
            raise ValueError("value must be in the format <host>:<port>")
        return v


class KafkaEndpoint:
    """Simple holder for the host and port of a Kafka endpoint."""

    def __init__(self, host: str, port: str) -> None:
        """Store the endpoint coordinates.

        Args:
            host (str): Kafka host.
            port (str): Kafka port.
        """
        self.host, self.port = host, port

class KafkaExporterCharm(CharmedOsmBase):
    """Charm that builds the pod spec for the Kafka exporter.

    It consumes a Kafka endpoint (from config or from the Kafka relation) and
    publishes a Prometheus scrape target and a Grafana dashboard.
    """

    on = KafkaEvents()

    def __init__(self, *args) -> None:
        """Create the relation helpers and register the event observers."""
        super().__init__(*args, oci_image="image")

        # Provision Kafka relation to exchange information
        self.kafka = KafkaRequires(self)
        self.framework.observe(self.on.kafka_available, self.configure_pod)
        self.framework.observe(self.on.kafka_broken, self.configure_pod)

        # Register relation to provide a Scraping Target
        self.scrape_target = PrometheusScrapeTarget(self, "prometheus-scrape")
        self.framework.observe(
            self.on["prometheus-scrape"].relation_joined, self._publish_scrape_info
        )

        # Register relation to provide a Dashboard Target
        self.dashboard_target = GrafanaDashboardTarget(self, "grafana-dashboard")
        self.framework.observe(
            self.on["grafana-dashboard"].relation_joined, self._publish_dashboard_info
        )

    def _publish_scrape_info(self, event) -> None:
        """Publishes scraping information for Prometheus.

        Only the leader unit publishes. The hostname is taken from the
        ``site_url`` config when set, otherwise the application name is used.

        Args:
            event (EventBase): Prometheus relation event.
        """
        if not self.unit.is_leader():
            return

        # Normalise a missing or None option to "" so that both the hostname
        # and the port selection below see the same value.
        site_url = self.model.config.get("site_url") or ""
        hostname = urlparse(site_url).hostname if site_url else self.model.app.name

        if site_url.startswith("https://"):
            port = "443"
        elif site_url.startswith("http://"):
            port = "80"
        else:
            port = str(PORT)

        self.scrape_target.publish_info(
            hostname=hostname,
            port=port,
            metrics_path="/metrics",
            scrape_interval="30s",
            scrape_timeout="15s",
        )

    def _publish_dashboard_info(self, event) -> None:
        """Publish dashboards for Grafana.

        Only the leader unit publishes.

        Args:
            event (EventBase): Grafana relation event.
        """
        if self.unit.is_leader():
            self.dashboard_target.publish_info(
                name="osm-kafka",
                dashboard=Path("templates/kafka_exporter_dashboard.json").read_text(),
            )

    def _is_kafka_endpoint_set(self, config: ConfigModel) -> bool:
        """Check if Kafka endpoint is set, via config or via the relation."""
        return bool(config.kafka_endpoint) or self._is_kafka_relation_set()

    def _is_kafka_relation_set(self) -> bool:
        """Check if the Kafka relation is set or not."""
        return bool(self.kafka.host and self.kafka.port)

    @property
    def kafka_endpoint(self) -> KafkaEndpoint:
        """Kafka endpoint to use; the config option wins over the relation."""
        config = ConfigModel(**dict(self.config))
        if config.kafka_endpoint:
            host, port = config.kafka_endpoint.split(":")
        else:
            host = self.kafka.host
            port = self.kafka.port
        return KafkaEndpoint(host, port)

    def build_pod_spec(self, image_info):
        """Build the PodSpec to be used.

        Args:
            image_info (str): container image information.

        Returns:
            Dict: PodSpec information.

        Raises:
            RelationsMissing: if no Kafka endpoint is available from either
                the config or the Kafka relation.
        """
        # Validate config
        config = ConfigModel(**dict(self.config))

        # Check relations
        if not self._is_kafka_endpoint_set(config):
            raise RelationsMissing(["kafka"])

        # Create Builder for the PodSpec
        pod_spec_builder = PodSpecV3Builder(
            enable_security_context=config.security_context
        )

        # Resolve the endpoint once; the property re-validates the config on
        # every access.
        endpoint = self.kafka_endpoint

        # Build container
        container_builder = ContainerV3Builder(
            self.app.name,
            image_info,
            config.image_pull_policy,
            run_as_non_root=config.security_context,
        )
        container_builder.add_port(name="exporter", port=PORT)
        container_builder.add_http_readiness_probe(
            path="/api/health",
            port=PORT,
            initial_delay_seconds=10,
            period_seconds=10,
            timeout_seconds=5,
            success_threshold=1,
            failure_threshold=3,
        )
        container_builder.add_http_liveness_probe(
            path="/api/health",
            port=PORT,
            initial_delay_seconds=60,
            timeout_seconds=30,
            failure_threshold=10,
        )
        container_builder.add_command(
            [
                "kafka_exporter",
                f"--kafka.server={endpoint.host}:{endpoint.port}",
            ]
        )
        container = container_builder.build()

        # Add container to PodSpec
        pod_spec_builder.add_container(container)

        # Add ingress resources to PodSpec if site url exists
        if config.site_url:
            parsed = urlparse(config.site_url)
            uses_tls = parsed.scheme == "https"

            # Collect every annotation before creating the builder, so the
            # result does not depend on the builder keeping a reference to
            # this dict.
            annotations = {}
            if config.ingress_class:
                annotations["kubernetes.io/ingress.class"] = config.ingress_class

            if config.ingress_whitelist_source_range:
                annotations[
                    "nginx.ingress.kubernetes.io/whitelist-source-range"
                ] = config.ingress_whitelist_source_range

            if config.cluster_issuer:
                annotations["cert-manager.io/cluster-issuer"] = config.cluster_issuer

            if not uses_tls:
                annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"

            ingress_resource_builder = IngressResourceV3Builder(
                f"{self.app.name}-ingress", annotations
            )

            if uses_tls:
                ingress_resource_builder.add_tls(
                    [parsed.hostname], config.tls_secret_name
                )

            ingress_resource_builder.add_rule(parsed.hostname, self.app.name, PORT)
            ingress_resource = ingress_resource_builder.build()
            pod_spec_builder.add_ingress_resource(ingress_resource)

        return pod_spec_builder.build()


# Script entry point: pass the charm class to the `main` function imported
# from `ops.main` when this file is executed directly.
if __name__ == "__main__":
    main(KafkaExporterCharm)