6a134bdbb72443574481e61ace0677fd0965939b
1 # -*- coding: utf-8 -*-
3 Montoring plugin receiver that consumes the request messages &
4 responds using producer for vROPs
7 from mon_plugin_vrops
import MonPlugin
8 from kafka_consumer_vrops
import vROP_KafkaConsumer
9 #To Do- Change producer
10 #from core.message_bus.producer import KafkaProducer
14 class PluginReceiver():
15 """MON Plugin receiver receiving request messages & responding using producer for vROPs
19 """Constructor of PluginReceiver
22 topics
= ['alarm_request', 'metric_request', 'Access_Credentials', 'alarm_response']
23 #To Do - Add broker uri
25 self
.mon_plugin
= MonPlugin()
26 self
.consumer
= vROP_KafkaConsumer(topics
, broker_uri
)
27 #To Do- Change producer
28 #self.producer = KafkaProducer()
31 """Consume the message, act on it & respond
34 for message
in self
.consumer
.vrops_consumer
:
35 if message
.topic
== 'alarm_request':
36 if message
.key
== "create_alarm_request":
37 config_alarm_info
= json
.loads(message
.value
)
38 alarm_uuid
= self
.create_alarm(config_alarm_info
['alarm_creation_request'])
39 log
.info("Alarm created with alarm uuid: {}".format(alarm_uuid
))
40 #To Do - Publish message using producer
41 #self.publish_create_alarm_status(alarm_uuid, config_alarm_info)
42 elif message
.key
== "update_alarm_request":
43 update_alarm_info
= json
.loads(message
.value
)
44 alarm_uuid
= self
.update_alarm(update_alarm_info
['alarm_creation_request'])
45 log
.info("Alarm defination updated : alarm uuid: {}".format(alarm_uuid
))
46 #To Do - Publish message using producer
47 #self.publish_update_alarm_status(alarm_uuid, update_alarm_info)
48 elif message
.topic
== 'metric_request':
49 if message
.key
== "read_metric_data_request":
50 metric_request_info
= json
.loads(message
.value
)
51 metrics_data
= self
.mon_plugin
.get_metrics_data(metric_request_info
)
52 log
.info("Collected Metrics Data: {}".format(metrics_data
))
53 #To Do - Publish message using producer
54 #self.publish_metrics_data_status(metrics_data)
56 except Exception as exp
:
57 log
.error("Exception in receiver: {}".format(exp
))
59 def create_alarm(self
, config_alarm_info
):
60 """Create alarm using vROPs plugin
62 plugin_uuid
= self
.mon_plugin
.configure_rest_plugin()
63 alarm_uuid
= self
.mon_plugin
.configure_alarm(config_alarm_info
)
66 def publish_create_alarm_status(self
, alarm_uuid
, config_alarm_info
):
67 """Publish create alarm status using producer
69 topic
= 'alarm_response'
70 msg_key
= 'create_alarm_response'
71 response_msg
= {"schema_version":1.0,
72 "schema_type":"create_alarm_response",
73 "alarm_creation_response":
74 {"correlation_id":config_alarm_info
["alarm_creation_request"]["correlation_id"],
75 "alarm_uuid":alarm_uuid
,
76 "status": True if alarm_uuid
else False
80 #self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
82 def update_alarm(self
, update_alarm_info
):
83 """Updare already created alarm
85 alarm_uuid
= self
.mon_plugin
.reconfigure_alarm(update_alarm_info
)
88 def publish_update_alarm_status(self
, alarm_uuid
, update_alarm_info
):
89 """Publish update alarm status requests using producer
91 topic
= 'alarm_response'
92 msg_key
= 'update_alarm_response'
93 response_msg
= {"schema_version":1.0,
94 "schema_type":"update_alarm_response",
95 "alarm_update_response":
96 {"correlation_id":update_alarm_info
["alarm_creation_request"]["correlation_id"],
97 "alarm_uuid":alarm_uuid
,
98 "status": True if alarm_uuid
else False
101 #To Do - Add producer
102 #self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
104 def publish_metrics_data_status(self
, metrics_data
):
105 """Publish the requested metric data using producer
107 topic
= 'metric_response'
108 msg_key
= 'read_metric_data_response'
109 #To Do - Add producer
110 #self.producer.publish(key=msg_key, value=json.dumps(metrics_data), topic=topic)
113 #log.basicConfig(filename='mon_vrops_log.log',level=log.DEBUG)
114 #plugin_rcvr = PluginReceiver()
115 #plugin_rcvr.consume()