1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 """Carry out OpenStack metric requests via Gnocchi API."""
29 from core
.message_bus
.producer
import KafkaProducer
31 from kafka
import KafkaConsumer
33 from plugins
.OpenStack
.common
import Common
34 from plugins
.OpenStack
.response
import OpenStack_Response
36 __author__
= "Helena McGough"
39 "AVERAGE_MEMORY_UTILIZATION": "memory.percent",
40 "DISK_READ_OPS": "disk.disk_ops",
41 "DISK_WRITE_OPS": "disk.disk_ops",
42 "DISK_READ_BYTES": "disk.disk_octets",
43 "DISK_WRITE_BYTES": "disk.disk_octets",
44 "PACKETS_DROPPED": "interface.if_dropped",
45 "PACKETS_RECEIVED": "interface.if_packets",
46 "PACKETS_SENT": "interface.if_packets",
47 "CPU_UTILIZATION": "cpu.percent",
59 class Metrics(object):
60 """OpenStack metric requests performed via the Gnocchi API."""
63 """Initialize the metric actions."""
64 self
._common
= Common()
66 # TODO(mcgoughh): Initialize a generic consumer object to consume
67 # message from the SO. This is hardcoded for now
68 server
= {'server': 'localhost:9092', 'topic': 'metric_request'}
69 self
._consumer
= KafkaConsumer(server
['topic'],
71 bootstrap_servers
=server
['server'])
73 # Use the Response class to generate valid json response messages
74 self
._response
= OpenStack_Response()
76 # Initializer a producer to send responses back to SO
77 self
._producer
= KafkaProducer("metric_response")
79 def metric_calls(self
):
80 """Consume info from the message bus to manage metric requests."""
81 # Consumer check for metric messages
82 for message
in self
._consumer
:
83 # Check if this plugin should carry out this request
84 values
= json
.loads(message
.value
)
85 vim_type
= values
['vim_type'].lower()
87 if vim_type
== "openstack":
88 # Generate auth_token and endpoint
89 auth_token
, endpoint
= self
.authenticate(values
)
91 if message
.key
== "create_metric_request":
93 metric_details
= values
['metric_create']
94 metric_id
, resource_id
, status
= self
.configure_metric(
95 endpoint
, auth_token
, metric_details
)
97 # Generate and send a create metric response
99 resp_message
= self
._response
.generate_response(
100 'create_metric_response', status
=status
,
101 cor_id
=values
['correlation_id'],
102 metric_id
=metric_id
, r_id
=resource_id
)
103 self
._producer
.create_metrics_resp(
104 'create_metric_response', resp_message
,
106 except Exception as exc
:
107 log
.warn("Failed to create response: %s", exc
)
109 elif message
.key
== "read_metric_data_request":
110 # Read all metric data related to a specified metric
111 timestamps
, metric_data
= self
.read_metric_data(
112 endpoint
, auth_token
, values
)
114 # Generate and send a response message
116 resp_message
= self
._response
.generate_response(
117 'read_metric_data_response',
118 m_id
=values
['metric_uuid'],
119 m_name
=values
['metric_name'],
120 r_id
=values
['resource_uuid'],
121 cor_id
=values
['correlation_id'],
122 times
=timestamps
, metrics
=metric_data
)
123 self
._producer
.read_metric_data_response(
124 'read_metric_data_response', resp_message
,
126 except Exception as exc
:
127 log
.warn("Failed to send read metric response:%s", exc
)
129 elif message
.key
== "delete_metric_request":
130 # delete the specified metric in the request
131 metric_id
= values
['metric_uuid']
132 status
= self
.delete_metric(
133 endpoint
, auth_token
, metric_id
)
135 # Generate and send a response message
137 resp_message
= self
._response
.generate_response(
138 'delete_metric_response', m_id
=metric_id
,
139 m_name
=values
['metric_name'],
140 status
=status
, r_id
=values
['resource_uuid'],
141 cor_id
=values
['correlation_id'])
142 self
._producer
.delete_metric_response(
143 'delete_metric_response', resp_message
,
145 except Exception as exc
:
146 log
.warn("Failed to send delete response:%s", exc
)
148 elif message
.key
== "update_metric_request":
149 # Gnocchi doesn't support configuration updates
150 # Log and send a response back to this effect
151 log
.warn("Gnocchi doesn't support metric configuration\
153 req_details
= values
['metric_create']
154 metric_name
= req_details
['metric_name']
155 resource_id
= req_details
['resource_uuid']
156 metric_id
= self
.get_metric_id(
157 endpoint
, auth_token
, metric_name
, resource_id
)
159 # Generate and send a response message
161 resp_message
= self
._response
.generate_response(
162 'update_metric_response', status
=False,
163 cor_id
=values
['correlation_id'],
164 r_id
=resource_id
, m_id
=metric_id
)
165 self
._producer
.update_metric_response(
166 'update_metric_response', resp_message
,
168 except Exception as exc
:
169 log
.warn("Failed to send an update response:%s", exc
)
171 elif message
.key
== "list_metric_request":
172 list_details
= values
['metrics_list_request']
174 metric_list
= self
.list_metrics(
175 endpoint
, auth_token
, list_details
)
177 # Generate and send a response message
179 resp_message
= self
._response
.generate_response(
180 'list_metric_response', m_list
=metric_list
,
181 cor_id
=list_details
['correlation_id'])
182 self
._producer
.list_metric_response(
183 'list_metric_response', resp_message
,
185 except Exception as exc
:
186 log
.warn("Failed to send a list response:%s", exc
)
189 log
.warn("Unknown key, no action will be performed.")
191 log
.debug("Message is not for this OpenStack.")
195 def configure_metric(self
, endpoint
, auth_token
, values
):
196 """Create the new metric in Gnocchi."""
198 resource_id
= values
['resource_uuid']
200 log
.warn("Resource is not defined correctly.")
201 return None, None, False
203 # Check/Normalize metric name
204 metric_name
, norm_name
= self
.get_metric_name(values
)
205 if norm_name
is None:
206 log
.warn("This metric is not supported by this plugin.")
207 return None, resource_id
, False
209 # Check for an existing metric for this resource
210 metric_id
= self
.get_metric_id(
211 endpoint
, auth_token
, metric_name
, resource_id
)
213 if metric_id
is None:
214 # Need to create a new version of the resource for gnocchi to
215 # create the new metric based on that resource
216 url
= "{}/v1/resource/generic".format(endpoint
)
218 # Try to create a new resource for the new metric
219 metric
= {'name': metric_name
,
220 'archive_policy_name': 'high',
221 'unit': values
['metric_unit'], }
223 resource_payload
= json
.dumps({'id': resource_id
,
225 metric_name
: metric
}})
227 new_resource
= self
._common
._perform
_request
(
228 url
, auth_token
, req_type
="post", payload
=resource_payload
)
230 resource_id
= json
.loads(new_resource
.text
)['id']
231 except Exception as exc
:
232 # Append new metric to existing resource
233 log
.debug("This resource already exists:%s, appending metric.",
235 base_url
= "{}/v1/resource/generic/%s/metric"
236 res_url
= base_url
.format(endpoint
) % resource_id
237 payload
= {metric_name
: {'archive_policy_name': 'high',
238 'unit': values
['metric_unit']}}
239 self
._common
._perform
_request
(
240 res_url
, auth_token
, req_type
="post",
241 payload
=json
.dumps(payload
))
243 metric_id
= self
.get_metric_id(
244 endpoint
, auth_token
, metric_name
, resource_id
)
245 return metric_id
, resource_id
, True
248 log
.debug("This metric already exists for this resource.")
250 return metric_id
, resource_id
, False
252 def delete_metric(self
, endpoint
, auth_token
, metric_id
):
254 url
= "{}/v1/metric/%s".format(endpoint
) % (metric_id
)
257 result
= self
._common
._perform
_request
(
258 url
, auth_token
, req_type
="delete")
259 if str(result
.status_code
) == "404":
260 log
.warn("Failed to delete the metric.")
264 except Exception as exc
:
265 log
.warn("Failed to carry out delete metric request:%s", exc
)
268 def list_metrics(self
, endpoint
, auth_token
, values
):
269 """List all metrics."""
270 url
= "{}/v1/metric/".format(endpoint
)
273 # Check if the metric_name was specified for the list
274 metric_name
= values
['metric_name']
275 result
= self
._common
._perform
_request
(
276 url
, auth_token
, req_type
="get")
277 metric_list
= json
.loads(result
.text
)
279 # Format the list response
280 metrics
= self
.response_list(
281 metric_list
, metric_name
=metric_name
)
284 log
.debug("Metric name is not specified for this list.")
287 # Check if a resource_id was specified
288 resource_id
= values
['resource_uuid']
289 result
= self
._common
._perform
_request
(
290 url
, auth_token
, req_type
="get")
291 metric_list
= json
.loads(result
.text
)
292 # Format the list response
293 metrics
= self
.response_list(
294 metric_list
, resource
=resource_id
)
297 log
.debug("Resource id not specificed either, will return a\
300 result
= self
._common
._perform
_request
(
301 url
, auth_token
, req_type
="get")
302 metric_list
= json
.loads(result
.text
)
303 # Format the list response
304 metrics
= self
.response_list(metric_list
)
307 except Exception as exc
:
308 log
.warn("Failed to generate any metric list. %s", exc
)
311 def get_metric_id(self
, endpoint
, auth_token
, metric_name
, resource_id
):
312 """Check if the desired metric already exists for the resource."""
313 url
= "{}/v1/resource/generic/%s".format(endpoint
) % resource_id
316 # Try return the metric id if it exists
317 result
= self
._common
._perform
_request
(
318 url
, auth_token
, req_type
="get")
319 return json
.loads(result
.text
)['metrics'][metric_name
]
321 log
.debug("Metric doesn't exist. No metric_id available")
324 def get_metric_name(self
, values
):
325 """Check metric name configuration and normalize."""
327 # Normalize metric name
328 metric_name
= values
['metric_name']
329 return metric_name
, METRIC_MAPPINGS
[metric_name
]
331 log
.warn("Metric name %s is invalid.", metric_name
)
332 return metric_name
, None
334 def read_metric_data(self
, endpoint
, auth_token
, values
):
335 """Collectd metric measures over a specified time period."""
339 # Try and collect measures
340 metric_id
= values
['metric_uuid']
341 collection_unit
= values
['collection_unit'].upper()
342 collection_period
= values
['collection_period']
344 # Define the start and end time based on configurations
345 stop_time
= time
.strftime("%Y-%m-%d") + "T" + time
.strftime("%X")
346 end_time
= int(round(time
.time() * 1000))
347 if collection_unit
== 'YEAR':
348 diff
= PERIOD_MS
[collection_unit
]
350 diff
= collection_period
* PERIOD_MS
[collection_unit
]
351 s_time
= (end_time
- diff
)/1000.0
352 start_time
= datetime
.datetime
.fromtimestamp(s_time
).strftime(
353 '%Y-%m-%dT%H:%M:%S.%f')
354 base_url
= "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
355 url
= base_url
.format(endpoint
) % {
356 "0": metric_id
, "1": start_time
, "2": stop_time
}
358 # Perform metric data request
359 metric_data
= self
._common
._perform
_request
(
360 url
, auth_token
, req_type
="get")
362 # Generate a list of the requested timestamps and data
363 for r
in json
.loads(metric_data
.text
):
364 timestamp
= r
[0].replace("T", " ")
365 timestamps
.append(timestamp
)
368 return timestamps
, data
369 except Exception as exc
:
370 log
.warn("Failed to gather specified measures: %s", exc
)
371 return timestamps
, data
373 def authenticate(self
, values
):
374 """Generate an authentication token and endpoint for metric request."""
376 # Check for a tenant_id
377 auth_token
= self
._common
._authenticate
(
378 tenant_id
=values
['tenant_uuid'])
379 endpoint
= self
._common
.get_endpoint("metric")
381 log
.warn("Tenant ID is not specified. Will use a generic\
383 auth_token
= self
._common
._authenticate
()
384 endpoint
= self
._common
.get_endpoint("metric")
386 return auth_token
, endpoint
388 def response_list(self
, metric_list
, metric_name
=None, resource
=None):
389 """Create the appropriate lists for a list response."""
392 for row
in metric_list
:
393 if metric_name
is not None:
394 if row
['name'] == metric_name
:
395 metric
= {"metric_name": row
['name'],
396 "metric_uuid": row
['id'],
397 "metric_unit": row
['unit'],
398 "resource_uuid": row
['resource_id']}
399 resp_list
.append(metric
)
400 elif resource
is not None:
401 if row
['resource_id'] == resource
:
402 metric
= {"metric_name": row
['name'],
403 "metric_uuid": row
['id'],
404 "metric_unit": row
['unit'],
405 "resource_uuid": row
['resource_id']}
406 resp_list
.append(metric
)
408 metric
= {"metric_name": row
['name'],
409 "metric_uuid": row
['id'],
410 "metric_unit": row
['unit'],
411 "resource_uuid": row
['resource_id']}
412 resp_list
.append(metric
)