Merge "Initial CloudWatch-boto plugin for MON"
[osm/MON.git] / plugins / OpenStack / Gnocchi / metrics.py
1 """Gnocchi acts on a metric message received from the SO via MON."""
2
3 import json
4 import logging as log
5
6 from kafka import KafkaConsumer
7
8 from plugins.OpenStack.common import Common
9
10
11 class Metrics(object):
12 """Gnocchi based metric actions performed on info from MON."""
13
14 def __init__(self):
15 """Initialize the metric actions."""
16 self._common = Common()
17
18 # TODO(mcgoughh): Initialize a generic consumer object to consume
19 # message from the SO. This is hardcoded for now
20 server = {'server': 'localhost:9092', 'topic': 'metrics'}
21 self._consumer = KafkaConsumer(server['topic'],
22 group_id='my-group',
23 bootstrap_servers=server['server'])
24
25 # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
26
27 def metric_calls(self):
28 """Consume info from the message bus to manage metrics."""
29 # Concumer check for metric messages
30 for message in self._consumer:
31
32 if message.topic == "metrics":
33 log.info("Metric action required on this topic: %s",
34 (message.topic))
35
36 if message.key == "configure_metric":
37 # Configure/Update a resource and its metric
38 values = json.loads(message.value)
39 schema = values['configure_metrics']
40 metric_details = schema['metrics_configuration']
41
42 # Generate authentication credentials via keystone:
43 # auth_token, endpoint
44 auth_token = self._common._authenticate(
45 schema['tenant_uuid'])
46 endpoint = self._common.get_endpoint("metric")
47
48 metric_id = self.configure_metric(
49 endpoint, auth_token, metric_details)
50 log.info("New metric created with metricID: %s", metric_id)
51
52 # TODO(mcgoughh): will send an acknowledge message back on
53 # the bus via the producer
54
55 # TODO(mcoughh): Key alternatives are "metric_data_request" and
56 # "metric_data_response" will be accomodated later
57 # Will also need a producer for this functionality
58 elif message.key == "metric_data_request":
59 log.debug("Key used to request a metrics data")
60
61 elif message.key == "metric_data_response":
62 log.debug("Key used for a metrics data response")
63
64 else:
65 log.debug("Unknown key, no action will be performed")
66
67 else:
68 log.info("Message topic not relevant to this plugin: %s",
69 message.topic)
70
71 return
72
73 def configure_metric(self, endpoint, auth_token, values):
74 """Create the new SO desired metric in Gnocchi."""
75 metric_id = None
76
77 # TODO(mcgoughh): error check the values sent in the message
78 # will query the database for the request resource and then
79 # check that resource for the desired metric
80 metric_name = values['metric_name']
81
82 if metric_id is None:
83
84 # Need to create a new version of the resource for gnocchi to
85 # the new metric
86 resource_url = "{}/v1/resource/generic".format(endpoint)
87
88 metric = {'name': metric_name,
89 'unit': values['metric_unit'], }
90
91 resource_payload = json.dumps({'id': values['resource_uuid'],
92 'metrics': {metric_name: metric}})
93
94 new_resource = self._common._perform_request(
95 resource_url, auth_token,
96 req_type="post", payload=resource_payload)
97 new_metric = json.loads(new_resource.text)['metrics']
98
99 return new_metric[metric_name]
100 else:
101 return metric_id
102
103 def delete_metric(self, endpoint, auth_token, metric_id):
104 """Delete metric."""
105 url = "{}/v1/metric/%s".format(endpoint) % (metric_id)
106
107 self._common._perform_request(url, auth_token, req_type="delete")
108 return None
109
110 def list_metrics(self, endpoint, auth_token):
111 """List all metrics."""
112 url = "{}/v1/metric/".format(endpoint)
113
114 metric_list = self._common._perform_request(
115 url, auth_token, req_type="get")
116 return json.loads(metric_list.text)