.*sw?
.settings/
__pycache__/
+.idea
from osm_mon.plugins.vRealiseOps import plugin_receiver
# Initialize servers
-server = {'server': 'localhost:9092'}
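+# Prefer a broker address from the environment so deployments can override the localhost default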
+if "BROKER_URI" in os.environ:
+ server = {'server': os.getenv("BROKER_URI")}
+else:
+ server = {'server': 'localhost:9092'}
+
+
# Initialize consumers for alarms and metrics
common_consumer = KafkaConsumer(bootstrap_servers=server['server'])
import os
-import jsmin
+from jsmin import jsmin
from kafka import KafkaProducer as kaf
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"
-json_path = os.path.abspath(os.pardir + "/MON/osm_mon/core/models/")
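+# Resolve the schema directory relative to this file instead of the process working directory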
+current_path = os.path.realpath(__file__)
+json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models'))
# TODO(): validate all of the request and response messages against the
# json_schemas
# Internal to MON
payload_metric_list_resp = jsmin(
- open(os.path.join(json_path, 'list_metrics_resp.json')).read())
+ open(os.path.join(json_path, 'list_metric_resp.json')).read())
self.publish(key,
value=message,
"schema_type": { "type": "string" },
"metric_name": { "type": "string" },
"metric_uuid": { "type": "string" },
- "resource_id": { "type": "string" },
+ "resource_uuid": { "type": "string" },
"tenant_uuid": { "type": "string" },
"correlation_id": { "type": "integer" },
"vim_type": { "type": "string" },
from osm_mon.plugins.OpenStack.response import OpenStack_Response
from osm_mon.plugins.OpenStack.settings import Config
+from osm_mon.plugins.OpenStack.Gnocchi.metrics import Metrics
log = logging.getLogger(__name__)
"packets_out_above_threshold": "packets_sent",
"cpu_utilization_above_threshold": "cpu_utilization"}
+METRIC_MAPPINGS = {
+ "average_memory_utilization": "memory.percent",
+ "disk_read_ops": "disk.disk_ops",
+ "disk_write_ops": "disk.disk_ops",
+ "disk_read_bytes": "disk.read.bytes",
+ "disk_write_bytes": "disk.write.bytes",
+ "packets_dropped": "interface.if_dropped",
+ "packets_received": "interface.if_packets",
+ "packets_sent": "interface.if_packets",
+ "cpu_utilization": "cpu_util",
+}
+
SEVERITIES = {
"warning": "low",
"minor": "low",
"indeterminate": "critical"}
STATISTICS = {
- "average": "avg",
+ "average": "mean",
"minimum": "min",
"maximum": "max",
"count": "count",
alarm_name, alarm_state=None):
"""Check that the payload is configuration for update/create alarm."""
try:
+ cfg = Config.instance()
# Check state and severity
severity = values['severity'].lower()
if severity == "indeterminate":
alarm_state = "ok"
statistic = values['statistic'].lower()
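+ # The Gnocchi threshold rule also needs a granularity (in seconds) and a resource type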
+ granularity = values['granularity']
+ resource_type = values['resource_type'].lower()
+
# Try to configure the payload for the update/create request
# Can only update: threshold, operation, statistic and
# the severity of the alarm
rule = {'threshold': values['threshold_value'],
'comparison_operator': values['operation'].lower(),
- 'metric': metric_name,
+ 'metric': METRIC_MAPPINGS[metric_name],
'resource_id': resource_id,
- 'resource_type': 'generic',
- 'aggregation_method': STATISTICS[statistic], }
+ 'resource_type': resource_type,
+ 'aggregation_method': STATISTICS[statistic],
+ 'granularity': granularity, }
payload = json.dumps({'state': alarm_state,
'name': alarm_name,
'severity': SEVERITIES[severity],
'type': 'gnocchi_resources_threshold',
'gnocchi_resources_threshold_rule': rule,
- 'alarm_actions': ['http://localhost:8662'], })
+ 'alarm_actions': [cfg.OS_NOTIFIER_URI], })
return payload
except KeyError as exc:
log.warn("Alarm is not configured correctly: %s", exc)
"""Check for the alarm metric."""
try:
endpoint = self.common.get_endpoint("metric")
-
- url = "{}/v1/metric/".format(endpoint)
- metric_list = self.common._perform_request(
+ url = "{}/v1/metric?sort=name:asc".format(endpoint)
+ result = self.common._perform_request(
url, auth_token, req_type="get")
-
- for metric in json.loads(metric_list.text):
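+ # Gnocchi pages its results; follow the marker of the last metric until a page comes back empty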
+ metrics_partial = json.loads(result.text)
+ metric_list = list(metrics_partial)
+
+ while metrics_partial:
+     last_metric_id = metrics_partial[-1]['id']
+     url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id)
+     result = self.common._perform_request(
+         url, auth_token, req_type="get")
+     metrics_partial = json.loads(result.text)
+     metric_list.extend(metrics_partial)
+
+ for metric in metric_list:
name = metric['name']
resource = metric['resource_id']
- if (name == m_name and resource == r_id):
+ if (name == METRIC_MAPPINGS[m_name] and resource == r_id):
metric_id = metric['id']
log.info("The required metric exists, an alarm will be created.")
return metric_id
METRIC_MAPPINGS = {
"average_memory_utilization": "memory.percent",
- "disk_read_ops": "disk.disk_ops",
- "disk_write_ops": "disk.disk_ops",
- "disk_read_bytes": "disk.disk_octets",
- "disk_write_bytes": "disk.disk_octets",
+ "disk_read_ops": "disk.read.requests",
+ "disk_write_ops": "disk.write.requests",
+ "digsk_read_bytes": "disk.read.bytes",
+ "disk_write_bytes": "disk.write.bytes",
"packets_dropped": "interface.if_dropped",
"packets_received": "interface.if_packets",
"packets_sent": "interface.if_packets",
- "cpu_utilization": "cpu.percent",
+ "cpu_utilization": "cpu_util",
}
PERIOD_MS = {
return None, None, False
# Check/Normalize metric name
- metric_name, norm_name = self.get_metric_name(values)
- if norm_name is None:
+ norm_name, metric_name = self.get_metric_name(values)
+ if metric_name is None:
log.warn("This metric is not supported by this plugin.")
return None, resource_id, False
resource = None
try:
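+ # Ask Gnocchi for a name-sorted listing of all metrics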
+ url = "{}/v1/metric?sort=name:asc".format(endpoint)
result = self._common._perform_request(
url, auth_token, req_type="get")
- metrics = json.loads(result.text)
+ metrics = list(json.loads(result.text))
if metrics is not None:
# Format the list response
# Create required lists
for row in metric_list:
# Only list OSM metrics
- if row['name'] in METRIC_MAPPINGS.keys():
- metric = {"metric_name": row['name'],
+ name = None
+ if row['name'] in METRIC_MAPPINGS.values():
+ for k, v in METRIC_MAPPINGS.items():
+ if row['name'] == v:
+ name = k
+ metric = {"metric_name": name,
"metric_uuid": row['id'],
"metric_unit": row['unit'],
"resource_uuid": row['resource_id']}
resp_list.append(str(metric))
# Generate metric_name specific list
- if metric_name is not None:
- if row['name'] == metric_name:
- metric = {"metric_name": row['name'],
+ if metric_name is not None and name is not None:
+ if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]:
+ metric = {"metric_name": metric_name,
"metric_uuid": row['id'],
"metric_unit": row['unit'],
"resource_uuid": row['resource_id']}
name_list.append(str(metric))
# Generate resource specific list
- if resource is not None:
+ if resource is not None and name is not None:
if row['resource_id'] == resource:
- metric = {"metric_name": row['name'],
+ metric = {"metric_name": name,
"metric_uuid": row['id'],
"metric_unit": row['unit'],
"resource_uuid": row['resource_id']}
# Join required lists
if metric_name is not None and resource is not None:
- return list(set(res_list).intersection(name_list))
+ intersection_set = set(res_list).intersection(name_list)
+ intersection = list(intersection_set)
+ return intersection
elif metric_name is not None:
return name_list
elif resource is not None:
- return list(set(res_list).intersection(resp_list))
+ return res_list
else:
return resp_list
return self._ks.service_catalog.url_for(
service_type=service_type,
endpoint_type='publicURL',
- region_name='RegionOne')
+ region_name='regionOne')
except Exception as exc:
log.warning("Failed to retreive endpoint for service due to: %s",
exc)
CfgParam('OS_USERNAME', None, six.text_type),
CfgParam('OS_PASSWORD', "password", six.text_type),
CfgParam('OS_TENANT_NAME', "service", six.text_type),
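+ # URI supplied as the alarm_actions target when alarms are created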
+ CfgParam('OS_NOTIFIER_URI', "http://localhost:8662", six.text_type),
]
_config_dict = {cfg.key: cfg for cfg in _configuration}
"""Check the appropriate environment variables and update defaults."""
for key in self._config_keys:
try:
- if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"):
- val = str(os.environ[key])
- setattr(self, key, val)
- elif (key == "OS_AUTH_URL"):
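+ # The Keystone v3 auth URL needs the explicit version suffix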
+ if key == "OS_AUTH_URL":
val = str(os.environ[key]) + "/v3"
setattr(self, key, val)
else:
- # Default username for a service is it's name
- setattr(self, 'OS_USERNAME', service)
- log.info("Configuration complete!")
- return
+ val = str(os.environ[key])
+ setattr(self, key, val)
except KeyError as exc:
log.warn("Falied to configure plugin: %s", exc)
log.warn("Try re-authenticating your OpenStack deployment.")
values = {"severity": "warning",
"statistic": "COUNT",
"threshold_value": 12,
- "operation": "GT"}
+ "operation": "GT",
+ "granularity": 300,
+ "resource_type": "generic"}
payload = self.alarming.check_payload(
- values, "my_metric", "r_id", "alarm_name")
+ values, "disk_write_ops", "r_id", "alarm_name")
- self.assertEqual(
+ self.assertDictEqual(
json.loads(payload), {"name": "alarm_name",
"gnocchi_resources_threshold_rule":
{"resource_id": "r_id",
- "metric": "my_metric",
+ "metric": "disk.disk_ops",
"comparison_operator": "gt",
"aggregation_method": "count",
"threshold": 12,
+ "granularity": 300,
"resource_type": "generic"},
"severity": "low",
"state": "ok",
values = {"severity": "warning",
"statistic": "COUNT",
"threshold_value": 12,
- "operation": "GT"}
+ "operation": "GT",
+ "granularity": 300,
+ "resource_type": "generic"}
payload = self.alarming.check_payload(
- values, "my_metric", "r_id", "alarm_name", alarm_state="alarm")
+ values, "disk_write_ops", "r_id", "alarm_name", alarm_state="alarm")
self.assertEqual(
json.loads(payload), {"name": "alarm_name",
"gnocchi_resources_threshold_rule":
{"resource_id": "r_id",
- "metric": "my_metric",
+ "metric": "disk.disk_ops",
"comparison_operator": "gt",
"aggregation_method": "count",
"threshold": 12,
+ "granularity": 300,
"resource_type": "generic"},
"severity": "low",
"state": "alarm",
self.alarming.check_for_metric(auth_token, "metric_name", "r_id")
perf_req.assert_called_with(
- "gnocchi_endpoint/v1/metric/", auth_token, req_type="get")
+ "gnocchi_endpoint/v1/metric?sort=name:asc", auth_token, req_type="get")
auth_token = mock.ANY
# Mock a valid metric list for some tests, and a resultant list
-metric_list = [{"name": "disk_write_ops",
+metric_list = [{"name": "disk.write.requests",
"id": "metric_id",
"unit": "units",
"resource_id": "r_id"}]
def __init__(self):
"""Initialise test and status code values."""
- self.text = json.dumps("mock_response_text")
+ self.text = json.dumps([{"id": "test_id"}])
self.status_code = "STATUS_CODE"
# Test valid configuration and payload for creating a metric
values = {"resource_uuid": "r_id",
"metric_unit": "units"}
- get_metric_name.return_value = "metric_name", "norm_name"
+ get_metric_name.return_value = "norm_name", "metric_name"
get_metric.return_value = None
payload = {"id": "r_id",
"metrics": {"metric_name":
self.metrics.list_metrics(endpoint, auth_token, values)
perf_req.assert_called_with(
- "<ANY>/v1/metric/", auth_token, req_type="get")
- resp_list.assert_called_with("mock_response_text")
+ "<ANY>/v1/metric?sort=name:asc", auth_token, req_type="get")
+ resp_list.assert_called_with([{u'id': u'test_id'}])
@mock.patch.object(metric_req.Metrics, "response_list")
@mock.patch.object(Common, "_perform_request")
self.metrics.list_metrics(endpoint, auth_token, values)
perf_req.assert_called_with(
- "<ANY>/v1/metric/", auth_token, req_type="get")
+ "<ANY>/v1/metric?sort=name:asc", auth_token, req_type="get")
resp_list.assert_called_with(
- "mock_response_text", resource="resource_id")
+ [{u'id': u'test_id'}], resource="resource_id")
@mock.patch.object(metric_req.Metrics, "response_list")
@mock.patch.object(Common, "_perform_request")
self.metrics.list_metrics(endpoint, auth_token, values)
perf_req.assert_called_with(
- "<ANY>/v1/metric/", auth_token, req_type="get")
+ "<ANY>/v1/metric?sort=name:asc", auth_token, req_type="get")
resp_list.assert_called_with(
- "mock_response_text", metric_name="disk_write_bytes")
+ [{u'id': u'test_id'}], metric_name="disk_write_bytes")
@mock.patch.object(metric_req.Metrics, "response_list")
@mock.patch.object(Common, "_perform_request")
self.metrics.list_metrics(endpoint, auth_token, values)
perf_req.assert_called_with(
- "<ANY>/v1/metric/", auth_token, req_type="get")
+ "<ANY>/v1/metric?sort=name:asc", auth_token, req_type="get")
resp_list.assert_called_with(
- "mock_response_text", resource="resource_id",
+ [{u'id': u'test_id'}], resource="resource_id",
metric_name="packets_sent")
@mock.patch.object(Common, "_perform_request")
metric_name, norm_name = self.metrics.get_metric_name(values)
self.assertEqual(metric_name, "disk_write_ops")
- self.assertEqual(norm_name, "disk.disk_ops")
+ self.assertEqual(norm_name, "disk.write.requests")
# test with an invalid metric name
values = {"metric_name": "my_invalid_metric"}
def test_set_os_username(self):
"""Test reading the environment for OpenStack plugin configuration."""
+ os.environ["OS_USERNAME"] = "test"
self.cfg.read_environ("my_service")
- self.assertEqual(self.cfg.OS_USERNAME, "my_service")
+ self.assertEqual(self.cfg.OS_USERNAME, "test")
@mock.patch.object(os, "environ")
def test_read_environ(self, environ):