From 0e57d11ebb85637f38dd92a791abd6fe4889a565 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Sun, 25 Mar 2018 14:43:52 -0300 Subject: [PATCH] Minor bugs fix Also-by: gcalvino Signed-off-by: Benjamin Diaz --- .gitignore | 1 + osm_mon/core/message_bus/common_consumer | 7 ++- osm_mon/core/message_bus/producer.py | 7 +-- osm_mon/core/models/delete_metric_req.json | 2 +- osm_mon/plugins/OpenStack/Aodh/alarming.py | 53 ++++++++++++++++---- osm_mon/plugins/OpenStack/Gnocchi/metrics.py | 44 +++++++++------- osm_mon/plugins/OpenStack/common.py | 2 +- osm_mon/plugins/OpenStack/settings.py | 12 ++--- osm_mon/test/OpenStack/test_alarming.py | 22 +++++--- osm_mon/test/OpenStack/test_metric_calls.py | 24 ++++----- osm_mon/test/OpenStack/test_settings.py | 3 +- 11 files changed, 114 insertions(+), 63 deletions(-) diff --git a/.gitignore b/.gitignore index 8243f05..f4d6bb1 100644 --- a/.gitignore +++ b/.gitignore @@ -75,3 +75,4 @@ ChangeLog .*sw? .settings/ __pycache__/ +.idea diff --git a/osm_mon/core/message_bus/common_consumer b/osm_mon/core/message_bus/common_consumer index 709c07e..efbb122 100755 --- a/osm_mon/core/message_bus/common_consumer +++ b/osm_mon/core/message_bus/common_consumer @@ -47,7 +47,12 @@ from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials from osm_mon.plugins.vRealiseOps import plugin_receiver # Initialize servers -server = {'server': 'localhost:9092'} +if "BROKER_URI" in os.environ: + server = {'server': os.getenv("BROKER_URI")} +else: + server = {'server': 'localhost:9092'} + + # Initialize consumers for alarms and metrics common_consumer = KafkaConsumer(bootstrap_servers=server['server']) diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py index aad8b62..d4f8015 100755 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -28,7 +28,7 @@ import logging import os -import jsmin +from jsmin import jsmin from kafka import KafkaProducer as kaf @@ -37,7 +37,8 @@ from kafka.errors import KafkaError __author__ = "Prithiv Mohan" __date__ = "06/Sep/2017" -json_path = os.path.abspath(os.pardir + "/MON/osm_mon/core/models/") +current_path = os.path.realpath(__file__) +json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models')) # TODO(): validate all of the request and response messages against the # json_schemas @@ -247,7 +248,7 @@ class KafkaProducer(object): # Internal to MON payload_metric_list_resp = jsmin( - open(os.path.join(json_path, 'list_metrics_resp.json')).read()) + open(os.path.join(json_path, 'list_metric_resp.json')).read()) self.publish(key, value=message, diff --git a/osm_mon/core/models/delete_metric_req.json b/osm_mon/core/models/delete_metric_req.json index 7e03e72..c4cfdad 100644 --- a/osm_mon/core/models/delete_metric_req.json +++ b/osm_mon/core/models/delete_metric_req.json @@ -24,7 +24,7 @@ "schema_type": { "type": "string" }, "metric_name": { "type": "string" }, "metric_uuid": { "type": "string" }, - "resource_id": { "type": "string" }, + "resource_uuid": { "type": "string" }, "tenant_uuid": { "type": "string" }, "correlation_id": { "type": "integer" }, "vim_type": { "type": "string" }, diff --git a/osm_mon/plugins/OpenStack/Aodh/alarming.py b/osm_mon/plugins/OpenStack/Aodh/alarming.py index b7978fb..abd6690 100644 --- a/osm_mon/plugins/OpenStack/Aodh/alarming.py +++ b/osm_mon/plugins/OpenStack/Aodh/alarming.py @@ -29,6 +29,7 @@ from osm_mon.core.message_bus.producer import KafkaProducer from osm_mon.plugins.OpenStack.response import OpenStack_Response from osm_mon.plugins.OpenStack.settings import Config +from osm_mon.plugins.OpenStack.Gnocchi.metrics import Metrics log = logging.getLogger(__name__) @@ -43,6 +44,18 @@ ALARM_NAMES = { "packets_out_above_threshold": "packets_sent", "cpu_utilization_above_threshold": "cpu_utilization"} +METRIC_MAPPINGS = { + "average_memory_utilization": "memory.percent", + "disk_read_ops": "disk.disk_ops", + "disk_write_ops": "disk.disk_ops", + "disk_read_bytes": "disk.read.bytes", + "disk_write_bytes": "disk.write.bytes", + "packets_dropped": "interface.if_dropped", + "packets_received": "interface.if_packets", + "packets_sent": "interface.if_packets", + "cpu_utilization": "cpu_util", +} + SEVERITIES = { "warning": "low", "minor": "low", @@ -51,7 +64,7 @@ SEVERITIES = { "indeterminate": "critical"} STATISTICS = { - "average": "avg", + "average": "mean", "minimum": "min", "maximum": "max", "count": "count", @@ -387,6 +400,7 @@ class Alarming(object): alarm_name, alarm_state=None): """Check that the payload is configuration for update/create alarm.""" try: + cfg = Config.instance() # Check state and severity severity = values['severity'].lower() if severity == "indeterminate": @@ -395,21 +409,25 @@ class Alarming(object): alarm_state = "ok" statistic = values['statistic'].lower() + granularity = values['granularity'] + resource_type = values['resource_type'].lower() + # Try to configure the payload for the update/create request # Can only update: threshold, operation, statistic and # the severity of the alarm rule = {'threshold': values['threshold_value'], 'comparison_operator': values['operation'].lower(), - 'metric': metric_name, + 'metric': METRIC_MAPPINGS[metric_name], 'resource_id': resource_id, - 'resource_type': 'generic', - 'aggregation_method': STATISTICS[statistic], } + 'resource_type': resource_type, + 'aggregation_method': STATISTICS[statistic], + 'granularity': granularity, } payload = json.dumps({'state': alarm_state, 'name': alarm_name, 'severity': SEVERITIES[severity], 'type': 'gnocchi_resources_threshold', 'gnocchi_resources_threshold_rule': rule, - 'alarm_actions': ['http://localhost:8662'], }) + 'alarm_actions': [cfg.OS_NOTIFIER_URI], }) return payload except KeyError as exc: log.warn("Alarm is not configured correctly: %s", exc) @@ -431,15 +449,28 @@ class Alarming(object): """Check for the alarm metric.""" try: endpoint = self.common.get_endpoint("metric") - - url = "{}/v1/metric/".format(endpoint) - metric_list = self.common._perform_request( + url = "{}/v1/metric?sort=name:asc".format(endpoint) + result = self.common._perform_request( url, auth_token, req_type="get") - - for metric in json.loads(metric_list.text): + metric_list = [] + metrics_partial = json.loads(result.text) + for metric in metrics_partial: + metric_list.append(metric) + + while len(json.loads(result.text)) > 0: + last_metric_id = metrics_partial[-1]['id'] + url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id) + result = self.common._perform_request( + url, auth_token, req_type="get") + if len(json.loads(result.text)) > 0: + metrics_partial = json.loads(result.text) + for metric in metrics_partial: + metric_list.append(metric) + + for metric in metric_list: name = metric['name'] resource = metric['resource_id'] - if (name == m_name and resource == r_id): + if (name == METRIC_MAPPINGS[m_name] and resource == r_id): metric_id = metric['id'] log.info("The required metric exists, an alarm will be created.") return metric_id diff --git a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py index 8e2ab4e..d3d46d3 100644 --- a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py +++ b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py @@ -36,14 +36,14 @@ log = logging.getLogger(__name__) METRIC_MAPPINGS = { "average_memory_utilization": "memory.percent", - "disk_read_ops": "disk.disk_ops", - "disk_write_ops": "disk.disk_ops", - "disk_read_bytes": "disk.disk_octets", - "disk_write_bytes": "disk.disk_octets", + "disk_read_ops": "disk.read.requests", + "disk_write_ops": "disk.write.requests", + "digsk_read_bytes": "disk.read.bytes", + "disk_write_bytes": "disk.write.bytes", "packets_dropped": "interface.if_dropped", "packets_received": "interface.if_packets", "packets_sent": "interface.if_packets", - "cpu_utilization": "cpu.percent", + "cpu_utilization": "cpu_util", } PERIOD_MS = { @@ -212,8 +212,8 @@ class Metrics(object): return None, None, False # Check/Normalize metric name - metric_name, norm_name = self.get_metric_name(values) - if norm_name is None: + norm_name, metric_name = self.get_metric_name(values) + if metric_name is None: log.warn("This metric is not supported by this plugin.") return None, resource_id, False @@ -312,9 +312,13 @@ class Metrics(object): resource = None try: + url = "{}/v1/metric?sort=name:asc".format(endpoint) result = self._common._perform_request( url, auth_token, req_type="get") - metrics = json.loads(result.text) + metrics = [] + metrics_partial = json.loads(result.text) + for metric in metrics_partial: + metrics.append(metric) if metrics is not None: # Format the list response @@ -412,24 +416,28 @@ class Metrics(object): # Create required lists for row in metric_list: # Only list OSM metrics - if row['name'] in METRIC_MAPPINGS.keys(): - metric = {"metric_name": row['name'], + name = None + if row['name'] in METRIC_MAPPINGS.values(): + for k,v in METRIC_MAPPINGS.iteritems(): + if row['name'] == v: + name = k + metric = {"metric_name": name, "metric_uuid": row['id'], "metric_unit": row['unit'], "resource_uuid": row['resource_id']} resp_list.append(str(metric)) # Generate metric_name specific list - if metric_name is not None: - if row['name'] == metric_name: - metric = {"metric_name": row['name'], + if metric_name is not None and name is not None: + if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]: + metric = {"metric_name": metric_name, "metric_uuid": row['id'], "metric_unit": row['unit'], "resource_uuid": row['resource_id']} name_list.append(str(metric)) # Generate resource specific list - if resource is not None: + if resource is not None and name is not None: if row['resource_id'] == resource: - metric = {"metric_name": row['name'], + metric = {"metric_name": name, "metric_uuid": row['id'], "metric_unit": row['unit'], "resource_uuid": row['resource_id']} @@ -437,10 +445,12 @@ class Metrics(object): # Join required lists if metric_name is not None and resource is not None: - return list(set(res_list).intersection(name_list)) + intersection_set = set(res_list).intersection(name_list) + intersection = list(intersection_set) + return intersection elif metric_name is not None: return name_list elif resource is not None: - return list(set(res_list).intersection(resp_list)) + return res_list else: return resp_list diff --git a/osm_mon/plugins/OpenStack/common.py b/osm_mon/plugins/OpenStack/common.py index 447d89d..4401d0a 100644 --- a/osm_mon/plugins/OpenStack/common.py +++ b/osm_mon/plugins/OpenStack/common.py @@ -98,7 +98,7 @@ class Common(object): return self._ks.service_catalog.url_for( service_type=service_type, endpoint_type='publicURL', - region_name='RegionOne') + region_name='regionOne') except Exception as exc: log.warning("Failed to retreive endpoint for service due to: %s", exc) diff --git a/osm_mon/plugins/OpenStack/settings.py b/osm_mon/plugins/OpenStack/settings.py index a4b0752..1e8f54f 100644 --- a/osm_mon/plugins/OpenStack/settings.py +++ b/osm_mon/plugins/OpenStack/settings.py @@ -64,6 +64,7 @@ class Config(object): CfgParam('OS_USERNAME', None, six.text_type), CfgParam('OS_PASSWORD', "password", six.text_type), CfgParam('OS_TENANT_NAME', "service", six.text_type), + CfgParam('OS_NOTIFIER_URI', "http://localhost:8662", six.text_type), ] _config_dict = {cfg.key: cfg for cfg in _configuration} @@ -78,17 +79,12 @@ class Config(object): """Check the appropriate environment variables and update defaults.""" for key in self._config_keys: try: - if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"): - val = str(os.environ[key]) - setattr(self, key, val) - elif (key == "OS_AUTH_URL"): + if key == "OS_AUTH_URL": val = str(os.environ[key]) + "/v3" setattr(self, key, val) else: - # Default username for a service is it's name - setattr(self, 'OS_USERNAME', service) - log.info("Configuration complete!") - return + val = str(os.environ[key]) + setattr(self, key, val) except KeyError as exc: log.warn("Falied to configure plugin: %s", exc) log.warn("Try re-authenticating your OpenStack deployment.") diff --git a/osm_mon/test/OpenStack/test_alarming.py b/osm_mon/test/OpenStack/test_alarming.py index 92c21f6..effd920 100644 --- a/osm_mon/test/OpenStack/test_alarming.py +++ b/osm_mon/test/OpenStack/test_alarming.py @@ -204,18 +204,21 @@ class TestAlarming(unittest.TestCase): values = {"severity": "warning", "statistic": "COUNT", "threshold_value": 12, - "operation": "GT"} + "operation": "GT", + "granularity": 300, + "resource_type": "generic"} payload = self.alarming.check_payload( - values, "my_metric", "r_id", "alarm_name") + values, "disk_write_ops", "r_id", "alarm_name") - self.assertEqual( + self.assertDictEqual( json.loads(payload), {"name": "alarm_name", "gnocchi_resources_threshold_rule": {"resource_id": "r_id", - "metric": "my_metric", + "metric": "disk.disk_ops", "comparison_operator": "gt", "aggregation_method": "count", "threshold": 12, + "granularity": 300, "resource_type": "generic"}, "severity": "low", "state": "ok", @@ -227,18 +230,21 @@ class TestAlarming(unittest.TestCase): values = {"severity": "warning", "statistic": "COUNT", "threshold_value": 12, - "operation": "GT"} + "operation": "GT", + "granularity": 300, + "resource_type": "generic"} payload = self.alarming.check_payload( - values, "my_metric", "r_id", "alarm_name", alarm_state="alarm") + values, "disk_write_ops", "r_id", "alarm_name", alarm_state="alarm") self.assertEqual( json.loads(payload), {"name": "alarm_name", "gnocchi_resources_threshold_rule": {"resource_id": "r_id", - "metric": "my_metric", + "metric": "disk.disk_ops", "comparison_operator": "gt", "aggregation_method": "count", "threshold": 12, + "granularity": 300, "resource_type": "generic"}, "severity": "low", "state": "alarm", @@ -270,4 +276,4 @@ class TestAlarming(unittest.TestCase): self.alarming.check_for_metric(auth_token, "metric_name", "r_id") perf_req.assert_called_with( - "gnocchi_endpoint/v1/metric/", auth_token, req_type="get") + "gnocchi_endpoint/v1/metric?sort=name:asc", auth_token, req_type="get") diff --git a/osm_mon/test/OpenStack/test_metric_calls.py b/osm_mon/test/OpenStack/test_metric_calls.py index 3c4a7c8..d209f61 100644 --- a/osm_mon/test/OpenStack/test_metric_calls.py +++ b/osm_mon/test/OpenStack/test_metric_calls.py @@ -40,7 +40,7 @@ endpoint = mock.ANY auth_token = mock.ANY # Mock a valid metric list for some tests, and a resultant list -metric_list = [{"name": "disk_write_ops", +metric_list = [{"name": "disk.write.requests", "id": "metric_id", "unit": "units", "resource_id": "r_id"}] @@ -52,7 +52,7 @@ class Response(object): def __init__(self): """Initialise test and status code values.""" - self.text = json.dumps("mock_response_text") + self.text = json.dumps([{"id": "test_id"}]) self.status_code = "STATUS_CODE" @@ -116,7 +116,7 @@ class TestMetricCalls(unittest.TestCase): # Test valid configuration and payload for creating a metric values = {"resource_uuid": "r_id", "metric_unit": "units"} - get_metric_name.return_value = "metric_name", "norm_name" + get_metric_name.return_value = "norm_name", "metric_name" get_metric.return_value = None payload = {"id": "r_id", "metrics": {"metric_name": @@ -158,8 +158,8 @@ class TestMetricCalls(unittest.TestCase): self.metrics.list_metrics(endpoint, auth_token, values) perf_req.assert_called_with( - "/v1/metric/", auth_token, req_type="get") - resp_list.assert_called_with("mock_response_text") + "/v1/metric?sort=name:asc", auth_token, req_type="get") + resp_list.assert_called_with([{u'id': u'test_id'}]) @mock.patch.object(metric_req.Metrics, "response_list") @mock.patch.object(Common, "_perform_request") @@ -172,9 +172,9 @@ class TestMetricCalls(unittest.TestCase): self.metrics.list_metrics(endpoint, auth_token, values) perf_req.assert_called_with( - "/v1/metric/", auth_token, req_type="get") + "/v1/metric?sort=name:asc", auth_token, req_type="get") resp_list.assert_called_with( - "mock_response_text", resource="resource_id") + [{u'id': u'test_id'}], resource="resource_id") @mock.patch.object(metric_req.Metrics, "response_list") @mock.patch.object(Common, "_perform_request") @@ -187,9 +187,9 @@ class TestMetricCalls(unittest.TestCase): self.metrics.list_metrics(endpoint, auth_token, values) perf_req.assert_called_with( - "/v1/metric/", auth_token, req_type="get") + "/v1/metric?sort=name:asc", auth_token, req_type="get") resp_list.assert_called_with( - "mock_response_text", metric_name="disk_write_bytes") + [{u'id': u'test_id'}], metric_name="disk_write_bytes") @mock.patch.object(metric_req.Metrics, "response_list") @mock.patch.object(Common, "_perform_request") @@ -203,9 +203,9 @@ class TestMetricCalls(unittest.TestCase): self.metrics.list_metrics(endpoint, auth_token, values) perf_req.assert_called_with( - "/v1/metric/", auth_token, req_type="get") + "/v1/metric?sort=name:asc", auth_token, req_type="get") resp_list.assert_called_with( - "mock_response_text", resource="resource_id", + [{u'id': u'test_id'}], resource="resource_id", metric_name="packets_sent") @mock.patch.object(Common, "_perform_request") @@ -224,7 +224,7 @@ class TestMetricCalls(unittest.TestCase): metric_name, norm_name = self.metrics.get_metric_name(values) self.assertEqual(metric_name, "disk_write_ops") - self.assertEqual(norm_name, "disk.disk_ops") + self.assertEqual(norm_name, "disk.write.requests") # test with an invalid metric name values = {"metric_name": "my_invalid_metric"} diff --git a/osm_mon/test/OpenStack/test_settings.py b/osm_mon/test/OpenStack/test_settings.py index 44bed82..66da6af 100644 --- a/osm_mon/test/OpenStack/test_settings.py +++ b/osm_mon/test/OpenStack/test_settings.py @@ -44,9 +44,10 @@ class TestSettings(unittest.TestCase): def test_set_os_username(self): """Test reading the environment for OpenStack plugin configuration.""" + os.environ["OS_USERNAME"] = "test" self.cfg.read_environ("my_service") - self.assertEqual(self.cfg.OS_USERNAME, "my_service") + self.assertEqual(self.cfg.OS_USERNAME, "test") @mock.patch.object(os, "environ") def test_read_environ(self, environ): -- 2.25.1