9f03a343a681dbb80eae69e98f3bd24959555e88
[osm/devops.git] / installers / charm / kafka-exporter / src / charm.py
1 #!/usr/bin/env python3
2 # Copyright 2021 Canonical Ltd.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: legal@canonical.com
18 #
19 # To get in touch with the maintainers, please contact:
20 # osm-charmers@lists.launchpad.net
21 ##
22
23 import logging
24 from pathlib import Path
25 from typing import Dict, List, NoReturn
26 from urllib.parse import urlparse
27
28 from ops.charm import CharmBase
29 from ops.framework import EventBase, StoredState
30 from ops.main import main
31 from ops.model import ActiveStatus, Application, BlockedStatus, MaintenanceStatus, Unit
32 from oci_image import OCIImageResource, OCIImageResourceError
33
34 from pod_spec import make_pod_spec
35
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Port number the charm stores as its ``port`` attribute and publishes as the
# scrape port when no ``site_url`` is configured.
KAFKA_EXPORTER_PORT = 9308
39
40
class RelationsMissing(Exception):
    """Raised when one or more required relations have not provided their data.

    Attributes:
        message: Human-readable text such as ``"Waiting for kafka relation"``.
            It is empty when no relation names were supplied.
    """

    def __init__(self, missing_relations: List):
        """Build the message from the names of the missing relations.

        Args:
            missing_relations (List): Names of the relations that still lack
                data. Any value that is not a non-empty list produces an
                empty message.
        """
        self.message = ""
        if missing_relations and isinstance(missing_relations, list):
            self.message = f'Waiting for {", ".join(missing_relations)} relation'
            # Pluralise on the number of relations, not on the rendered text.
            if len(missing_relations) > 1:
                self.message += "s"
        # Pass the message on to Exception so that str(exc) and tracebacks show
        # the readable text instead of the repr of the raw list argument.
        super().__init__(self.message)
48
49
class RelationDefinition:
    """Describe the data expected from one relation.

    Attributes:
        relation_name: Name of the relation to read from.
        keys: Keys that must be present in the relation data.
        source_type: Either ``ops.model.Application`` or ``ops.model.Unit``;
            selects which kind of entity's data bag is read.
    """

    def __init__(self, relation_name: str, keys: List, source_type):
        """Store the definition after validating ``source_type``.

        Args:
            relation_name (str): Name of the relation.
            keys (List): Keys expected in the relation data.
            source_type: ``ops.model.Application`` or ``ops.model.Unit``.

        Raises:
            TypeError: if ``source_type`` is neither of the two classes.
        """
        is_supported = source_type == Application or source_type == Unit
        if not is_supported:
            raise TypeError(
                "source_type should be ops.model.Application or ops.model.Unit"
            )
        self.source_type = source_type
        self.keys = keys
        self.relation_name = relation_name
59
60
def check_missing_relation_data(
    data: Dict,
    expected_relations_data: List[RelationDefinition],
):
    """Verify that ``data`` holds every key of every expected relation.

    Keys are looked up in ``data`` under the name
    ``"<relation_name>_<key>"``.

    Args:
        data (Dict): Collected relation data.
        expected_relations_data (List[RelationDefinition]): Definitions of the
            relations whose keys must all be present.

    Raises:
        RelationsMissing: listing, in the given order, every relation that has
            at least one key absent from ``data``.
    """
    incomplete = [
        definition.relation_name
        for definition in expected_relations_data
        if any(
            f"{definition.relation_name}_{key}" not in data
            for key in definition.keys
        )
    ]
    if incomplete:
        raise RelationsMissing(incomplete)
73
74
def get_relation_data(
    charm: CharmBase,
    relation_data: RelationDefinition,
) -> Dict:
    """Read the expected keys of one relation from the first entity offering them.

    Args:
        charm (CharmBase): Charm whose model is queried for the relation.
        relation_data (RelationDefinition): Relation name, expected keys and
            the entity type (application or unit) to read from.

    Returns:
        Dict: ``{"<relation_name>_<key>": value}`` for every expected key,
        taken from the first entity of the expected type, other than the charm
        itself, whose data contains all the keys. Empty when the relation
        does not exist or no entity provides all the keys.
    """
    relation = charm.model.get_relation(relation_data.relation_name)
    if not relation:
        return {}

    prefix = relation_data.relation_name
    wanted_keys = relation_data.keys
    wanted_type = relation_data.source_type
    # The charm's own entry in the relation data is never a data source.
    own_side = charm.app if wanted_type == Application else charm.unit

    for remote in relation.data:
        if remote == own_side or not isinstance(remote, wanted_type):
            continue
        remote_data = relation.data[remote]
        if not all(key in remote_data for key in wanted_keys):
            continue
        # The first complete entity wins; later ones are not inspected.
        return {f"{prefix}_{key}": remote_data.get(key) for key in wanted_keys}
    return {}
95
96
class KafkaExporterCharm(CharmBase):
    """Kafka Exporter Charm.

    Assembles the pod spec from the charm config, the ``image`` OCI resource
    and the data read from the ``kafka`` relation, and publishes scrape and
    dashboard information when the corresponding relations are joined.
    """

    state = StoredState()

    def __init__(self, *args) -> None:
        """Kafka Exporter Charm constructor."""
        super().__init__(*args)

        # Internal state initialization
        self.state.set_default(pod_spec=None)

        self.port = KAFKA_EXPORTER_PORT
        self.image = OCIImageResource(self, "image")

        # Registering regular events
        self.framework.observe(self.on.start, self.configure_pod)
        self.framework.observe(self.on.config_changed, self.configure_pod)

        # Registering required relation events
        self.framework.observe(self.on.kafka_relation_changed, self.configure_pod)

        # Registering required relation departed events
        self.framework.observe(self.on.kafka_relation_departed, self.configure_pod)

        # Registering provided relation events
        self.framework.observe(
            self.on.prometheus_scrape_relation_joined, self._publish_scrape_info
        )
        self.framework.observe(
            self.on.grafana_dashboard_relation_joined, self._publish_dashboard_info
        )

    def _publish_scrape_info(self, event: EventBase) -> None:
        """Publishes scrape information.

        When ``site_url`` is configured and a hostname can be parsed from it,
        that hostname is published together with port ``"80"``. Otherwise the
        application name and the exporter port are published.

        Args:
            event (EventBase): Exporter relation event.
        """
        site_url = self.model.config.get("site_url")
        hostname = None
        if site_url:
            try:
                hostname = urlparse(site_url).hostname
            except ValueError:
                # urlparse() rejects some malformed URLs (e.g. a broken IPv6
                # literal); treat them like a URL without a hostname.
                hostname = None
            if not hostname:
                # For example "example.com" without a scheme: urlparse()
                # reports no hostname, and None must not be written into the
                # relation data.
                logger.warning(
                    "Could not parse a hostname from site_url %r; "
                    "publishing the application name instead",
                    site_url,
                )

        if hostname:
            # NOTE(review): port 80 is published whatever the scheme or port
            # of site_url is -- confirm this matches the ingress setup.
            port = "80"
        else:
            hostname = self.model.app.name
            # Same value that configure_pod() hands to make_pod_spec().
            port = str(self.port)

        rel_data = {
            "hostname": hostname,
            "port": port,
            "metrics_path": "/metrics",
            "scrape_interval": "30s",
            "scrape_timeout": "15s",
        }
        event.relation.data[self.unit].update(rel_data)

    def _publish_dashboard_info(self, event: EventBase) -> None:
        """Publishes dashboard information.

        The dashboard JSON is read from ``files/kafka_exporter_dashboard.json``,
        a path relative to the current working directory.

        Args:
            event (EventBase): Exporter relation event.
        """
        dashboard = Path("files/kafka_exporter_dashboard.json").read_text(
            encoding="utf-8"
        )
        rel_data = {
            "name": "osm-kafka",
            "dashboard": dashboard,
        }
        event.relation.data[self.unit].update(rel_data)

    @property
    def relations_requirements(self):
        """Relations, and the keys in each, that the charm needs to build its pod."""
        return [RelationDefinition("kafka", ["host", "port"], Unit)]

    def get_relation_state(self):
        """Collect the data of every required relation.

        Returns:
            Dict: Relation data keyed as ``"<relation_name>_<key>"``.

        Raises:
            RelationsMissing: if a required relation does not provide all of
                its expected keys.
        """
        requirements = self.relations_requirements
        relation_state = {}
        for requirement in requirements:
            relation_state.update(get_relation_data(self, requirement))
        check_missing_relation_data(relation_state, requirements)
        return relation_state

    def configure_pod(self, _=None) -> None:
        """Assemble the pod spec and apply it, if possible.

        The unit ends in ``BlockedStatus`` when required relation data is
        missing, the image resource cannot be fetched, or ``make_pod_spec``
        rejects its input with ``ValueError``. Otherwise the unit becomes
        active.

        Args:
            _ (EventBase): Hook or Relation event that started the
                function. Unused.
        """
        if not self.unit.is_leader():
            # Only the leader goes on to apply a pod spec.
            self.unit.status = ActiveStatus("ready")
            return

        try:
            relation_state = self.get_relation_state()
        except RelationsMissing as exc:
            logger.exception("Relation missing error")
            self.unit.status = BlockedStatus(exc.message)
            return

        # Fetch image information
        self.unit.status = MaintenanceStatus("Fetching image information")
        try:
            image_info = self.image.fetch()
        except OCIImageResourceError:
            self.unit.status = BlockedStatus("Error fetching image information")
            return

        # Set only now, so that the status matches the step actually running.
        self.unit.status = MaintenanceStatus("Assembling pod spec")
        try:
            pod_spec = make_pod_spec(
                image_info,
                self.model.config,
                relation_state,
                self.model.app.name,
                self.port,
            )
        except ValueError as exc:
            logger.exception("Config/Relation data validation error")
            self.unit.status = BlockedStatus(str(exc))
            return

        # Skip the call to set_spec() when the spec has not changed since the
        # last one that was applied.
        if self.state.pod_spec != pod_spec:
            self.model.pod.set_spec(pod_spec)
            self.state.pod_spec = pod_spec

        self.unit.status = ActiveStatus("ready")
220
221
# When executed as a script, hand the charm class to the framework's main().
if __name__ == "__main__":
    main(KafkaExporterCharm)