+
+ def authenticate(self):
+ """Generate an authentication token and endpoint for alarm request."""
+ try:
+ # Check for a tenant_id
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+ return auth_token, endpoint
+ except Exception as exc:
+ log.warn("Authentication to Keystone failed:%s", exc)
+ return None, None
+
+ def get_alarm_state(self, endpoint, auth_token, alarm_id):
+ """Get the state of the alarm."""
+ url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
+
+ try:
+ alarm_state = self._common._perform_request(
+ url, auth_token, req_type="get")
+ return json.loads(alarm_state.text)
+ except Exception as exc:
+ log.warn("Failed to get the state of the alarm:%s", exc)
+ return None
+
+ def check_for_metric(self, auth_token, m_name, r_id):
+ """Check for the alarm metric."""
+ try:
+ endpoint = self._common.get_endpoint("metric")
+
+ url = "{}/v1/metric/".format(endpoint)
+ metric_list = self._common._perform_request(
+ url, auth_token, req_type="get")
+
+ for metric in json.loads(metric_list.text):
+ name = metric['name']
+ resource = metric['resource_id']
+ if (name == m_name and resource == r_id):
+ metric_id = metric['id']
+ log.info("The required metric exists, an alarm will be created.")
+ return metric_id
+ except Exception as exc:
+ log.info("Desired Gnocchi metric not found:%s", exc)
+ return None