- return self.conf.get('prometheus', 'url') + "/api/v1/query?" + query
-
- def _get_metric_value_from_response(self, json_response):
- result = json_response['data']['result']
+ return self.conf.get("prometheus", "url") + "/api/v1/query?" + query
+
+ def _build_headers(self) -> Dict[str, str]:
+ headers = {}
+ user = self.conf.get("prometheus", "user")
+ password = self.conf.get("prometheus", "password")
+ if user and password:
+ _phrase = f"{user}:{password}".encode("utf-8")
+ token = base64.b64encode(_phrase).decode("utf-8")
+ headers["Authorization"] = f"Basic {token}"
+ return headers
+
+ def _get_metric_data_from_response(self, json_response) -> List[Dict[str, str]]:
+ result = json_response["data"]["result"]
+ metrics_data = []