2 # Copyright 2021 Canonical Ltd.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: legal@canonical.com
19 # To get in touch with the maintainers, please contact:
20 # osm-charmers@lists.launchpad.net
24 from pathlib
import Path
25 from typing
import Dict
, List
, NoReturn
26 from urllib
.parse
import urlparse
28 from ops
.charm
import CharmBase
29 from ops
.framework
import EventBase
, StoredState
30 from ops
.main
import main
31 from ops
.model
import ActiveStatus
, Application
, BlockedStatus
, MaintenanceStatus
, Unit
32 from oci_image
import OCIImageResource
, OCIImageResourceError
34 from pod_spec
import make_pod_spec
36 logger
= logging
.getLogger(__name__
)
38 KAFKA_EXPORTER_PORT
= 9308
41 class RelationsMissing(Exception):
42 def __init__(self
, missing_relations
: List
):
44 if missing_relations
and isinstance(missing_relations
, list):
45 self
.message
+= f
'Waiting for {", ".join(missing_relations)} relation'
46 if "," in self
.message
:
50 class RelationDefinition
:
51 def __init__(self
, relation_name
: str, keys
: List
, source_type
):
52 if source_type
!= Application
and source_type
!= Unit
:
54 "source_type should be ops.model.Application or ops.model.Unit"
56 self
.relation_name
= relation_name
58 self
.source_type
= source_type
61 def check_missing_relation_data(
63 expected_relations_data
: List
[RelationDefinition
],
65 missing_relations
= []
66 for relation_data
in expected_relations_data
:
68 f
"{relation_data.relation_name}_{k}" in data
for k
in relation_data
.keys
70 missing_relations
.append(relation_data
.relation_name
)
72 raise RelationsMissing(missing_relations
)
75 def get_relation_data(
77 relation_data
: RelationDefinition
,
80 relation
= charm
.model
.get_relation(relation_data
.relation_name
)
83 charm
.app
if relation_data
.source_type
== Application
else charm
.unit
85 expected_type
= relation_data
.source_type
86 for app_unit
in relation
.data
:
87 if app_unit
!= self_app_unit
and isinstance(app_unit
, expected_type
):
88 if all(k
in relation
.data
[app_unit
] for k
in relation_data
.keys
):
89 for k
in relation_data
.keys
:
90 data
[f
"{relation_data.relation_name}_{k}"] = relation
.data
[
97 class KafkaExporterCharm(CharmBase
):
98 """Kafka Exporter Charm."""
100 state
= StoredState()
102 def __init__(self
, *args
) -> NoReturn
:
103 """Kafka Exporter Charm constructor."""
104 super().__init
__(*args
)
106 # Internal state initialization
107 self
.state
.set_default(pod_spec
=None)
109 self
.port
= KAFKA_EXPORTER_PORT
110 self
.image
= OCIImageResource(self
, "image")
112 # Registering regular events
113 self
.framework
.observe(self
.on
.start
, self
.configure_pod
)
114 self
.framework
.observe(self
.on
.config_changed
, self
.configure_pod
)
116 # Registering required relation events
117 self
.framework
.observe(self
.on
.kafka_relation_changed
, self
.configure_pod
)
119 # Registering required relation departed events
120 self
.framework
.observe(self
.on
.kafka_relation_departed
, self
.configure_pod
)
122 # Registering provided relation events
123 self
.framework
.observe(
124 self
.on
.prometheus_scrape_relation_joined
, self
._publish
_scrape
_info
126 self
.framework
.observe(
127 self
.on
.grafana_dashboard_relation_joined
, self
._publish
_dashboard
_info
130 def _publish_scrape_info(self
, event
: EventBase
) -> NoReturn
:
131 """Publishes scrape information.
134 event (EventBase): Exporter relation event.
137 "hostname": urlparse(self
.model
.config
["site_url"]).hostname
138 if self
.model
.config
["site_url"]
139 else self
.model
.app
.name
,
140 "port": "80" if self
.model
.config
["site_url"] else str(KAFKA_EXPORTER_PORT
),
141 "metrics_path": "/metrics",
142 "scrape_interval": "30s",
143 "scrape_timeout": "15s",
145 for k
, v
in rel_data
.items():
146 event
.relation
.data
[self
.unit
][k
] = v
148 def _publish_dashboard_info(self
, event
: EventBase
) -> NoReturn
:
149 """Publishes dashboard information.
152 event (EventBase): Exporter relation event.
156 "dashboard": Path("files/kafka_exporter_dashboard.json").read_text(),
158 for k
, v
in rel_data
.items():
159 event
.relation
.data
[self
.unit
][k
] = v
162 def relations_requirements(self
):
163 return [RelationDefinition("kafka", ["host", "port"], Unit
)]
165 def get_relation_state(self
):
167 for relation_requirements
in self
.relations_requirements
:
168 data
= get_relation_data(self
, relation_requirements
)
169 relation_state
= {**relation_state
, **data
}
170 check_missing_relation_data(relation_state
, self
.relations_requirements
)
171 return relation_state
173 def configure_pod(self
, _
=None) -> NoReturn
:
174 """Assemble the pod spec and apply it, if possible.
177 event (EventBase): Hook or Relation event that started the
180 if not self
.unit
.is_leader():
181 self
.unit
.status
= ActiveStatus("ready")
184 relation_state
= None
186 relation_state
= self
.get_relation_state()
187 except RelationsMissing
as exc
:
188 logger
.exception("Relation missing error")
189 self
.unit
.status
= BlockedStatus(exc
.message
)
192 self
.unit
.status
= MaintenanceStatus("Assembling pod spec")
194 # Fetch image information
196 self
.unit
.status
= MaintenanceStatus("Fetching image information")
197 image_info
= self
.image
.fetch()
198 except OCIImageResourceError
:
199 self
.unit
.status
= BlockedStatus("Error fetching image information")
203 pod_spec
= make_pod_spec(
210 except ValueError as exc
:
211 logger
.exception("Config/Relation data validation error")
212 self
.unit
.status
= BlockedStatus(str(exc
))
215 if self
.state
.pod_spec
!= pod_spec
:
216 self
.model
.pod
.set_spec(pod_spec
)
217 self
.state
.pod_spec
= pod_spec
219 self
.unit
.status
= ActiveStatus("ready")
222 if __name__
== "__main__":
223 main(KafkaExporterCharm
)