1 # -*- coding: utf-8 -*-
4 # Copyright 2016-2017 VMware Inc.
5 # This file is part of ETSI OSM
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: osslegalrouting@vmware.com
25 Montoring plugin receiver that consumes the request messages &
26 responds using producer for vROPs
29 from mon_plugin_vrops
import MonPlugin
30 from kafka_consumer_vrops
import vROP_KafkaConsumer
32 from core
.message_bus
.producer
import KafkaProducer
37 class PluginReceiver():
38 """MON Plugin receiver receiving request messages & responding using producer for vROPs
42 """Constructor of PluginReceiver
45 topics
= ['alarm_request', 'metric_request', 'Access_Credentials', 'alarm_response']
46 #To Do - Add broker uri
48 self
.mon_plugin
= MonPlugin()
49 self
.consumer
= vROP_KafkaConsumer(topics
, broker_uri
)
51 self
.producer
= KafkaProducer()
54 """Consume the message, act on it & respond
57 for message
in self
.consumer
.vrops_consumer
:
58 message_values
= json
.loads(message
.value
)
59 if message_values
.has_key('vim_type'):
60 vim_type
= message_values
['vim_type'].lower()
61 if vim_type
== 'vmware':
62 log
.info("Action required for: {}".format(message
.topic
))
63 if message
.topic
== 'alarm_request':
64 if message
.key
== "create_alarm_request":
65 config_alarm_info
= json
.loads(message
.value
)
66 alarm_uuid
= self
.create_alarm(config_alarm_info
['alarm_creation_request'])
67 log
.info("Alarm created with alarm uuid: {}".format(alarm_uuid
))
68 #Publish message using producer
69 self
.publish_create_alarm_status(alarm_uuid
, config_alarm_info
)
70 elif message
.key
== "update_alarm_request":
71 update_alarm_info
= json
.loads(message
.value
)
72 alarm_uuid
= self
.update_alarm(update_alarm_info
['alarm_update_request'])
73 log
.info("In plugin_receiver: Alarm defination updated : alarm uuid: {}".format(alarm_uuid
))
74 #Publish message using producer
75 self
.publish_update_alarm_status(alarm_uuid
, update_alarm_info
)
76 elif message
.key
== "delete_alarm_request":
77 delete_alarm_info
= json
.loads(message
.value
)
78 alarm_uuid
= self
.delete_alarm(delete_alarm_info
['alarm_deletion_request'])
79 log
.info("In plugin_receiver: Alarm defination deleted : alarm uuid: {}".format(alarm_uuid
))
80 #Publish message using producer
81 self
.publish_delete_alarm_status(alarm_uuid
, delete_alarm_info
)
82 elif message
.key
== "list_alarm_request":
83 request_input
= json
.loads(message
.value
)
84 triggered_alarm_list
= self
.list_alarms(request_input
['alarm_list_request'])
85 #Publish message using producer
86 self
.publish_list_alarm_response(triggered_alarm_list
, request_input
)
87 elif message
.topic
== 'metric_request':
88 if message
.key
== "read_metric_data_request":
89 metric_request_info
= json
.loads(message
.value
)
90 metrics_data
= self
.mon_plugin
.get_metrics_data(metric_request_info
)
91 log
.info("Collected Metrics Data: {}".format(metrics_data
))
92 #Publish message using producer
93 self
.publish_metrics_data_status(metrics_data
)
94 elif message
.key
== "create_metric_request":
95 metric_info
= json
.loads(message
.value
)
96 metric_status
= self
.verify_metric(metric_info
['metric_create'])
97 #Publish message using producer
98 self
.publish_create_metric_response(metric_info
, metric_status
)
99 elif message
.key
== "update_metric_request":
100 metric_info
= json
.loads(message
.value
)
101 metric_status
= self
.verify_metric(metric_info
['metric_create'])
102 #Publish message using producer
103 self
.publish_update_metric_response(metric_info
, metric_status
)
104 elif message
.key
== "delete_metric_request":
105 metric_info
= json
.loads(message
.value
)
106 #Deleting Metric Data is not allowed. Publish status as False
107 log
.warn("Deleting Metric is not allowed: {}".format(metric_info
['metric_name']))
108 #Publish message using producer
109 self
.publish_delete_metric_response(metric_info
)
111 except Exception as exp
:
112 log
.error("Exception in receiver: {} {}".format(exp
, traceback
.format_exc()))
114 def create_alarm(self
, config_alarm_info
):
115 """Create alarm using vROPs plugin
117 plugin_uuid
= self
.mon_plugin
.configure_rest_plugin()
118 alarm_uuid
= self
.mon_plugin
.configure_alarm(config_alarm_info
)
121 def publish_create_alarm_status(self
, alarm_uuid
, config_alarm_info
):
122 """Publish create alarm status using producer
124 topic
= 'alarm_response'
125 msg_key
= 'create_alarm_response'
126 response_msg
= {"schema_version":1.0,
127 "schema_type":"create_alarm_response",
128 "alarm_creation_response":
129 {"correlation_id":config_alarm_info
["alarm_creation_request"]["correlation_id"],
130 "alarm_uuid":alarm_uuid
,
131 "status": True if alarm_uuid
else False
135 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
137 def update_alarm(self
, update_alarm_info
):
138 """Updare already created alarm
140 alarm_uuid
= self
.mon_plugin
.update_alarm_configuration(update_alarm_info
)
143 def publish_update_alarm_status(self
, alarm_uuid
, update_alarm_info
):
144 """Publish update alarm status requests using producer
146 topic
= 'alarm_response'
147 msg_key
= 'update_alarm_response'
148 response_msg
= {"schema_version":1.0,
149 "schema_type":"update_alarm_response",
150 "alarm_update_response":
151 {"correlation_id":update_alarm_info
["alarm_update_request"]["correlation_id"],
152 "alarm_uuid":alarm_uuid
,
153 "status": True if alarm_uuid
else False
157 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
159 def delete_alarm(self
, delete_alarm_info
):
160 """Delete alarm configuration
162 alarm_uuid
= self
.mon_plugin
.delete_alarm_configuration(delete_alarm_info
)
165 def publish_delete_alarm_status(self
, alarm_uuid
, delete_alarm_info
):
166 """Publish update alarm status requests using producer
168 topic
= 'alarm_response'
169 msg_key
= 'delete_alarm_response'
170 response_msg
= {"schema_version":1.0,
171 "schema_type":"delete_alarm_response",
172 "alarm_deletion_response":
173 {"correlation_id":delete_alarm_info
["alarm_deletion_request"]["correlation_id"],
174 "alarm_uuid":alarm_uuid
,
175 "status": True if alarm_uuid
else False
179 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
182 def publish_metrics_data_status(self
, metrics_data
):
183 """Publish the requested metric data using producer
185 topic
= 'metric_response'
186 msg_key
= 'read_metric_data_response'
188 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(metrics_data
), topic
=topic
)
191 def verify_metric(self
, metric_info
):
192 """Verify if metric is supported or not
194 metric_key_status
= self
.mon_plugin
.verify_metric_support(metric_info
)
195 return metric_key_status
197 def publish_create_metric_response(self
, metric_info
, metric_status
):
198 """Publish create metric response
200 topic
= 'metric_response'
201 msg_key
= 'create_metric_response'
202 response_msg
= {"schema_version":1.0,
203 "schema_type":"create_metric_response",
204 "correlation_id":metric_info
['correlation_id'],
205 "metric_create_response":
208 "resource_uuid":metric_info
['metric_create']['resource_uuid'],
209 "status":metric_status
213 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
215 def publish_update_metric_response(self
, metric_info
, metric_status
):
216 """Publish update metric response
218 topic
= 'metric_response'
219 msg_key
= 'update_metric_response'
220 response_msg
= {"schema_version":1.0,
221 "schema_type":"metric_update_response",
222 "correlation_id":metric_info
['correlation_id'],
223 "metric_update_response":
226 "resource_uuid":metric_info
['metric_create']['resource_uuid'],
227 "status":metric_status
231 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
233 def publish_delete_metric_response(self
, metric_info
):
236 topic
= 'metric_response'
237 msg_key
= 'delete_metric_response'
238 response_msg
= {"schema_version":1.0,
239 "schema_type":"delete_metric_response",
240 "correlation_id":metric_info
['correlation_id'],
241 "metric_name":metric_info
['metric_name'],
243 "resource_uuid":metric_info
['resource_uuid'],
244 "tenant_uuid":metric_info
['tenant_uuid'],
248 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
250 def list_alarms(self
, list_alarm_input
):
253 triggered_alarms
= self
.mon_plugin
.get_triggered_alarms_list(list_alarm_input
)
254 return triggered_alarms
257 def publish_list_alarm_response(self
, triggered_alarm_list
, list_alarm_input
):
260 topic
= 'alarm_response'
261 msg_key
= 'list_alarm_response'
262 response_msg
= {"schema_version":1.0,
263 "schema_type":"list_alarm_response",
264 "correlation_id":list_alarm_input
['alarm_list_request']['correlation_id'],
265 "resource_uuid":list_alarm_input
['alarm_list_request']['resource_uuid'],
266 "list_alarm_resp":triggered_alarm_list
269 self
.producer
.publish(key
=msg_key
, value
=json
.dumps(response_msg
), topic
=topic
)
273 log
.basicConfig(filename
='mon_vrops_log.log',level
=log
.DEBUG
)
274 plugin_rcvr
= PluginReceiver()
275 plugin_rcvr
.consume()
277 if __name__
== "__main__":