Merge "Updated vROPs plugin for using Common plugin consumer"
[osm/MON.git] / osm_mon / plugins / vRealiseOps / plugin_receiver.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2016-2017 VMware Inc.
5 # This file is part of ETSI OSM
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: osslegalrouting@vmware.com
22 ##
23
24 """
25 Monitoring plugin receiver that consumes the request messages &
26 responds using producer for vROPs
27 """
28
29 import sys
30 from mon_plugin_vrops import MonPlugin
31 from kafka_consumer_vrops import vROP_KafkaConsumer
32 #Core producer
33 sys.path.append("../../core/message_bus")
34 from producer import KafkaProducer
35 #from core.message_bus.producer import KafkaProducer
36 import json
37 import logging
38 import traceback
39 import os
40 from xml.etree import ElementTree as XmlElementTree
41
# Version stamped into the "schema_version" field of every response message.
schema_version = "1.0"

# Keys that must all be present in an access-config request before the
# vROPs configuration file is rewritten.
req_config_params = (
    'vrops_site',
    'vrops_user',
    'vrops_password',
    'vcloud-site',
    'admin_username',
    'admin_password',
    'vcenter_ip',
    'vcenter_port',
    'vcenter_user',
    'vcenter_password',
    'vim_tenant_name',
    'orgname',
    'tenant_id',
)

# The vROPs configuration file sits next to this module.
MODULE_DIR = os.path.dirname(__file__)
CONFIG_FILE_NAME = 'vrops_config.xml'
CONFIG_FILE_PATH = os.path.join(MODULE_DIR, CONFIG_FILE_NAME)
50
def set_logger():
    """Attach the plugin's log file handler to the root logger.

    Records are written to ``mon_vrops_log.log`` in the directory one level
    above this module, formatted as
    ``<time> - <logger name> - <level> - <message>``.

    The function is safe to call repeatedly: when the root logger already
    has a file handler for that same log file, nothing is added. Without
    this guard every call stacked another handler, so each record was
    written once per call made so far.
    """
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    log_path = os.path.join(BASE_DIR, "mon_vrops_log.log")
    logger = logging.getLogger()

    # FileHandler keeps its target as an absolute path in ``baseFilename``,
    # so compare against the absolute form of our path.
    target = os.path.abspath(log_path)
    for existing in logger.handlers:
        if (isinstance(existing, logging.FileHandler)
                and getattr(existing, 'baseFilename', None) == target):
            return

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler = logging.FileHandler(log_path)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
60
61
class PluginReceiver():
    """MON plugin receiver for the vROPs telemetry plugin.

    Receives request messages (alarm, metric and access-credential
    requests), acts on them through ``MonPlugin`` and publishes the result
    with the core Kafka producers.
    """

    def __init__(self):
        """Set up logging and create one core producer per response topic."""
        self.logger = logging.getLogger('PluginReceiver')
        self.logger.setLevel(logging.DEBUG)
        set_logger()

        #Core producer
        self.producer_alarms = KafkaProducer('alarm_response')
        self.producer_metrics = KafkaProducer('metric_response')
        self.producer_access_credentials = KafkaProducer('vim_access_credentials_response')

    def _publish_response(self, producer, topic, msg_key, response_msg):
        """Log a response and publish it, JSON-encoded, with ``producer``."""
        self.logger.info("Publishing response:\nTopic={}\nKey={}\nValue={}"
                         .format(topic, msg_key, response_msg))
        #Core producer
        producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)

    def consume(self, message):
        """Consume the message, act on it & respond.

        :param message: received record; its ``topic``, ``partition``,
            ``offset``, ``key`` and ``value`` attributes are read, and
            ``value`` must be a JSON document.

        The request is dispatched on ``message.topic`` and ``message.key``;
        unknown combinations are ignored. Any ``Exception`` raised while
        handling the message is logged with its traceback and not re-raised.
        ``KeyboardInterrupt`` and ``SystemExit`` are deliberately allowed to
        propagate so the process can still be stopped.
        """
        try:
            self.logger.info("Message received:\nTopic={}:{}:{}:\nKey={}\nValue={}"
                             .format(message.topic, message.partition, message.offset,
                                     message.key, message.value))
            # Decode the payload once; every branch below works on this dict.
            request = json.loads(message.value)
            self.logger.info("Action required for: {}".format(message.topic))

            if message.topic == 'alarm_request':
                if message.key == "create_alarm_request":
                    alarm_uuid = self.create_alarm(request['alarm_create_request'])
                    self.logger.info("Alarm created with alarm uuid: {}".format(alarm_uuid))
                    #Publish message using producer
                    self.publish_create_alarm_status(alarm_uuid, request)
                elif message.key == "update_alarm_request":
                    alarm_uuid = self.update_alarm(request['alarm_update_request'])
                    self.logger.info("Alarm defination updated : alarm uuid: {}".format(alarm_uuid))
                    #Publish message using producer
                    self.publish_update_alarm_status(alarm_uuid, request)
                elif message.key == "delete_alarm_request":
                    alarm_uuid = self.delete_alarm(request['alarm_delete_request'])
                    self.logger.info("Alarm defination deleted : alarm uuid: {}".format(alarm_uuid))
                    #Publish message using producer
                    self.publish_delete_alarm_status(alarm_uuid, request)
                elif message.key == "list_alarm_request":
                    triggered_alarm_list = self.list_alarms(request['alarm_list_request'])
                    #Publish message using producer
                    self.publish_list_alarm_response(triggered_alarm_list, request)

            elif message.topic == 'metric_request':
                if message.key == "read_metric_data_request":
                    mon_plugin_obj = MonPlugin()
                    metrics_data = mon_plugin_obj.get_metrics_data(request)
                    self.logger.info("Collected Metrics Data: {}".format(metrics_data))
                    #Publish message using producer
                    self.publish_metrics_data_status(metrics_data)
                elif message.key == "create_metric_request":
                    metric_status = self.verify_metric(request['metric_create'])
                    #Publish message using producer
                    self.publish_create_metric_response(request, metric_status)
                elif message.key == "update_metric_request":
                    # NOTE(review): the update request is read from the
                    # 'metric_create' key, same as the create request --
                    # confirm against the update-metric request schema.
                    metric_status = self.verify_metric(request['metric_create'])
                    #Publish message using producer
                    self.publish_update_metric_response(request, metric_status)
                elif message.key == "delete_metric_request":
                    #Deleting Metric Data is not allowed. Publish status as False
                    self.logger.warning("Deleting Metric is not allowed: {}"
                                        .format(request['metric_name']))
                    #Publish message using producer
                    self.publish_delete_metric_response(request)

            elif message.topic == 'access_credentials':
                if message.key == "vim_access_credentials":
                    access_update_status = self.update_access_credentials(request['access_config'])
                    self.publish_access_update_response(access_update_status, request)

        except Exception:
            self.logger.error("Exception in vROPs plugin receiver: {}".format(traceback.format_exc()))

    def create_alarm(self, config_alarm_info):
        """Create alarm using vROPs plugin.

        Configures the REST plugin first, then the alarm itself.

        :param config_alarm_info: the ``alarm_create_request`` part of the request.
        :return: whatever ``MonPlugin.configure_alarm`` returns (used as the
            alarm uuid; a falsy value is reported as failure).
        """
        mon_plugin = MonPlugin()
        # Called for its effect only; its return value is not needed here.
        mon_plugin.configure_rest_plugin()
        return mon_plugin.configure_alarm(config_alarm_info)

    def publish_create_alarm_status(self, alarm_uuid, config_alarm_info):
        """Publish create alarm status using producer.

        :param alarm_uuid: result of :meth:`create_alarm`; its truthiness
            becomes the ``status`` field.
        :param config_alarm_info: full request dict containing
            ``alarm_create_request``.
        """
        response_msg = {"schema_version": schema_version,
                        "schema_type": "create_alarm_response",
                        "alarm_create_response": {
                            "correlation_id": config_alarm_info["alarm_create_request"]["correlation_id"],
                            "alarm_uuid": alarm_uuid,
                            "status": bool(alarm_uuid)
                        }
                       }
        self._publish_response(self.producer_alarms, 'alarm_response',
                               'create_alarm_response', response_msg)

    def update_alarm(self, update_alarm_info):
        """Update an already created alarm.

        :param update_alarm_info: the ``alarm_update_request`` part of the request.
        :return: result of ``MonPlugin.update_alarm_configuration``.
        """
        return MonPlugin().update_alarm_configuration(update_alarm_info)

    def publish_update_alarm_status(self, alarm_uuid, update_alarm_info):
        """Publish update alarm status using producer.

        The published ``alarm_uuid`` is the one given in the request
        (``None`` when the request has none); ``alarm_uuid`` passed in only
        decides the ``status`` flag.
        """
        update_request = update_alarm_info["alarm_update_request"]
        response_msg = {"schema_version": schema_version,
                        "schema_type": "update_alarm_response",
                        "alarm_update_response": {
                            "correlation_id": update_request["correlation_id"],
                            "alarm_uuid": update_request.get('alarm_uuid'),
                            "status": bool(alarm_uuid)
                        }
                       }
        self._publish_response(self.producer_alarms, 'alarm_response',
                               'update_alarm_response', response_msg)

    def delete_alarm(self, delete_alarm_info):
        """Delete alarm configuration.

        :param delete_alarm_info: the ``alarm_delete_request`` part of the request.
        :return: result of ``MonPlugin.delete_alarm_configuration``.
        """
        return MonPlugin().delete_alarm_configuration(delete_alarm_info)

    def publish_delete_alarm_status(self, alarm_uuid, delete_alarm_info):
        """Publish delete alarm status using producer.

        The published ``alarm_uuid`` is taken from the request;
        ``alarm_uuid`` passed in only decides the ``status`` flag.
        """
        delete_request = delete_alarm_info["alarm_delete_request"]
        response_msg = {"schema_version": schema_version,
                        "schema_type": "delete_alarm_response",
                        "alarm_deletion_response": {
                            "correlation_id": delete_request["correlation_id"],
                            "alarm_uuid": delete_request["alarm_uuid"],
                            "status": bool(alarm_uuid)
                        }
                       }
        self._publish_response(self.producer_alarms, 'alarm_response',
                               'delete_alarm_response', response_msg)

    def publish_metrics_data_status(self, metrics_data):
        """Publish the requested metric data using producer.

        :param metrics_data: JSON-serialisable metrics payload, published as is.
        """
        self._publish_response(self.producer_metrics, 'metric_response',
                               'read_metric_data_response', metrics_data)

    def verify_metric(self, metric_info):
        """Verify if metric is supported or not.

        :return: result of ``MonPlugin.verify_metric_support``.
        """
        return MonPlugin().verify_metric_support(metric_info)

    def publish_create_metric_response(self, metric_info, metric_status):
        """Publish create metric response.

        :param metric_info: full request dict; ``correlation_id`` and
            ``metric_create.resource_uuid`` are echoed back.
        :param metric_status: value published in the ``status`` field.
        """
        response_msg = {"schema_version": schema_version,
                        "schema_type": "create_metric_response",
                        "correlation_id": metric_info['correlation_id'],
                        "metric_create_response": {
                            "metric_uuid": '0',
                            "resource_uuid": metric_info['metric_create']['resource_uuid'],
                            "status": metric_status
                        }
                       }
        self._publish_response(self.producer_metrics, 'metric_response',
                               'create_metric_response', response_msg)

    def publish_update_metric_response(self, metric_info, metric_status):
        """Publish update metric response.

        :param metric_info: full request dict; ``correlation_id`` and
            ``metric_create.resource_uuid`` are echoed back.
        :param metric_status: value published in the ``status`` field.
        """
        response_msg = {"schema_version": schema_version,
                        "schema_type": "metric_update_response",
                        "correlation_id": metric_info['correlation_id'],
                        "metric_update_response": {
                            "metric_uuid": '0',
                            # NOTE(review): read from 'metric_create', matching
                            # what consume() verifies -- confirm the schema.
                            "resource_uuid": metric_info['metric_create']['resource_uuid'],
                            "status": metric_status
                        }
                       }
        self._publish_response(self.producer_metrics, 'metric_response',
                               'update_metric_response', response_msg)

    def publish_delete_metric_response(self, metric_info):
        """Publish delete metric response.

        Deleting a metric is not allowed, so ``status`` is always ``False``.
        ``tenant_uuid`` is echoed back when the request has it, else ``None``.
        """
        response_msg = {"schema_version": schema_version,
                        "schema_type": "delete_metric_response",
                        "correlation_id": metric_info['correlation_id'],
                        "metric_name": metric_info['metric_name'],
                        "metric_uuid": '0',
                        "resource_uuid": metric_info['resource_uuid'],
                        # .get() yields None for a missing key as well as for
                        # an explicit null, and works on Python 2 and 3.
                        "tenant_uuid": metric_info.get('tenant_uuid'),
                        "status": False
                       }
        self._publish_response(self.producer_metrics, 'metric_response',
                               'delete_metric_response', response_msg)

    def list_alarms(self, list_alarm_input):
        """Collect list of triggered alarms based on input.

        :param list_alarm_input: the ``alarm_list_request`` part of the request.
        :return: result of ``MonPlugin.get_triggered_alarms_list``.
        """
        return MonPlugin().get_triggered_alarms_list(list_alarm_input)

    def publish_list_alarm_response(self, triggered_alarm_list, list_alarm_input):
        """Publish list of triggered alarms.

        :param triggered_alarm_list: result of :meth:`list_alarms`.
        :param list_alarm_input: full request dict containing
            ``alarm_list_request``.
        """
        response_msg = {"schema_version": schema_version,
                        "schema_type": "list_alarm_response",
                        "correlation_id": list_alarm_input['alarm_list_request']['correlation_id'],
                        "list_alarm_resp": triggered_alarm_list
                       }
        self._publish_response(self.producer_alarms, 'alarm_response',
                               'list_alarm_response', response_msg)

    def update_access_credentials(self, access_info):
        """Verify that all required access config params are provided and
        update the access config in the default vROPs config file.

        :param access_info: mapping of access config parameter names to values.
        :return: ``False`` when any key of ``req_config_params`` is missing,
            otherwise the result of :meth:`write_access_config`.
        """
        #Check if all the required config params are passed in request
        if not all(key in access_info for key in req_config_params):
            self.logger.debug("All required Access Config Parameters not provided")
            self.logger.debug("List of required Access Config Parameters: {}".format(req_config_params))
            self.logger.debug("List of given Access Config Parameters: {}".format(access_info))
            return False

        return self.write_access_config(access_info) #True/False

    def write_access_config(self, access_info):
        """Write access configuration to vROPs config file.

        Every child of an ``Access_Config`` element whose tag is a key of
        ``access_info`` gets the matching value as its text; the file at
        ``CONFIG_FILE_PATH`` is then rewritten in place.

        :return: ``True`` on success, ``False`` when parsing or writing
            fails (the failure is logged as a warning).
        """
        try:
            tree = XmlElementTree.parse(CONFIG_FILE_PATH)
            for config in tree.getroot():
                if config.tag != 'Access_Config':
                    continue
                for param in config:
                    if param.tag in access_info:
                        param.text = access_info[param.tag]

            tree.write(CONFIG_FILE_PATH)
        except Exception as exp:
            self.logger.warning("Failed to update Access Config Parameters: {}".format(exp))
            return False

        return True

    def publish_access_update_response(self, access_update_status, access_info_req):
        """Publish access update response.

        :param access_update_status: value published in the ``status`` field.
        :param access_info_req: full request dict containing ``access_config``.
        """
        # NOTE(review): published on the 'access_credentials' topic although
        # the producer was created for 'vim_access_credentials_response' --
        # confirm which topic consumers expect.
        response_msg = {"schema_version": schema_version,
                        "schema_type": "vim_access_credentials_response",
                        "correlation_id": access_info_req['access_config']['correlation_id'],
                        "status": access_update_status
                       }
        self._publish_response(self.producer_access_credentials, 'access_credentials',
                               'vim_access_credentials_response', response_msg)
385
386 """
387 def main():
388 #log.basicConfig(filename='mon_vrops_log.log',level=log.DEBUG)
389 set_logger()
390 plugin_rcvr = PluginReceiver()
391 plugin_rcvr.consume()
392
393 if __name__ == "__main__":
394 main()
395 """