1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 """Carry out OpenStack metric requests via Gnocchi API."""
30 from core
.message_bus
.producer
import KafkaProducer
32 from plugins
.OpenStack
.response
import OpenStack_Response
33 from plugins
.OpenStack
.settings
import Config
35 __author__
= "Helena McGough"
37 log
= logging
.getLogger(__name__
)
40 "average_memory_utilization": "memory.percent",
41 "disk_read_ops": "disk.disk_ops",
42 "disk_write_ops": "disk.disk_ops",
43 "disk_read_bytes": "disk.disk_octets",
44 "disk_write_bytes": "disk.disk_octets",
45 "packets_dropped": "interface.if_dropped",
46 "packets_received": "interface.if_packets",
47 "packets_sent": "interface.if_packets",
48 "cpu_utilization": "cpu.percent",
60 class Metrics(object):
61 """OpenStack metric requests performed via the Gnocchi API."""
64 """Initialize the metric actions."""
65 # Configure an instance of the OpenStack metric plugin
66 config
= Config
.instance()
67 config
.read_environ("gnocchi")
69 # Initialise authentication for API requests
70 self
.auth_token
= None
74 # Use the Response class to generate valid json response messages
75 self
._response
= OpenStack_Response()
77 # Initializer a producer to send responses back to SO
78 self
._producer
= KafkaProducer("metric_response")
80 def metric_calls(self
, message
, common
, auth_token
):
81 """Consume info from the message bus to manage metric requests."""
82 values
= json
.loads(message
.value
)
84 log
.info("OpenStack metric action required.")
86 # Generate and auth_token and endpoint for request
87 if auth_token
is not None:
88 if self
.auth_token
!= auth_token
:
89 log
.info("Auth_token for metrics set by access_credentials.")
90 self
.auth_token
= auth_token
92 log
.info("Auth_token has not been updated.")
94 log
.info("Using environment variables to set Gnocchi auth_token.")
95 self
.auth_token
= self
._common
._authenticate
()
97 if self
.endpoint
is None:
98 log
.info("Generating a new endpoint for Gnocchi.")
99 self
.endpoint
= self
._common
.get_endpoint("metric")
101 if message
.key
== "create_metric_request":
103 metric_details
= values
['metric_create']
104 metric_id
, resource_id
, status
= self
.configure_metric(
105 self
.endpoint
, self
.auth_token
, metric_details
)
107 # Generate and send a create metric response
109 resp_message
= self
._response
.generate_response(
110 'create_metric_response', status
=status
,
111 cor_id
=values
['correlation_id'],
112 metric_id
=metric_id
, r_id
=resource_id
)
113 log
.info("Response messages: %s", resp_message
)
114 self
._producer
.create_metrics_resp(
115 'create_metric_response', resp_message
,
117 except Exception as exc
:
118 log
.warn("Failed to create response: %s", exc
)
120 elif message
.key
== "read_metric_data_request":
121 # Read all metric data related to a specified metric
122 timestamps
, metric_data
= self
.read_metric_data(
123 self
.endpoint
, self
.auth_token
, values
)
125 # Generate and send a response message
127 resp_message
= self
._response
.generate_response(
128 'read_metric_data_response',
129 m_id
=values
['metric_uuid'],
130 m_name
=values
['metric_name'],
131 r_id
=values
['resource_uuid'],
132 cor_id
=values
['correlation_id'],
133 times
=timestamps
, metrics
=metric_data
)
134 log
.info("Response message: %s", resp_message
)
135 self
._producer
.read_metric_data_response(
136 'read_metric_data_response', resp_message
,
138 except Exception as exc
:
139 log
.warn("Failed to send read metric response:%s", exc
)
141 elif message
.key
== "delete_metric_request":
142 # delete the specified metric in the request
143 metric_id
= values
['metric_uuid']
144 status
= self
.delete_metric(
145 self
.endpoint
, self
.auth_token
, metric_id
)
147 # Generate and send a response message
149 resp_message
= self
._response
.generate_response(
150 'delete_metric_response', m_id
=metric_id
,
151 m_name
=values
['metric_name'],
152 status
=status
, r_id
=values
['resource_uuid'],
153 cor_id
=values
['correlation_id'])
154 log
.info("Response message: %s", resp_message
)
155 self
._producer
.delete_metric_response(
156 'delete_metric_response', resp_message
,
158 except Exception as exc
:
159 log
.warn("Failed to send delete response:%s", exc
)
161 elif message
.key
== "update_metric_request":
162 # Gnocchi doesn't support configuration updates
163 # Log and send a response back to this effect
164 log
.warn("Gnocchi doesn't support metric configuration\
166 req_details
= values
['metric_create']
167 metric_name
= req_details
['metric_name']
168 resource_id
= req_details
['resource_uuid']
169 metric_id
= self
.get_metric_id(
170 self
.endpoint
, self
.auth_token
, metric_name
, resource_id
)
172 # Generate and send a response message
174 resp_message
= self
._response
.generate_response(
175 'update_metric_response', status
=False,
176 cor_id
=values
['correlation_id'],
177 r_id
=resource_id
, m_id
=metric_id
)
178 log
.info("Response message: %s", resp_message
)
179 self
._producer
.update_metric_response(
180 'update_metric_response', resp_message
,
182 except Exception as exc
:
183 log
.warn("Failed to send an update response:%s", exc
)
185 elif message
.key
== "list_metric_request":
186 list_details
= values
['metrics_list_request']
188 metric_list
= self
.list_metrics(
189 self
.endpoint
, self
.auth_token
, list_details
)
191 # Generate and send a response message
193 resp_message
= self
._response
.generate_response(
194 'list_metric_response', m_list
=metric_list
,
195 cor_id
=list_details
['correlation_id'])
196 log
.info("Response message: %s", resp_message
)
197 self
._producer
.list_metric_response(
198 'list_metric_response', resp_message
,
200 except Exception as exc
:
201 log
.warn("Failed to send a list response:%s", exc
)
204 log
.warn("Unknown key, no action will be performed.")
208 def configure_metric(self
, endpoint
, auth_token
, values
):
209 """Create the new metric in Gnocchi."""
211 resource_id
= values
['resource_uuid']
213 log
.warn("Resource is not defined correctly.")
214 return None, None, False
216 # Check/Normalize metric name
217 metric_name
, norm_name
= self
.get_metric_name(values
)
218 if norm_name
is None:
219 log
.warn("This metric is not supported by this plugin.")
220 return None, resource_id
, False
222 # Check for an existing metric for this resource
223 metric_id
= self
.get_metric_id(
224 endpoint
, auth_token
, metric_name
, resource_id
)
226 if metric_id
is None:
227 # Try appending metric to existing resource
229 base_url
= "{}/v1/resource/generic/%s/metric"
230 res_url
= base_url
.format(endpoint
) % resource_id
231 payload
= {metric_name
: {'archive_policy_name': 'high',
232 'unit': values
['metric_unit']}}
233 result
= self
._common
._perform
_request
(
234 res_url
, auth_token
, req_type
="post",
235 payload
=json
.dumps(payload
))
236 # Get id of newly created metric
237 for row
in json
.loads(result
.text
):
238 if row
['name'] == metric_name
:
239 metric_id
= row
['id']
240 log
.info("Appended metric to existing resource.")
242 return metric_id
, resource_id
, True
243 except Exception as exc
:
244 # Gnocchi version of resource does not exist creating a new one
245 log
.info("Failed to append metric to existing resource:%s",
248 url
= "{}/v1/resource/generic".format(endpoint
)
249 metric
= {'name': metric_name
,
250 'archive_policy_name': 'high',
251 'unit': values
['metric_unit'], }
253 resource_payload
= json
.dumps({'id': resource_id
,
255 metric_name
: metric
}})
257 resource
= self
._common
._perform
_request
(
258 url
, auth_token
, req_type
="post",
259 payload
=resource_payload
)
261 # Return the newly created resource_id for creating alarms
262 new_resource_id
= json
.loads(resource
.text
)['id']
263 log
.info("Created new resource for metric: %s",
266 metric_id
= self
.get_metric_id(
267 endpoint
, auth_token
, metric_name
, new_resource_id
)
269 return metric_id
, new_resource_id
, True
270 except Exception as exc
:
271 log
.warn("Failed to create a new resource:%s", exc
)
272 return None, None, False
275 log
.info("This metric already exists for this resource.")
277 return metric_id
, resource_id
, False
279 def delete_metric(self
, endpoint
, auth_token
, metric_id
):
281 url
= "{}/v1/metric/%s".format(endpoint
) % (metric_id
)
284 result
= self
._common
._perform
_request
(
285 url
, auth_token
, req_type
="delete")
286 if str(result
.status_code
) == "404":
287 log
.warn("Failed to delete the metric.")
291 except Exception as exc
:
292 log
.warn("Failed to carry out delete metric request:%s", exc
)
295 def list_metrics(self
, endpoint
, auth_token
, values
):
296 """List all metrics."""
297 url
= "{}/v1/metric/".format(endpoint
)
299 # Check for a specified list
301 # Check if the metric_name was specified for the list
302 metric_name
= values
['metric_name'].lower()
303 if metric_name
not in METRIC_MAPPINGS
.keys():
304 log
.warn("This metric is not supported, won't be listed.")
306 except KeyError as exc
:
307 log
.info("Metric name is not specified: %s", exc
)
311 resource
= values
['resource_uuid']
312 except KeyError as exc
:
313 log
.info("Resource is not specified:%s", exc
)
317 result
= self
._common
._perform
_request
(
318 url
, auth_token
, req_type
="get")
319 metrics
= json
.loads(result
.text
)
321 if metrics
is not None:
322 # Format the list response
323 if metric_name
is not None and resource
is not None:
324 metric_list
= self
.response_list(
325 metrics
, metric_name
=metric_name
, resource
=resource
)
326 log
.info("Returning an %s resource list for %s metrics",
327 metric_name
, resource
)
328 elif metric_name
is not None:
329 metric_list
= self
.response_list(
330 metrics
, metric_name
=metric_name
)
331 log
.info("Returning a list of %s metrics", metric_name
)
332 elif resource
is not None:
333 metric_list
= self
.response_list(
334 metrics
, resource
=resource
)
335 log
.info("Return a list of %s resource metrics", resource
)
337 metric_list
= self
.response_list(metrics
)
338 log
.info("Returning a complete list of metrics")
342 log
.info("There are no metrics available")
344 except Exception as exc
:
345 log
.warn("Failed to generate any metric list. %s", exc
)
348 def get_metric_id(self
, endpoint
, auth_token
, metric_name
, resource_id
):
349 """Check if the desired metric already exists for the resource."""
350 url
= "{}/v1/resource/generic/%s".format(endpoint
) % resource_id
353 # Try return the metric id if it exists
354 result
= self
._common
._perform
_request
(
355 url
, auth_token
, req_type
="get")
356 return json
.loads(result
.text
)['metrics'][metric_name
]
358 log
.info("Metric doesn't exist. No metric_id available")
361 def get_metric_name(self
, values
):
362 """Check metric name configuration and normalize."""
364 # Normalize metric name
365 metric_name
= values
['metric_name'].lower()
366 return metric_name
, METRIC_MAPPINGS
[metric_name
]
368 log
.info("Metric name %s is invalid.", metric_name
)
369 return metric_name
, None
371 def read_metric_data(self
, endpoint
, auth_token
, values
):
372 """Collectd metric measures over a specified time period."""
376 # Try and collect measures
377 metric_id
= values
['metric_uuid']
378 collection_unit
= values
['collection_unit'].upper()
379 collection_period
= values
['collection_period']
381 # Define the start and end time based on configurations
382 stop_time
= time
.strftime("%Y-%m-%d") + "T" + time
.strftime("%X")
383 end_time
= int(round(time
.time() * 1000))
384 if collection_unit
== 'YEAR':
385 diff
= PERIOD_MS
[collection_unit
]
387 diff
= collection_period
* PERIOD_MS
[collection_unit
]
388 s_time
= (end_time
- diff
) / 1000.0
389 start_time
= datetime
.datetime
.fromtimestamp(s_time
).strftime(
390 '%Y-%m-%dT%H:%M:%S.%f')
391 base_url
= "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
392 url
= base_url
.format(endpoint
) % {
393 "0": metric_id
, "1": start_time
, "2": stop_time
}
395 # Perform metric data request
396 metric_data
= self
._common
._perform
_request
(
397 url
, auth_token
, req_type
="get")
399 # Generate a list of the requested timestamps and data
400 for r
in json
.loads(metric_data
.text
):
401 timestamp
= r
[0].replace("T", " ")
402 timestamps
.append(timestamp
)
405 return timestamps
, data
406 except Exception as exc
:
407 log
.warn("Failed to gather specified measures: %s", exc
)
408 return timestamps
, data
410 def response_list(self
, metric_list
, metric_name
=None, resource
=None):
411 """Create the appropriate lists for a list response."""
412 resp_list
, name_list
, res_list
= [], [], []
414 # Create required lists
415 for row
in metric_list
:
416 # Only list OSM metrics
417 if row
['name'] in METRIC_MAPPINGS
.keys():
418 metric
= {"metric_name": row
['name'],
419 "metric_uuid": row
['id'],
420 "metric_unit": row
['unit'],
421 "resource_uuid": row
['resource_id']}
422 resp_list
.append(str(metric
))
423 # Generate metric_name specific list
424 if metric_name
is not None:
425 if row
['name'] == metric_name
:
426 metric
= {"metric_name": row
['name'],
427 "metric_uuid": row
['id'],
428 "metric_unit": row
['unit'],
429 "resource_uuid": row
['resource_id']}
430 name_list
.append(str(metric
))
431 # Generate resource specific list
432 if resource
is not None:
433 if row
['resource_id'] == resource
:
434 metric
= {"metric_name": row
['name'],
435 "metric_uuid": row
['id'],
436 "metric_unit": row
['unit'],
437 "resource_uuid": row
['resource_id']}
438 res_list
.append(str(metric
))
440 # Join required lists
441 if metric_name
is not None and resource
is not None:
442 return list(set(res_list
).intersection(name_list
))
443 elif metric_name
is not None:
445 elif resource
is not None:
446 return list(set(res_list
).intersection(resp_list
))