log.info("OpenStack alarm action required.")
- auth_token = Common.get_auth_token(vim_uuid)
+ verify_ssl = self._auth_manager.is_verify_ssl(vim_uuid)
- alarm_endpoint = Common.get_endpoint("alarming", vim_uuid)
- metric_endpoint = Common.get_endpoint("metric", vim_uuid)
+ auth_token = Common.get_auth_token(vim_uuid, verify_ssl=verify_ssl)
+
+ alarm_endpoint = Common.get_endpoint("alarming", vim_uuid, verify_ssl=verify_ssl)
+ metric_endpoint = Common.get_endpoint("metric", vim_uuid, verify_ssl=verify_ssl)
vim_account = self._auth_manager.get_credentials(vim_uuid)
vim_config = json.loads(vim_account.config)
metric_name = alarm_details['metric_name'].lower()
resource_id = alarm_details['resource_uuid']
- self.check_for_metric(auth_token, metric_endpoint, metric_name, resource_id)
+ self.check_for_metric(auth_token, metric_endpoint, metric_name, resource_id, verify_ssl)
alarm_id = self.configure_alarm(
- alarm_endpoint, auth_token, alarm_details, vim_config)
+ alarm_endpoint, auth_token, alarm_details, vim_config, verify_ssl)
log.info("Alarm successfully created")
self._database_manager.save_alarm(alarm_id,
alarm_list = None
try:
alarm_list = self.list_alarms(
- alarm_endpoint, auth_token, list_details)
+ alarm_endpoint, auth_token, list_details, verify_ssl)
except Exception as e:
log.exception("Error listing alarms")
raise e
status = False
try:
self.delete_alarm(
- alarm_endpoint, auth_token, alarm_id)
+ alarm_endpoint, auth_token, alarm_id, verify_ssl)
status = True
except Exception as e:
log.exception("Error deleting alarm")
alarm_id = values['ack_details']['alarm_uuid']
self.update_alarm_state(
- alarm_endpoint, auth_token, alarm_id)
+ alarm_endpoint, auth_token, alarm_id, verify_ssl)
log.info("Acknowledged the alarm and cleared it.")
except Exception as e:
status = False
try:
alarm_id = self.update_alarm(
- alarm_endpoint, auth_token, alarm_details, vim_config)
+ alarm_endpoint, auth_token, alarm_details, vim_config, verify_ssl)
status = True
except Exception as e:
log.exception("Error updating alarm")
else:
log.debug("Unknown key, no action will be performed")
- def configure_alarm(self, alarm_endpoint, auth_token, values, vim_config):
+ def configure_alarm(self, alarm_endpoint, auth_token, values, vim_config, verify_ssl):
"""Create requested alarm in Aodh."""
url = "{}/v2/alarms/".format(alarm_endpoint)
payload = self.check_payload(values, metric_name, resource_id,
alarm_name)
new_alarm = Common.perform_request(
- url, auth_token, req_type="post", payload=payload)
+ url, auth_token, req_type="post", payload=payload, verify_ssl=verify_ssl)
return json.loads(new_alarm.text)['alarm_id']
- def delete_alarm(self, endpoint, auth_token, alarm_id):
+ def delete_alarm(self, endpoint, auth_token, alarm_id, verify_ssl):
"""Delete alarm function."""
url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
result = Common.perform_request(
- url, auth_token, req_type="delete")
+ url, auth_token, req_type="delete", verify_ssl=verify_ssl)
if str(result.status_code) == "404":
raise ValueError("Alarm {} doesn't exist".format(alarm_id))
- def list_alarms(self, endpoint, auth_token, list_details):
+ def list_alarms(self, endpoint, auth_token, list_details, verify_ssl):
"""Generate the requested list of alarms."""
url = "{}/v2/alarms/".format(endpoint)
a_list, name_list, sev_list, res_list = [], [], [], []
# Perform the request to get the desired list
try:
result = Common.perform_request(
- url, auth_token, req_type="get")
+ url, auth_token, req_type="get", verify_ssl=verify_ssl)
if result is not None:
# Get list based on resource id
log.exception("Failed to generate alarm list: ")
raise e
- def update_alarm_state(self, endpoint, auth_token, alarm_id):
+ def update_alarm_state(self, endpoint, auth_token, alarm_id, verify_ssl):
"""Set the state of an alarm to ok when ack message is received."""
url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
payload = json.dumps("ok")
Common.perform_request(
- url, auth_token, req_type="put", payload=payload)
+ url, auth_token, req_type="put", payload=payload, verify_ssl=verify_ssl)
- def update_alarm(self, endpoint, auth_token, values, vim_config):
+ def update_alarm(self, endpoint, auth_token, values, vim_config, verify_ssl):
"""Get alarm name for an alarm configuration update."""
# Get already existing alarm details
url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid']
# Updates the alarm configurations with the valid payload
update_alarm = Common.perform_request(
- url, auth_token, req_type="put", payload=payload)
+ url, auth_token, req_type="put", payload=payload, verify_ssl=verify_ssl)
return json.loads(update_alarm.text)['alarm_id']
url, auth_token, req_type="get")
return json.loads(alarm_state.text)
- def check_for_metric(self, auth_token, metric_endpoint, metric_name, resource_id):
+ def check_for_metric(self, auth_token, metric_endpoint, metric_name, resource_id, verify_ssl):
"""
Checks if resource has a specific metric. If not, throws exception.
:param auth_token: OpenStack auth token
try:
url = "{}/v1/resource/generic/{}".format(metric_endpoint, resource_id)
result = Common.perform_request(
- url, auth_token, req_type="get")
+ url, auth_token, req_type="get", verify_ssl=verify_ssl)
resource = json.loads(result.text)
metrics_dict = resource['metrics']
return metrics_dict[METRIC_MAPPINGS[metric_name]]
self.alarming.alarming(message, 'test_id')
- get_token.assert_called_with('test_id')
- get_endpoint.assert_any_call('alarming', 'test_id')
+ get_token.assert_called_with('test_id', verify_ssl=True)
+ get_endpoint.assert_any_call('alarming', 'test_id', verify_ssl=True)
@mock.patch.object(Common, 'get_endpoint', mock.Mock())
@mock.patch.object(Common, 'get_auth_token', mock.Mock())
# Call the alarming functionality and check delete request
self.alarming.alarming(message, 'test_id')
- del_alarm.assert_called_with(mock.ANY, mock.ANY, 'my_alarm_id')
+ del_alarm.assert_called_with(mock.ANY, mock.ANY, 'my_alarm_id', True)
@mock.patch.object(Common, 'get_endpoint', mock.Mock())
@mock.patch.object(Common, 'get_auth_token', mock.Mock())
# Call the alarming functionality and check list functionality
self.alarming.alarming(message, 'test_id')
- list_alarm.assert_called_with(mock.ANY, mock.ANY, {'correlation_id': 1})
+ list_alarm.assert_called_with(mock.ANY, mock.ANY, {'correlation_id': 1}, True)
@mock.patch.object(Common, 'get_auth_token', mock.Mock())
@mock.patch.object(Common, 'get_endpoint', mock.Mock())
# Call alarming functionality and check acknowledge functionality
self.alarming.alarming(message, 'test_id')
- ack_alarm.assert_called_with(mock.ANY, mock.ANY, 'my_alarm_id')
+ ack_alarm.assert_called_with(mock.ANY, mock.ANY, 'my_alarm_id', True)
@mock.patch.object(Common, 'get_auth_token', mock.Mock())
@mock.patch.object(Common, 'get_endpoint', mock.Mock())
'metric_name': 'cpu_utilization',
'vdu_name': 'vdu',
'vnf_member_index': '1', 'ns_id': '1',
- 'resource_uuid': '123'}, {})
+ 'resource_uuid': '123'}, {}, True)
"metric_name": "my_metric",
"resource_uuid": "my_r_id"}
with self.assertRaises(KeyError):
- self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True)
perf_req.assert_not_called()
perf_req.reset_mock()
"resource_uuid": "my_r_id"}
with self.assertRaises(KeyError):
- self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True)
perf_req.assert_not_called()
@mock.patch.object(Common, "perform_request")
perf_req.return_value = type('obj', (object,), {'text': '{"alarm_id":"1"}'})
- self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True)
payload = {"name": "disk_write_ops",
"gnocchi_resources_threshold_rule": {"resource_type": "generic", "comparison_operator": "gt",
"granularity": "300", "metric": "disk.write.requests",
"severity": "critical"}
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/", auth_token,
- req_type="post", payload=json.dumps(payload, sort_keys=True))
+ req_type="post", payload=json.dumps(payload, sort_keys=True), verify_ssl=True)
@mock.patch.object(Common, "perform_request")
def test_delete_alarm_req(self, perf_req):
"""Test delete alarm request."""
- self.alarming.delete_alarm(alarm_endpoint, auth_token, "my_alarm_id")
+ self.alarming.delete_alarm(alarm_endpoint, auth_token, "my_alarm_id", True)
perf_req.assert_called_with(
- "alarm_endpoint/v2/alarms/my_alarm_id", auth_token, req_type="delete")
+ "alarm_endpoint/v2/alarms/my_alarm_id", auth_token, req_type="delete", verify_ssl=True)
@mock.patch.object(Common, "perform_request")
def test_invalid_list_alarm_req(self, perf_req):
# Request will not be performed without a resource_id
list_details = {"mock_details": "invalid_details"}
with self.assertRaises(KeyError):
- self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+ self.alarming.list_alarms(alarm_endpoint, auth_token, list_details, True)
perf_req.assert_not_called()
@mock.patch.object(Common, "perform_request")
perf_req.return_value = type('obj', (object,),
{'text': json.dumps(mock_perf_req_return_value)})
- alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+ alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details, True)
self.assertDictEqual(alarm_list[0], mock_perf_req_return_value[0])
perf_req.assert_called_with(
- "alarm_endpoint/v2/alarms/", auth_token, req_type="get")
+ "alarm_endpoint/v2/alarms/", auth_token, req_type="get", verify_ssl=True)
perf_req.reset_mock()
# Check list with alarm_name defined
list_details = {"resource_uuid": "mock_r_id",
"alarm_name": "mock_alarm",
"severity": "critical"}
- alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+ alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details, True)
self.assertDictEqual(alarm_list[0], mock_perf_req_return_value[0])
perf_req.assert_called_with(
- "alarm_endpoint/v2/alarms/", auth_token, req_type="get")
+ "alarm_endpoint/v2/alarms/", auth_token, req_type="get", verify_ssl=True)
@mock.patch.object(Common, "perform_request")
def test_ack_alarm_req(self, perf_req):
"""Test update alarm state for acknowledge alarm request."""
- self.alarming.update_alarm_state(alarm_endpoint, auth_token, "my_alarm_id")
+ self.alarming.update_alarm_state(alarm_endpoint, auth_token, "my_alarm_id", True)
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/my_alarm_id/state", auth_token, req_type="put",
- payload=json.dumps("ok"))
+ payload=json.dumps("ok"), verify_ssl=True)
@mock.patch.object(Common, "perform_request")
def test_update_alarm_invalid(self, perf_req):
perf_req.return_value = type('obj', (object,), {'invalid_prop': 'Invalid response'})
with self.assertRaises(Exception):
- self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.update_alarm(alarm_endpoint, auth_token, values, {}, True)
perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
@mock.patch.object(Common, "perform_request")
values = {"alarm_uuid": "my_alarm_id"}
with self.assertRaises(Exception):
- self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.update_alarm(alarm_endpoint, auth_token, values, {}, True)
perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
self.assertEqual(perf_req.call_count, 1)
perf_req.return_value = resp
values = {"alarm_uuid": "my_alarm_id"}
- self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
+ self.alarming.update_alarm(alarm_endpoint, auth_token, values, {}, True)
check_pay.assert_called_with(values, "disk_write_ops", "my_resource_id",
"my_alarm", alarm_state="alarm")
# Second call is the update request
perf_req.assert_called_with(
'alarm_endpoint/v2/alarms/my_alarm_id', auth_token,
- req_type="put", payload=check_pay.return_value)
+ req_type="put", payload=check_pay.return_value, verify_ssl=True)
@mock.patch.object(Config, "instance")
def test_check_valid_payload(self, cfg):
mock_perf_req_return_value = {"metrics": {"cpu_util": 123}}
perf_req.return_value = type('obj', (object,), {'text': json.dumps(mock_perf_req_return_value)})
- self.alarming.check_for_metric(auth_token, metric_endpoint, "cpu_utilization", "r_id")
+ self.alarming.check_for_metric(auth_token, metric_endpoint, "cpu_utilization", "r_id", True)
perf_req.assert_called_with(
- "metric_endpoint/v1/resource/generic/r_id", auth_token, req_type="get")
+ "metric_endpoint/v1/resource/generic/r_id", auth_token, req_type="get", verify_ssl=True)