2 # Copyright 2021 Canonical Ltd.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: legal@canonical.com
19 # To get in touch with the maintainers, please contact:
20 # osm-charmers@lists.launchpad.net
23 # pylint: disable=E0213
25 from ipaddress
import ip_network
27 from pathlib
import Path
28 from typing
import NoReturn
, Optional
29 from urllib
.parse
import urlparse
31 from charms
.kafka_k8s
.v0
.kafka
import KafkaEvents
, KafkaRequires
32 from ops
.main
import main
33 from opslib
.osm
.charm
import CharmedOsmBase
, RelationsMissing
34 from opslib
.osm
.interfaces
.grafana
import GrafanaDashboardTarget
35 from opslib
.osm
.interfaces
.prometheus
import PrometheusScrapeTarget
36 from opslib
.osm
.pod
import (
38 IngressResourceV3Builder
,
41 from opslib
.osm
.validator
import ModelValidator
, validator
44 logger
= logging
.getLogger(__name__
)
49 class ConfigModel(ModelValidator
):
50 site_url
: Optional
[str]
51 cluster_issuer
: Optional
[str]
52 ingress_class
: Optional
[str]
53 ingress_whitelist_source_range
: Optional
[str]
54 tls_secret_name
: Optional
[str]
55 image_pull_policy
: str
56 security_context
: bool
57 kafka_endpoint
: Optional
[str]
59 @validator("site_url")
60 def validate_site_url(cls
, v
):
63 if not parsed
.scheme
.startswith("http"):
64 raise ValueError("value must start with http")
67 @validator("ingress_whitelist_source_range")
68 def validate_ingress_whitelist_source_range(cls
, v
):
73 @validator("image_pull_policy")
74 def validate_image_pull_policy(cls
, v
):
77 "ifnotpresent": "IfNotPresent",
81 if v
not in values
.keys():
82 raise ValueError("value must be always, ifnotpresent or never")
85 @validator("kafka_endpoint")
86 def validate_kafka_endpoint(cls
, v
):
87 if v
and len(v
.split(":")) != 2:
88 raise ValueError("value must be in the format <host>:<port>")
93 def __init__(self
, host
: str, port
: str) -> None:
98 class KafkaExporterCharm(CharmedOsmBase
):
101 def __init__(self
, *args
) -> NoReturn
:
102 super().__init
__(*args
, oci_image
="image")
104 # Provision Kafka relation to exchange information
105 self
.kafka
= KafkaRequires(self
)
106 self
.framework
.observe(self
.on
.kafka_available
, self
.configure_pod
)
107 self
.framework
.observe(self
.on
.kafka_broken
, self
.configure_pod
)
109 # Register relation to provide a Scraping Target
110 self
.scrape_target
= PrometheusScrapeTarget(self
, "prometheus-scrape")
111 self
.framework
.observe(
112 self
.on
["prometheus-scrape"].relation_joined
, self
._publish
_scrape
_info
115 # Register relation to provide a Dasboard Target
116 self
.dashboard_target
= GrafanaDashboardTarget(self
, "grafana-dashboard")
117 self
.framework
.observe(
118 self
.on
["grafana-dashboard"].relation_joined
, self
._publish
_dashboard
_info
121 def _publish_scrape_info(self
, event
) -> NoReturn
:
122 """Publishes scraping information for Prometheus.
125 event (EventBase): Prometheus relation event.
127 if self
.unit
.is_leader():
129 urlparse(self
.model
.config
["site_url"]).hostname
130 if self
.model
.config
["site_url"]
131 else self
.model
.app
.name
134 if self
.model
.config
.get("site_url", "").startswith("https://"):
136 elif self
.model
.config
.get("site_url", "").startswith("http://"):
139 self
.scrape_target
.publish_info(
142 metrics_path
="/metrics",
143 scrape_interval
="30s",
144 scrape_timeout
="15s",
147 def _publish_dashboard_info(self
, event
) -> NoReturn
:
148 """Publish dashboards for Grafana.
151 event (EventBase): Grafana relation event.
153 if self
.unit
.is_leader():
154 self
.dashboard_target
.publish_info(
156 dashboard
=Path("templates/kafka_exporter_dashboard.json").read_text(),
159 def _is_kafka_endpoint_set(self
, config
: ConfigModel
) -> bool:
160 """Check if Kafka endpoint is set."""
161 return config
.kafka_endpoint
or self
._is
_kafka
_relation
_set
()
163 def _is_kafka_relation_set(self
) -> bool:
164 """Check if the Kafka relation is set or not."""
165 return self
.kafka
.host
and self
.kafka
.port
168 def kafka_endpoint(self
) -> KafkaEndpoint
:
169 config
= ConfigModel(**dict(self
.config
))
170 if config
.kafka_endpoint
:
171 host
, port
= config
.kafka_endpoint
.split(":")
173 host
= self
.kafka
.host
174 port
= self
.kafka
.port
175 return KafkaEndpoint(host
, port
)
177 def build_pod_spec(self
, image_info
):
178 """Build the PodSpec to be used.
181 image_info (str): container image information.
184 Dict: PodSpec information.
187 config
= ConfigModel(**dict(self
.config
))
190 if not self
._is
_kafka
_endpoint
_set
(config
):
191 raise RelationsMissing(["kafka"])
193 # Create Builder for the PodSpec
194 pod_spec_builder
= PodSpecV3Builder(
195 enable_security_context
=config
.security_context
199 container_builder
= ContainerV3Builder(
202 config
.image_pull_policy
,
203 run_as_non_root
=config
.security_context
,
205 container_builder
.add_port(name
="exporter", port
=PORT
)
206 container_builder
.add_http_readiness_probe(
209 initial_delay_seconds
=10,
215 container_builder
.add_http_liveness_probe(
218 initial_delay_seconds
=60,
220 failure_threshold
=10,
222 container_builder
.add_command(
225 f
"--kafka.server={self.kafka_endpoint.host}:{self.kafka_endpoint.port}",
228 container
= container_builder
.build()
230 # Add container to PodSpec
231 pod_spec_builder
.add_container(container
)
233 # Add ingress resources to PodSpec if site url exists
235 parsed
= urlparse(config
.site_url
)
237 if config
.ingress_class
:
238 annotations
["kubernetes.io/ingress.class"] = config
.ingress_class
239 ingress_resource_builder
= IngressResourceV3Builder(
240 f
"{self.app.name}-ingress", annotations
243 if config
.ingress_whitelist_source_range
:
245 "nginx.ingress.kubernetes.io/whitelist-source-range"
246 ] = config
.ingress_whitelist_source_range
248 if config
.cluster_issuer
:
249 annotations
["cert-manager.io/cluster-issuer"] = config
.cluster_issuer
251 if parsed
.scheme
== "https":
252 ingress_resource_builder
.add_tls(
253 [parsed
.hostname
], config
.tls_secret_name
256 annotations
["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"
258 ingress_resource_builder
.add_rule(parsed
.hostname
, self
.app
.name
, PORT
)
259 ingress_resource
= ingress_resource_builder
.build()
260 pod_spec_builder
.add_ingress_resource(ingress_resource
)
262 return pod_spec_builder
.build()
265 if __name__
== "__main__":
266 main(KafkaExporterCharm
)