def __init__(self):
"""Create the OpenStack alarming instance."""
- # Initialize configuration and notifications
- config = Config.instance()
- config.read_environ()
-
self._database_manager = DatabaseManager()
self._auth_manager = AuthManager()
# Initialize a producer to send responses back to SO
self._producer = KafkaProducer("alarm_response")
- def configure_alarm(self, alarm_endpoint, metric_endpoint, auth_token, values, vim_config):
- """Create requested alarm in Aodh."""
- url = "{}/v2/alarms/".format(alarm_endpoint)
-
- # Check if the desired alarm is supported
- alarm_name = values['alarm_name'].lower()
- metric_name = values['metric_name'].lower()
- resource_id = values['resource_uuid']
-
- if metric_name not in METRIC_MAPPINGS.keys():
- log.warning("This metric is not supported.")
- return None, False
-
- # Check for the required metric
- metric_id = self.check_for_metric(auth_token, metric_endpoint, metric_name, resource_id)
-
- try:
- if metric_id is not None:
- # Create the alarm if metric is available
- if 'granularity' in vim_config and 'granularity' not in values:
- values['granularity'] = vim_config['granularity']
- payload = self.check_payload(values, metric_name, resource_id,
- alarm_name)
- new_alarm = Common.perform_request(
- url, auth_token, req_type="post", payload=payload)
- return json.loads(new_alarm.text)['alarm_id'], True
- else:
- log.warning("The required Gnocchi metric does not exist.")
- return None, False
-
- except Exception as exc:
- log.warning("Failed to create the alarm: %s", exc)
- return None, False
-
def alarming(self, message, vim_uuid):
- """Consume info from the message bus to manage alarms."""
+ """
+ Processes an alarm request message depending on its key.
+ :param message: Message containing key and value attributes. The value can be in JSON or YAML format.
+ :param vim_uuid: UUID of the VIM that handles the alarm request.
+ :return:
+ """
try:
values = json.loads(message.value)
except ValueError:
vim_config = json.loads(vim_account.config)
if message.key == "create_alarm_request":
- # Configure/Update an alarm
alarm_details = values['alarm_create_request']
+ alarm_id = None
+ status = False
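+ # The response is published in the finally block, so the SO receives a reply (status False) even if creation fails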
+ try:
+ metric_name = alarm_details['metric_name'].lower()
+ resource_id = alarm_details['resource_uuid']
+
+ self.check_for_metric(auth_token, metric_endpoint, metric_name, resource_id)
- alarm_id, alarm_status = self.configure_alarm(
- alarm_endpoint, metric_endpoint, auth_token, alarm_details, vim_config)
+ alarm_id = self.configure_alarm(
+ alarm_endpoint, auth_token, alarm_details, vim_config)
- # Generate a valid response message, send via producer
- if alarm_status is True:
log.info("Alarm successfully created")
self._database_manager.save_alarm(alarm_id,
vim_uuid,
alarm_details['vnf_member_index'].lower(),
alarm_details['ns_id'].lower()
)
- try:
- resp_message = self._response.generate_response(
- 'create_alarm_response', status=alarm_status,
- alarm_id=alarm_id,
- cor_id=alarm_details['correlation_id'])
- log.info("Response Message: %s", resp_message)
- self._producer.create_alarm_response(
- 'create_alarm_response', resp_message)
- except Exception:
- log.exception("Response creation failed:")
+ status = True
+ except Exception as e:
+ log.exception("Error creating alarm")
+ raise e
+ finally:
+ self._generate_and_send_response('create_alarm_response',
+ alarm_details['correlation_id'],
+ status=status,
+ alarm_id=alarm_id)
elif message.key == "list_alarm_request":
- # Check for a specified: alarm_name, resource_uuid, severity
- # and generate the appropriate list
list_details = values['alarm_list_request']
-
- alarm_list = self.list_alarms(
- alarm_endpoint, auth_token, list_details)
-
+ alarm_list = None
try:
- # Generate and send a list response back
- resp_message = self._response.generate_response(
- 'list_alarm_response', alarm_list=alarm_list,
- cor_id=list_details['correlation_id'])
- log.info("Response Message: %s", resp_message)
- self._producer.list_alarm_response(
- 'list_alarm_response', resp_message)
- except Exception:
- log.exception("Failed to send a valid response back.")
+ alarm_list = self.list_alarms(
+ alarm_endpoint, auth_token, list_details)
+ except Exception as e:
+ log.exception("Error listing alarms")
+ raise e
+ finally:
+ self._generate_and_send_response('list_alarm_response',
+ list_details['correlation_id'],
+ alarm_list=alarm_list)
elif message.key == "delete_alarm_request":
request_details = values['alarm_delete_request']
alarm_id = request_details['alarm_uuid']
-
- resp_status = self.delete_alarm(
- alarm_endpoint, auth_token, alarm_id)
-
- # Generate and send a response message
+ status = False
try:
- resp_message = self._response.generate_response(
- 'delete_alarm_response', alarm_id=alarm_id,
- status=resp_status,
- cor_id=request_details['correlation_id'])
- log.info("Response message: %s", resp_message)
- self._producer.delete_alarm_response(
- 'delete_alarm_response', resp_message)
- except Exception:
- log.exception("Failed to create delete response: ")
+ self.delete_alarm(
+ alarm_endpoint, auth_token, alarm_id)
+ status = True
+ except Exception as e:
+ log.exception("Error deleting alarm")
+ raise e
+ finally:
+ self._generate_and_send_response('delete_alarm_response',
+ request_details['correlation_id'],
+ status=status,
+ alarm_id=alarm_id)
elif message.key == "acknowledge_alarm":
- # Acknowledge that an alarm has been dealt with by the SO
- alarm_id = values['ack_details']['alarm_uuid']
+ try:
+ alarm_id = values['ack_details']['alarm_uuid']
- response = self.update_alarm_state(
- alarm_endpoint, auth_token, alarm_id)
+ self.update_alarm_state(
+ alarm_endpoint, auth_token, alarm_id)
- # Log if an alarm was reset
- if response is True:
log.info("Acknowledged the alarm and cleared it.")
- else:
- log.warning("Failed to acknowledge/clear the alarm.")
+ except Exception as e:
+ log.exception("Error acknowledging alarm")
+ raise e
elif message.key == "update_alarm_request":
# Update alarm configurations
alarm_details = values['alarm_update_request']
-
- alarm_id, status = self.update_alarm(
- alarm_endpoint, auth_token, alarm_details, vim_config)
-
- # Generate a response for an update request
+ alarm_id = None
+ status = False
try:
- resp_message = self._response.generate_response(
- 'update_alarm_response', alarm_id=alarm_id,
- cor_id=alarm_details['correlation_id'],
- status=status)
- log.info("Response message: %s", resp_message)
- self._producer.update_alarm_response(
- 'update_alarm_response', resp_message)
- except Exception:
- log.exception("Failed to send an update response: ")
+ alarm_id = self.update_alarm(
+ alarm_endpoint, auth_token, alarm_details, vim_config)
+ status = True
+ except Exception as e:
+ log.exception("Error updating alarm")
+ raise e
+ finally:
+ self._generate_and_send_response('update_alarm_response',
+ alarm_details['correlation_id'],
+ status=status,
+ alarm_id=alarm_id)
else:
log.debug("Unknown key, no action will be performed")
- return
+ def configure_alarm(self, alarm_endpoint, auth_token, values, vim_config):
+ """Create requested alarm in Aodh."""
+ url = "{}/v2/alarms/".format(alarm_endpoint)
+
+ # Check if the desired alarm is supported
+ alarm_name = values['alarm_name'].lower()
+ metric_name = values['metric_name'].lower()
+ resource_id = values['resource_uuid']
+
+ if metric_name not in METRIC_MAPPINGS.keys():
+ raise KeyError("Metric {} is not supported.".format(metric_name))
+
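+ # Fall back to the VIM-level granularity from vim_config when the request does not specify one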
+ if 'granularity' in vim_config and 'granularity' not in values:
+ values['granularity'] = vim_config['granularity']
+ payload = self.check_payload(values, metric_name, resource_id,
+ alarm_name)
+ new_alarm = Common.perform_request(
+ url, auth_token, req_type="post", payload=payload)
+ return json.loads(new_alarm.text)['alarm_id']
def delete_alarm(self, endpoint, auth_token, alarm_id):
"""Delete alarm function."""
url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
- try:
- result = Common.perform_request(
- url, auth_token, req_type="delete")
- if str(result.status_code) == "404":
- log.info("Alarm doesn't exist: %s", result.status_code)
- # If status code is 404 alarm did not exist
- return False
- else:
- return True
-
- except Exception:
- log.exception("Failed to delete alarm %s :", alarm_id)
- return False
+ result = Common.perform_request(
+ url, auth_token, req_type="delete")
+ if str(result.status_code) == "404":
+ raise ValueError("Alarm {} doesn't exist".format(alarm_id))
def list_alarms(self, endpoint, auth_token, list_details):
"""Generate the requested list of alarms."""
# Check for a resource id
try:
resource = list_details['resource_uuid']
- except KeyError as exc:
- log.warning("Resource id not specified for list request: %s", exc)
- return None
-
- # Checking what fields are specified for a list request
- try:
name = list_details['alarm_name'].lower()
- except KeyError as exc:
- log.info("Alarm name isn't specified.")
- name = None
-
- try:
severity = list_details['severity'].lower()
sev = SEVERITIES[severity]
- except KeyError as exc:
- log.info("Severity is unspecified/incorrectly configured")
- sev = None
+ except KeyError as e:
+ log.warning("Missing parameter for alarm list request: %s", e)
+ raise e
# Perform the request to get the desired list
try:
for alarm in json.loads(result.text):
rule = alarm['gnocchi_resources_threshold_rule']
if resource == rule['resource_id']:
- res_list.append(alarm)
- if not res_list:
- log.info("No alarms for this resource")
- return a_list
+ res_list.append(alarm['alarm_id'])
# Generate the specified list if requested
if name is not None and sev is not None:
name, sev)
for alarm in json.loads(result.text):
if name == alarm['name']:
- name_list.append(alarm)
+ name_list.append(alarm['alarm_id'])
for alarm in json.loads(result.text):
if sev == alarm['severity']:
- sev_list.append(alarm)
+ sev_list.append(alarm['alarm_id'])
name_sev_list = list(set(name_list).intersection(sev_list))
a_list = list(set(name_sev_list).intersection(res_list))
elif name is not None:
log.info("Returning a %s list of alarms.", name)
for alarm in json.loads(result.text):
if name == alarm['name']:
- name_list.append(alarm)
+ name_list.append(alarm['alarm_id'])
a_list = list(set(name_list).intersection(res_list))
elif sev is not None:
log.info("Returning %s severity alarm list.", sev)
for alarm in json.loads(result.text):
if sev == alarm['severity']:
- sev_list.append(alarm)
+ sev_list.append(alarm['alarm_id'])
a_list = list(set(sev_list).intersection(res_list))
else:
log.info("Returning an entire list of alarms.")
a_list = res_list
else:
log.info("There are no alarms!")
+ response_list = []
+ for alarm in json.loads(result.text):
+ if alarm['alarm_id'] in a_list:
+ response_list.append(alarm)
+ return response_list
- except Exception as exc:
- log.info("Failed to generate required list: %s", exc)
- return None
-
- return a_list
+ except Exception as e:
+ log.exception("Failed to generate alarm list: ")
+ raise e
def update_alarm_state(self, endpoint, auth_token, alarm_id):
"""Set the state of an alarm to ok when ack message is received."""
url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
payload = json.dumps("ok")
- try:
- Common.perform_request(
- url, auth_token, req_type="put", payload=payload)
- return True
- except Exception:
- log.exception("Unable to update alarm state: ")
- return False
+ Common.perform_request(
+ url, auth_token, req_type="put", payload=payload)
def update_alarm(self, endpoint, auth_token, values, vim_config):
"""Get alarm name for an alarm configuration update."""
url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid']
# Gets current configurations about the alarm
- try:
- result = Common.perform_request(
- url, auth_token, req_type="get")
- alarm_name = json.loads(result.text)['name']
- rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
- alarm_state = json.loads(result.text)['state']
- resource_id = rule['resource_id']
- metric_name = [key for key, value in six.iteritems(METRIC_MAPPINGS) if value == rule['metric']][0]
- except Exception as exc:
- log.exception("Failed to retrieve existing alarm info. Can only update OSM alarms.")
- return None, False
+ result = Common.perform_request(
+ url, auth_token, req_type="get")
+ alarm_name = json.loads(result.text)['name']
+ rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
+ alarm_state = json.loads(result.text)['state']
+ resource_id = rule['resource_id']
+ metric_name = [key for key, value in six.iteritems(METRIC_MAPPINGS) if value == rule['metric']][0]
# Generates and check payload configuration for alarm update
if 'granularity' in vim_config and 'granularity' not in values:
alarm_name, alarm_state=alarm_state)
# Updates the alarm configurations with the valid payload
- if payload is not None:
- try:
- update_alarm = Common.perform_request(
- url, auth_token, req_type="put", payload=payload)
+ update_alarm = Common.perform_request(
+ url, auth_token, req_type="put", payload=payload)
- return json.loads(update_alarm.text)['alarm_id'], True
- except Exception as exc:
- log.exception("Alarm update could not be performed: ")
- return None, False
+ return json.loads(update_alarm.text)['alarm_id']
def check_payload(self, values, metric_name, resource_id,
alarm_name, alarm_state=None):
"""Check that the payload is configuration for update/create alarm."""
- try:
- cfg = Config.instance()
- # Check state and severity
-
- severity = 'critical'
- if 'severity' in values:
- severity = values['severity'].lower()
-
- if severity == "indeterminate":
- alarm_state = "insufficient data"
- if alarm_state is None:
- alarm_state = "ok"
-
- statistic = values['statistic'].lower()
-
- granularity = cfg.OS_DEFAULT_GRANULARITY
- if 'granularity' in values:
- granularity = values['granularity']
-
- resource_type = 'generic'
- if 'resource_type' in values:
- resource_type = values['resource_type'].lower()
-
- # Try to configure the payload for the update/create request
- # Can only update: threshold, operation, statistic and
- # the severity of the alarm
- rule = {'threshold': values['threshold_value'],
- 'comparison_operator': values['operation'].lower(),
- 'metric': METRIC_MAPPINGS[metric_name],
- 'resource_id': resource_id,
- 'resource_type': resource_type,
- 'aggregation_method': STATISTICS[statistic],
- 'granularity': granularity, }
- payload = json.dumps({'state': alarm_state,
- 'name': alarm_name,
- 'severity': SEVERITIES[severity],
- 'type': 'gnocchi_resources_threshold',
- 'gnocchi_resources_threshold_rule': rule,
- 'alarm_actions': [cfg.OS_NOTIFIER_URI], })
- return payload
- except KeyError as exc:
- log.warning("Alarm is not configured correctly: %s", exc)
- return None
+ cfg = Config.instance()
+ # Check state and severity
+
+ severity = 'critical'
+ if 'severity' in values:
+ severity = values['severity'].lower()
+
+ if severity == "indeterminate":
+ alarm_state = "insufficient data"
+ if alarm_state is None:
+ alarm_state = "ok"
+
+ statistic = values['statistic'].lower()
+
+ granularity = cfg.OS_DEFAULT_GRANULARITY
+ if 'granularity' in values:
+ granularity = values['granularity']
+
+ resource_type = 'generic'
+ if 'resource_type' in values:
+ resource_type = values['resource_type'].lower()
+
+ # Configure the payload for the update/create request
+ # Can only update: threshold, operation, statistic and
+ # the severity of the alarm
+ rule = {'threshold': values['threshold_value'],
+ 'comparison_operator': values['operation'].lower(),
+ 'metric': METRIC_MAPPINGS[metric_name],
+ 'resource_id': resource_id,
+ 'resource_type': resource_type,
+ 'aggregation_method': STATISTICS[statistic],
+ 'granularity': granularity, }
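+ # sort_keys gives a deterministic JSON string, which the unit tests compare against directly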
+ payload = json.dumps({'state': alarm_state,
+ 'name': alarm_name,
+ 'severity': SEVERITIES[severity],
+ 'type': 'gnocchi_resources_threshold',
+ 'gnocchi_resources_threshold_rule': rule,
+ 'alarm_actions': [cfg.OS_NOTIFIER_URI], }, sort_keys=True)
+ return payload
def get_alarm_state(self, endpoint, auth_token, alarm_id):
"""Get the state of the alarm."""
url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
+ alarm_state = Common.perform_request(
+ url, auth_token, req_type="get")
+ return json.loads(alarm_state.text)
+
+ def check_for_metric(self, auth_token, metric_endpoint, metric_name, resource_id):
+ """
+ Checks if a resource has a specific metric. If not, raises an exception.
+ :param auth_token: OpenStack auth token
+ :param metric_endpoint: OpenStack metric endpoint
+ :param metric_name: Metric name
+ :param resource_id: Resource UUID
+ :return: Metric details from resource
+ :raise Exception: Could not retrieve metric from resource
+ """
try:
- alarm_state = Common.perform_request(
- url, auth_token, req_type="get")
- return json.loads(alarm_state.text)
- except Exception as exc:
- log.warning("Failed to get the state of the alarm:%s", exc)
- return None
-
- def check_for_metric(self, auth_token, metric_endpoint, m_name, r_id):
- """Check for the alarm metric."""
- try:
- url = "{}/v1/resource/generic/{}".format(metric_endpoint, r_id)
+ url = "{}/v1/resource/generic/{}".format(metric_endpoint, resource_id)
result = Common.perform_request(
url, auth_token, req_type="get")
resource = json.loads(result.text)
- metric_list = resource['metrics']
- if metric_list.get(METRIC_MAPPINGS[m_name]):
- metric_id = metric_list[METRIC_MAPPINGS[m_name]]
- else:
- metric_id = None
- log.info("Desired Gnocchi metric not found")
- return metric_id
- except Exception as exc:
- log.info("Desired Gnocchi metric not found:%s", exc)
- return None
\ No newline at end of file
+ metrics_dict = resource['metrics']
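+ # A KeyError below means the resource does not expose the mapped Gnocchi metric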
+ return metrics_dict[METRIC_MAPPINGS[metric_name]]
+ except Exception as e:
+ log.exception("Desired Gnocchi metric not found:", e)
+ raise e
+
+ def _generate_and_send_response(self, topic, correlation_id, **kwargs):
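+ """Generate a response message and publish it through the 'alarm_response' producer created in __init__."""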
+ try:
+ resp_message = self._response.generate_response(
+ topic, cor_id=correlation_id, **kwargs)
+ log.info("Response Message: %s", resp_message)
+ self._producer.create_alarm_response(
+ topic, resp_message)
+ except Exception as e:
+ log.exception("Response creation failed:")
+ raise e
from osm_mon.core.message_bus.producer import KafkaProducer
from osm_mon.plugins.OpenStack.response import OpenStack_Response
-from osm_mon.core.settings import Config
class NotifierHandler(BaseHTTPRequestHandler):
# Gets the size of data
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
+ # Python 2/3 string compatibility
try:
post_data = post_data.decode()
except AttributeError:
pass
- log.info("This alarm was triggered: %s", json.loads(post_data))
+ log.info("This alarm was triggered: %s", json.dumps(post_data))
- # Generate a notify_alarm response for the SO
- self.notify_alarm(json.loads(post_data))
+ # Send alarm notification to message bus
+ try:
+ self.notify_alarm(values)
+ except Exception:
+ log.exception("Error notifying alarm")
def notify_alarm(self, values):
- """Send a notification response message to the SO."""
-
- try:
- # Initialise configuration and authentication for response message
- config = Config.instance()
- config.read_environ()
- response = OpenStack_Response()
- producer = KafkaProducer('alarm_response')
-
- database_manager = DatabaseManager()
-
- alarm_id = values['alarm_id']
- alarm = database_manager.get_alarm(alarm_id, 'openstack')
- # Process an alarm notification if resource_id is valid
- # Get date and time for response message
- a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
- # Try generate and send response
- try:
- resp_message = response.generate_response(
- 'notify_alarm', a_id=alarm_id,
- vdu_name=alarm.vdu_name,
- vnf_member_index=alarm.vnf_member_index,
- ns_id=alarm.ns_id,
- metric_name=alarm.metric_name,
- operation=alarm.operation,
- threshold_value=alarm.threshold,
- sev=values['severity'],
- date=a_date,
- state=values['current'])
- producer.notify_alarm(
- 'notify_alarm', resp_message)
- log.info("Sent an alarm response to SO: %s", resp_message)
- except Exception as exc:
- log.exception("Couldn't notify SO of the alarm:")
-
- except:
- log.exception("Could not notify alarm.")
+ """Sends alarm notification message to bus."""
+
+ # Initialise the response generator and Kafka producer for the notification
+ response = OpenStack_Response()
+ producer = KafkaProducer('alarm_response')
+
+ database_manager = DatabaseManager()
+
+ alarm_id = values['alarm_id']
+ alarm = database_manager.get_alarm(alarm_id, 'openstack')
+ # Build the notification from the stored alarm details and the received alarm state
+ # Get date and time for response message
+ a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
+ # Generate and send response
+ resp_message = response.generate_response(
+ 'notify_alarm',
+ a_id=alarm_id,
+ vdu_name=alarm.vdu_name,
+ vnf_member_index=alarm.vnf_member_index,
+ ns_id=alarm.ns_id,
+ metric_name=alarm.metric_name,
+ operation=alarm.operation,
+ threshold_value=alarm.threshold,
+ sev=values['severity'],
+ date=a_date,
+ state=values['current'])
+ producer.notify_alarm(
+ 'notify_alarm', resp_message)
+ log.info("Sent alarm notification: %s", resp_message)
def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
class OpenStack_Response(object):
- """Generates responses for SO from OpenStaack plugins."""
+ """Generates responses for OpenStack plugin."""
def __init__(self):
"""Initialize OpenStack Response instance."""
@mock.patch.object(Common, 'get_auth_token', mock.Mock())
@mock.patch.object(Common, 'get_endpoint', mock.Mock())
@mock.patch.object(DatabaseManager, 'save_alarm', mock.Mock())
+ @mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(alarm_req.Alarming, 'configure_alarm')
- def test_config_alarm_key(self, config_alarm, get_creds):
+ def test_config_alarm_key(self, config_alarm, get_creds, perf_req):
"""Test the functionality for a create alarm request."""
# Mock a message with config alarm key and value
message = Message()
'operation': 'GT', 'metric_name': 'cpu_utilization',
'vdu_name': 'vdu',
'vnf_member_index': '1',
- 'ns_id': '1'}})
-
+ 'ns_id': '1',
+ 'resource_uuid': '123'}})
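+ # check_for_metric looks up the Gnocchi resource, so mock the HTTP response it receives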
+ mock_perf_req_return_value = {"metrics": {"cpu_util": 123}}
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps(mock_perf_req_return_value, sort_keys=True)})
get_creds.return_value = mock_creds
# Call alarming functionality and check config alarm call
- config_alarm.return_value = 'my_alarm_id', True
+ config_alarm.return_value = 'my_alarm_id'
self.alarming.alarming(message, 'test_id')
- config_alarm.assert_called_with(mock.ANY, mock.ANY, mock.ANY, {'correlation_id': 1, 'threshold_value': 50,
- 'operation': 'GT',
- 'metric_name': 'cpu_utilization',
- 'vdu_name': 'vdu',
- 'vnf_member_index': '1', 'ns_id': '1'}, {})
+ config_alarm.assert_called_with(mock.ANY, mock.ANY, {'correlation_id': 1, 'threshold_value': 50,
+ 'operation': 'GT',
+ 'metric_name': 'cpu_utilization',
+ 'vdu_name': 'vdu',
+ 'vnf_member_index': '1', 'ns_id': '1',
+ 'resource_uuid': '123'}, {})
super(TestAlarming, self).setUp()
self.alarming = alarm_req.Alarming()
- @mock.patch.object(alarm_req.Alarming, "check_payload")
- @mock.patch.object(alarm_req.Alarming, "check_for_metric")
@mock.patch.object(Common, "perform_request")
- def test_config_invalid_alarm_req(self, perf_req, check_metric, check_pay):
+ def test_config_invalid_alarm_req(self, perf_req):
"""Test configure an invalid alarm request."""
- # Configuring with invalid alarm name results in failure
+ # Configuring with invalid metric name results in failure
values = {"alarm_name": "my_alarm",
"metric_name": "my_metric",
"resource_uuid": "my_r_id"}
- self.alarming.configure_alarm(alarm_endpoint, metric_endpoint, auth_token, values, {})
+ with self.assertRaises(KeyError):
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
perf_req.assert_not_called()
perf_req.reset_mock()
- # Correct alarm_name will check for metric in Gnocchi
- # If there isn't one an alarm won;t be created
+ # Configuring with missing metric name results in failure
values = {"alarm_name": "disk_write_ops",
- "metric_name": "disk_write_ops",
"resource_uuid": "my_r_id"}
- check_metric.return_value = None
-
- self.alarming.configure_alarm(alarm_endpoint, metric_endpoint, auth_token, values, {})
+ with self.assertRaises(KeyError):
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
perf_req.assert_not_called()
- @mock.patch.object(alarm_req.Alarming, "check_payload")
- @mock.patch.object(alarm_req.Alarming, "check_for_metric")
@mock.patch.object(Common, "perform_request")
- def test_config_valid_alarm_req(self, perf_req, check_metric, check_pay):
+ def test_config_valid_alarm_req(self, perf_req):
"""Test config a valid alarm."""
- # Correct alarm_name will check for metric in Gnocchi
- # And conform that the payload is configured correctly
values = {"alarm_name": "disk_write_ops",
"metric_name": "disk_write_ops",
- "resource_uuid": "my_r_id"}
-
- check_metric.return_value = "my_metric_id"
- check_pay.return_value = "my_payload"
+ "resource_uuid": "my_r_id",
+ "statistic": "AVERAGE",
+ "threshold_value": 60,
+ "operation": "GT"}
perf_req.return_value = type('obj', (object,), {'text': '{"alarm_id":"1"}'})
- self.alarming.configure_alarm(alarm_endpoint, metric_endpoint, auth_token, values, {})
+ self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {})
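+ # Expected Aodh payload that check_payload builds from the values above (keys sorted for comparison)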
+ payload = {"name": "disk_write_ops",
+ "gnocchi_resources_threshold_rule": {"resource_type": "generic", "comparison_operator": "gt",
+ "granularity": "300", "metric": "disk.write.requests",
+ "aggregation_method": "mean", "threshold": 60,
+ "resource_id": "my_r_id"},
+ "alarm_actions": ["http://localhost:8662"], "state": "ok", "type": "gnocchi_resources_threshold",
+ "severity": "critical"}
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/", auth_token,
- req_type="post", payload="my_payload")
+ req_type="post", payload=json.dumps(payload, sort_keys=True))
@mock.patch.object(Common, "perform_request")
def test_delete_alarm_req(self, perf_req):
@mock.patch.object(Common, "perform_request")
def test_invalid_list_alarm_req(self, perf_req):
"""Test invalid list alarm_req."""
- # Request will not be performed with out a resoure_id
+ # Request will not be performed without a resource_id
list_details = {"mock_details": "invalid_details"}
- self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
-
+ with self.assertRaises(KeyError):
+ self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
perf_req.assert_not_called()
@mock.patch.object(Common, "perform_request")
def test_valid_list_alarm_req(self, perf_req):
"""Test valid list alarm request."""
# resource_uuid, alarm_name and severity are all required for a list request
- list_details = {"resource_uuid": "mock_r_id"}
- self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+ list_details = {"resource_uuid": "mock_r_id", "alarm_name": "mock_alarm", "severity": "critical"}
+
+ mock_perf_req_return_value = [
+ {"alarm_id": "1", "name": "mock_alarm", "severity": "critical",
+ "gnocchi_resources_threshold_rule": {"resource_id": "mock_r_id"}}]
+ perf_req.return_value = type('obj', (object,),
+ {'text': json.dumps(mock_perf_req_return_value)})
+
+ alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+
+ self.assertDictEqual(alarm_list[0], mock_perf_req_return_value[0])
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/", auth_token, req_type="get")
# Check list with alarm_name defined
list_details = {"resource_uuid": "mock_r_id",
- "alarm_name": "my_alarm",
+ "alarm_name": "mock_alarm",
"severity": "critical"}
- self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+ alarm_list = self.alarming.list_alarms(alarm_endpoint, auth_token, list_details)
+
+ self.assertDictEqual(alarm_list[0], mock_perf_req_return_value[0])
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/", auth_token, req_type="get")
"alarm_endpoint/v2/alarms/my_alarm_id/state", auth_token, req_type="put",
payload=json.dumps("ok"))
- @mock.patch.object(alarm_req.Alarming, "check_payload")
@mock.patch.object(Common, "perform_request")
- def test_update_alarm_invalid(self, perf_req, check_pay):
+ def test_update_alarm_invalid(self, perf_req):
"""Test update alarm with invalid get response."""
values = {"alarm_uuid": "my_alarm_id"}
perf_req.return_value = type('obj', (object,), {'invalid_prop': 'Invalid response'})
- self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
-
+ with self.assertRaises(Exception):
+ self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
- check_pay.assert_not_called()
- @mock.patch.object(alarm_req.Alarming, "check_payload")
@mock.patch.object(Common, "perform_request")
- def test_update_alarm_invalid_payload(self, perf_req, check_pay):
+ def test_update_alarm_invalid_payload(self, perf_req):
"""Test update alarm with invalid payload."""
resp = Response({"name": "my_alarm",
"state": "alarm",
{"resource_id": "my_resource_id",
"metric": "my_metric"}})
perf_req.return_value = resp
- check_pay.return_value = None
values = {"alarm_uuid": "my_alarm_id"}
- self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
-
+ with self.assertRaises(Exception):
+ self.alarming.update_alarm(alarm_endpoint, auth_token, values, {})
perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
self.assertEqual(perf_req.call_count, 1)
def test_check_invalid_payload(self):
"""Test the check payload function for an invalid payload."""
values = {"alarm_values": "mock_invalid_details"}
- payload = self.alarming.check_payload(
- values, "my_metric", "r_id", "alarm_name")
-
- self.assertEqual(payload, None)
+ with self.assertRaises(Exception):
+ self.alarming.check_payload(values, "my_metric", "r_id", "alarm_name")
@mock.patch.object(Common, "perform_request")
def test_get_alarm_state(self, perf_req):
perf_req.assert_called_with(
"alarm_endpoint/v2/alarms/alarm_id/state", auth_token, req_type="get")
- @mock.patch.object(Common, "get_endpoint")
@mock.patch.object(Common, "perform_request")
- def test_check_for_metric(self, perf_req, get_endpoint):
+ def test_check_for_metric(self, perf_req):
"""Test the check for metric function."""
- get_endpoint.return_value = "gnocchi_endpoint"
+ mock_perf_req_return_value = {"metrics": {"cpu_util": 123}}
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps(mock_perf_req_return_value)})
- self.alarming.check_for_metric(auth_token, metric_endpoint, "metric_name", "r_id")
+ self.alarming.check_for_metric(auth_token, metric_endpoint, "cpu_utilization", "r_id")
perf_req.assert_called_with(
"metric_endpoint/v1/resource/generic/r_id", auth_token, req_type="get")
##
"""Tests for all common OpenStack methods."""
-# TODO: Mock database calls. Improve assertions.
-
import json
import unittest
import mock
-from six.moves.BaseHTTPServer import HTTPServer
-
from osm_mon.core.database import DatabaseManager, Alarm
from osm_mon.core.message_bus.producer import KafkaProducer
from osm_mon.plugins.OpenStack.Aodh.notifier import NotifierHandler
-from osm_mon.plugins.OpenStack.common import Common
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
-
-# Mock data from post request
-post_data = json.dumps({"severity": "critical",
- "alarm_name": "my_alarm",
- "current": "current_state",
- "alarm_id": "my_alarm_id",
- "reason": "Threshold has been broken",
- "reason_data": {"count": 1,
- "most_recent": "null",
- "type": "threshold",
- "disposition": "unknown"},
- "previous": "previous_state"})
-
-valid_get_resp = '{"gnocchi_resources_threshold_rule":\
- {"resource_id": "my_resource_id"}}'
-
-invalid_get_resp = '{"gnocchi_resources_threshold_rule":\
- {"resource_id": null}}'
-
-valid_notify_resp = '{"notify_details": {"status": "current_state",\
- "severity": "critical",\
- "resource_uuid": "my_resource_id",\
- "alarm_uuid": "my_alarm_id",\
- "vim_type": "OpenStack",\
- "start_date": "dd-mm-yyyy 00:00"},\
- "schema_version": "1.0",\
- "schema_type": "notify_alarm"}'
-
-invalid_notify_resp = '{"notify_details": {"invalid":"mock_details"}'
+
+post_data = {"severity": "critical",
+ "alarm_name": "my_alarm",
+ "current": "current_state",
+ "alarm_id": "my_alarm_id",
+ "reason": "Threshold has been broken",
+ "reason_data": {"count": 1,
+ "most_recent": "null",
+ "type": "threshold",
+ "disposition": "unknown"},
+ "previous": "previous_state"}
class Response(object):
@mock.patch.object(NotifierHandler, "_set_headers")
def test_do_GET(self, set_head):
- """Test do_GET, generates headers for get request."""
+ """Tests do_GET. Validates _set_headers has been called."""
self.handler.do_GET()
set_head.assert_called_once()
@mock.patch.object(NotifierHandler, "notify_alarm")
@mock.patch.object(NotifierHandler, "_set_headers")
def test_do_POST(self, set_head, notify):
- """Test do_POST functionality for a POST request."""
+ """Tests do_POST. Validates notify_alarm has been called."""
self.handler.do_POST()
set_head.assert_called_once()
- notify.assert_called_with(json.loads(post_data))
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_unauth(self, perf_req, auth, endpoint):
- """Test notify alarm when not authenticated with keystone."""
- # Response request will not be performed unless there is a valid
- # auth_token and endpoint
- # Invalid auth_token and endpoint
- auth.return_value = None
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid endpoint
- auth.return_value = None
- endpoint.return_value = "my_endpoint"
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid auth_token
- auth.return_value = "my_auth_token"
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_invalid_alarm(self, perf_req, auth, resp, endpoint):
- """Test valid authentication, invalid alarm details."""
- # Mock valid auth_token and endpoint
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(invalid_get_resp)
-
- self.handler.notify_alarm(json.loads(post_data))
-
- # Response is not generated
- resp.assert_not_called()
+ notify.assert_called_with(post_data)
@mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
@mock.patch.object(DatabaseManager, "get_alarm")
- def test_notify_alarm_resp_call(self, get_alarm, perf_req, auth, response, endpoint, notify):
- """Test notify_alarm tries to generate a response for SO."""
- # Mock valid auth token and endpoint, valid response from aodh
- auth.return_value = "my_auth_token"
- endpoint.returm_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
+ def test_notify_alarm_valid_alarm(
+ self, get_alarm, notify):
+ """
+ Tests notify_alarm when the request from OpenStack references an existing alarm in the DB.
+ Validates KafkaProducer.notify_alarm has been called.
+ """
+ # Generate return values for valid notify_alarm operation
mock_alarm = Alarm()
get_alarm.return_value = mock_alarm
- self.handler.notify_alarm(json.loads(post_data))
-
- notify.assert_called()
- response.assert_called_with('notify_alarm', a_id='my_alarm_id', date=mock.ANY, metric_name=None,
- ns_id=None, operation=None, sev='critical', state='current_state',
- threshold_value=None, vdu_name=None, vnf_member_index=None)
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- @unittest.skip("Schema validation not implemented yet.")
- def test_notify_alarm_invalid_resp(
- self, perf_req, auth, response, notify, endpoint):
- """Test the notify_alarm function, sends response to the producer."""
- # Generate return values for valid notify_alarm operation
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- response.return_value = invalid_notify_resp
- self.handler.notify_alarm(json.loads(post_data))
+ self.handler.notify_alarm(post_data)
- notify.assert_not_called()
+ notify.assert_called_with("notify_alarm", mock.ANY)
- @mock.patch.object(Common, "get_endpoint")
@mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
@mock.patch.object(DatabaseManager, "get_alarm")
- def test_notify_alarm_valid_resp(
- self, get_alarm, perf_req, auth, response, notify, endpoint):
- """Test the notify_alarm function, sends response to the producer."""
+ def test_notify_alarm_invalid_alarm(
+ self, get_alarm, notify):
+ """
+ Tests notify_alarm when the request from OpenStack references a non-existent alarm in the DB.
+ Validates Exception is thrown and KafkaProducer.notify_alarm has not been called.
+ """
# Generate return values for valid notify_alarm operation
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- response.return_value = valid_notify_resp
- mock_alarm = Alarm()
- get_alarm.return_value = mock_alarm
- self.handler.notify_alarm(json.loads(post_data))
+ get_alarm.return_value = None
- notify.assert_called_with(
- "notify_alarm", valid_notify_resp)
+ with self.assertRaises(Exception):
+ self.handler.notify_alarm(post_data)
+ notify.assert_not_called()
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
import logging
from typing import Dict, List
-import peewee
import yaml
-
from kafka import KafkaConsumer
-from osm_policy_module.core.config import Config
-from osm_policy_module.common.lcm_client import LcmClient
-
from osm_policy_module.common.alarm_config import AlarmConfig
+from osm_policy_module.common.lcm_client import LcmClient
from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
log = logging.getLogger(__name__)