Merge "Changes in vROPs Plugin. 1.Added specifications for Create, Update, Delete...
[osm/MON.git] / plugins / vRealiseOps / plugin_receiver.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2016-2017 VMware Inc.
5 # This file is part of ETSI OSM
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: osslegalrouting@vmware.com
22 ##
23
24 """
25 Montoring plugin receiver that consumes the request messages &
26 responds using producer for vROPs
27 """
28
29 from mon_plugin_vrops import MonPlugin
30 from kafka_consumer_vrops import vROP_KafkaConsumer
31 #Core producer
32 from core.message_bus.producer import KafkaProducer
33 import json
34 import logging as log
35 import traceback
36
37 class PluginReceiver():
38 """MON Plugin receiver receiving request messages & responding using producer for vROPs
39 telemetry plugin
40 """
41 def __init__(self):
42 """Constructor of PluginReceiver
43 """
44
45 topics = ['alarm_request', 'metric_request', 'Access_Credentials', 'alarm_response']
46 #To Do - Add broker uri
47 broker_uri = None
48 self.mon_plugin = MonPlugin()
49 self.consumer = vROP_KafkaConsumer(topics, broker_uri)
50 #Core producer
51 self.producer = KafkaProducer()
52
53 def consume(self):
54 """Consume the message, act on it & respond
55 """
56 try:
57 for message in self.consumer.vrops_consumer:
58 message_values = json.loads(message.value)
59 if message_values.has_key('vim_type'):
60 vim_type = message_values['vim_type'].lower()
61 if vim_type == 'vmware':
62 log.info("Action required for: {}".format(message.topic))
63 if message.topic == 'alarm_request':
64 if message.key == "create_alarm_request":
65 config_alarm_info = json.loads(message.value)
66 alarm_uuid = self.create_alarm(config_alarm_info['alarm_creation_request'])
67 log.info("Alarm created with alarm uuid: {}".format(alarm_uuid))
68 #Publish message using producer
69 self.publish_create_alarm_status(alarm_uuid, config_alarm_info)
70 elif message.key == "update_alarm_request":
71 update_alarm_info = json.loads(message.value)
72 alarm_uuid = self.update_alarm(update_alarm_info['alarm_update_request'])
73 log.info("In plugin_receiver: Alarm defination updated : alarm uuid: {}".format(alarm_uuid))
74 #Publish message using producer
75 self.publish_update_alarm_status(alarm_uuid, update_alarm_info)
76 elif message.key == "delete_alarm_request":
77 delete_alarm_info = json.loads(message.value)
78 alarm_uuid = self.delete_alarm(delete_alarm_info['alarm_deletion_request'])
79 log.info("In plugin_receiver: Alarm defination deleted : alarm uuid: {}".format(alarm_uuid))
80 #Publish message using producer
81 self.publish_delete_alarm_status(alarm_uuid, delete_alarm_info)
82 elif message.key == "list_alarm_request":
83 request_input = json.loads(message.value)
84 triggered_alarm_list = self.list_alarms(request_input['alarm_list_request'])
85 #Publish message using producer
86 self.publish_list_alarm_response(triggered_alarm_list, request_input)
87 elif message.topic == 'metric_request':
88 if message.key == "read_metric_data_request":
89 metric_request_info = json.loads(message.value)
90 metrics_data = self.mon_plugin.get_metrics_data(metric_request_info)
91 log.info("Collected Metrics Data: {}".format(metrics_data))
92 #Publish message using producer
93 self.publish_metrics_data_status(metrics_data)
94 elif message.key == "create_metric_request":
95 metric_info = json.loads(message.value)
96 metric_status = self.verify_metric(metric_info['metric_create'])
97 #Publish message using producer
98 self.publish_create_metric_response(metric_info, metric_status)
99 elif message.key == "update_metric_request":
100 metric_info = json.loads(message.value)
101 metric_status = self.verify_metric(metric_info['metric_create'])
102 #Publish message using producer
103 self.publish_update_metric_response(metric_info, metric_status)
104 elif message.key == "delete_metric_request":
105 metric_info = json.loads(message.value)
106 #Deleting Metric Data is not allowed. Publish status as False
107 log.warn("Deleting Metric is not allowed: {}".format(metric_info['metric_name']))
108 #Publish message using producer
109 self.publish_delete_metric_response(metric_info)
110
111 except Exception as exp:
112 log.error("Exception in receiver: {} {}".format(exp, traceback.format_exc()))
113
114 def create_alarm(self, config_alarm_info):
115 """Create alarm using vROPs plugin
116 """
117 plugin_uuid = self.mon_plugin.configure_rest_plugin()
118 alarm_uuid = self.mon_plugin.configure_alarm(config_alarm_info)
119 return alarm_uuid
120
121 def publish_create_alarm_status(self, alarm_uuid, config_alarm_info):
122 """Publish create alarm status using producer
123 """
124 topic = 'alarm_response'
125 msg_key = 'create_alarm_response'
126 response_msg = {"schema_version":1.0,
127 "schema_type":"create_alarm_response",
128 "alarm_creation_response":
129 {"correlation_id":config_alarm_info["alarm_creation_request"]["correlation_id"],
130 "alarm_uuid":alarm_uuid,
131 "status": True if alarm_uuid else False
132 }
133 }
134 #Core producer
135 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
136
137 def update_alarm(self, update_alarm_info):
138 """Updare already created alarm
139 """
140 alarm_uuid = self.mon_plugin.update_alarm_configuration(update_alarm_info)
141 return alarm_uuid
142
143 def publish_update_alarm_status(self, alarm_uuid, update_alarm_info):
144 """Publish update alarm status requests using producer
145 """
146 topic = 'alarm_response'
147 msg_key = 'update_alarm_response'
148 response_msg = {"schema_version":1.0,
149 "schema_type":"update_alarm_response",
150 "alarm_update_response":
151 {"correlation_id":update_alarm_info["alarm_update_request"]["correlation_id"],
152 "alarm_uuid":alarm_uuid,
153 "status": True if alarm_uuid else False
154 }
155 }
156 #Core producer
157 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
158
159 def delete_alarm(self, delete_alarm_info):
160 """Delete alarm configuration
161 """
162 alarm_uuid = self.mon_plugin.delete_alarm_configuration(delete_alarm_info)
163 return alarm_uuid
164
165 def publish_delete_alarm_status(self, alarm_uuid, delete_alarm_info):
166 """Publish update alarm status requests using producer
167 """
168 topic = 'alarm_response'
169 msg_key = 'delete_alarm_response'
170 response_msg = {"schema_version":1.0,
171 "schema_type":"delete_alarm_response",
172 "alarm_deletion_response":
173 {"correlation_id":delete_alarm_info["alarm_deletion_request"]["correlation_id"],
174 "alarm_uuid":alarm_uuid,
175 "status": True if alarm_uuid else False
176 }
177 }
178 #Core producer
179 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
180
181
182 def publish_metrics_data_status(self, metrics_data):
183 """Publish the requested metric data using producer
184 """
185 topic = 'metric_response'
186 msg_key = 'read_metric_data_response'
187 #Core producer
188 self.producer.publish(key=msg_key, value=json.dumps(metrics_data), topic=topic)
189
190
191 def verify_metric(self, metric_info):
192 """Verify if metric is supported or not
193 """
194 metric_key_status = self.mon_plugin.verify_metric_support(metric_info)
195 return metric_key_status
196
197 def publish_create_metric_response(self, metric_info, metric_status):
198 """Publish create metric response
199 """
200 topic = 'metric_response'
201 msg_key = 'create_metric_response'
202 response_msg = {"schema_version":1.0,
203 "schema_type":"create_metric_response",
204 "correlation_id":metric_info['correlation_id'],
205 "metric_create_response":
206 {
207 "metric_uuid":0,
208 "resource_uuid":metric_info['metric_create']['resource_uuid'],
209 "status":metric_status
210 }
211 }
212 #Core producer
213 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
214
215 def publish_update_metric_response(self, metric_info, metric_status):
216 """Publish update metric response
217 """
218 topic = 'metric_response'
219 msg_key = 'update_metric_response'
220 response_msg = {"schema_version":1.0,
221 "schema_type":"metric_update_response",
222 "correlation_id":metric_info['correlation_id'],
223 "metric_update_response":
224 {
225 "metric_uuid":0,
226 "resource_uuid":metric_info['metric_create']['resource_uuid'],
227 "status":metric_status
228 }
229 }
230 #Core producer
231 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
232
233 def publish_delete_metric_response(self, metric_info):
234 """
235 """
236 topic = 'metric_response'
237 msg_key = 'delete_metric_response'
238 response_msg = {"schema_version":1.0,
239 "schema_type":"delete_metric_response",
240 "correlation_id":metric_info['correlation_id'],
241 "metric_name":metric_info['metric_name'],
242 "metric_uuid":0,
243 "resource_uuid":metric_info['resource_uuid'],
244 "tenant_uuid":metric_info['tenant_uuid'],
245 "status":False
246 }
247 #Core producer
248 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
249
250 def list_alarms(self, list_alarm_input):
251 """
252 """
253 triggered_alarms = self.mon_plugin.get_triggered_alarms_list(list_alarm_input)
254 return triggered_alarms
255
256
257 def publish_list_alarm_response(self, triggered_alarm_list, list_alarm_input):
258 """
259 """
260 topic = 'alarm_response'
261 msg_key = 'list_alarm_response'
262 response_msg = {"schema_version":1.0,
263 "schema_type":"list_alarm_response",
264 "correlation_id":list_alarm_input['alarm_list_request']['correlation_id'],
265 "resource_uuid":list_alarm_input['alarm_list_request']['resource_uuid'],
266 "list_alarm_resp":triggered_alarm_list
267 }
268 #Core producer
269 self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
270
271
272 def main():
273 log.basicConfig(filename='mon_vrops_log.log',level=log.DEBUG)
274 plugin_rcvr = PluginReceiver()
275 plugin_rcvr.consume()
276
277 if __name__ == "__main__":
278 main()
279