Merge "Updated Code of AWS plugin including Minor alternations of consumer/producer...
[osm/MON.git] / plugins / vRealiseOps / plugin_receiver.py
1 # -*- coding: utf-8 -*-
2 """
3 Monitoring plugin receiver that consumes the request messages &
4 responds using producer for vROPs
5 """
6
7 from mon_plugin_vrops import MonPlugin
8 from kafka_consumer_vrops import vROP_KafkaConsumer
9 #To Do- Change producer
10 #from core.message_bus.producer import KafkaProducer
11 import json
12 import logging as log
13
class PluginReceiver(object):
    """MON plugin receiver for the vROPs telemetry plugin.

    Consumes request messages from the message bus, hands them to the
    vROPs monitoring plugin and logs the outcome. Publishing responses is
    not wired in yet: the producer is still a TODO, so the ``publish_*``
    methods only build their payloads.
    """

    def __init__(self):
        """Create the vROPs plugin and the consumer for the request topics."""
        topics = ['alarm_request', 'metric_request', 'Access_Credentials', 'alarm_response']
        # TODO: add broker uri
        broker_uri = None
        self.mon_plugin = MonPlugin()
        self.consumer = vROP_KafkaConsumer(topics, broker_uri)
        # TODO: change producer
        # self.producer = KafkaProducer()

    def consume(self):
        """Consume messages, act on each of them and log the result.

        Iterates over ``self.consumer.vrops_consumer``. A failure while
        handling a single message is logged (with traceback) and the loop
        continues with the next message, so one malformed request does not
        stop the receiver. A failure raised by the consumer iterator itself
        is logged and ends consumption.
        """
        try:
            for message in self.consumer.vrops_consumer:
                try:
                    self._handle_message(message)
                except Exception as exp:
                    # Keep consuming: a bad payload must not kill the loop.
                    log.exception("Exception in receiver: %s", exp)
        except Exception as exp:
            # The iterator itself failed; nothing more can be consumed.
            log.error("Exception in receiver: %s", exp)

    def _handle_message(self, message):
        """Dispatch one consumed message on its topic and key.

        Messages whose topic/key combination is not handled below are
        ignored. Exceptions (invalid JSON, missing keys, plugin errors)
        propagate to the caller.
        """
        if message.topic == 'alarm_request':
            if message.key == "create_alarm_request":
                config_alarm_info = json.loads(message.value)
                alarm_uuid = self.create_alarm(config_alarm_info['alarm_creation_request'])
                log.info("Alarm created with alarm uuid: %s", alarm_uuid)
                # TODO: publish message using producer
                # self.publish_create_alarm_status(alarm_uuid, config_alarm_info)
            elif message.key == "update_alarm_request":
                update_alarm_info = json.loads(message.value)
                # NOTE(review): update requests are read from the
                # 'alarm_creation_request' key, same as create requests -
                # confirm this matches the update message schema.
                alarm_uuid = self.update_alarm(update_alarm_info['alarm_creation_request'])
                log.info("Alarm defination updated : alarm uuid: %s", alarm_uuid)
                # TODO: publish message using producer
                # self.publish_update_alarm_status(alarm_uuid, update_alarm_info)
        elif message.topic == 'metric_request':
            if message.key == "read_metric_data_request":
                metric_request_info = json.loads(message.value)
                metrics_data = self.mon_plugin.get_metrics_data(metric_request_info)
                log.info("Collected Metrics Data: %s", metrics_data)
                # TODO: publish message using producer
                # self.publish_metrics_data_status(metrics_data)

    def create_alarm(self, config_alarm_info):
        """Create an alarm using the vROPs plugin.

        Calls ``configure_rest_plugin()`` on the plugin first (its return
        value is not needed here), then ``configure_alarm()``.

        Returns whatever ``configure_alarm`` returns for
        ``config_alarm_info``; callers treat it as the alarm uuid.
        """
        self.mon_plugin.configure_rest_plugin()
        return self.mon_plugin.configure_alarm(config_alarm_info)

    def publish_create_alarm_status(self, alarm_uuid, config_alarm_info):
        """Build the create-alarm response for the producer.

        ``config_alarm_info`` must contain
        ``["alarm_creation_request"]["correlation_id"]``; a ``KeyError`` is
        raised otherwise. Nothing is sent yet because the producer is not
        wired in; the method returns ``None``.
        """
        topic = 'alarm_response'
        msg_key = 'create_alarm_response'
        response_msg = {
            "schema_version": 1.0,
            "schema_type": "create_alarm_response",
            "alarm_creation_response": {
                "correlation_id": config_alarm_info["alarm_creation_request"]["correlation_id"],
                "alarm_uuid": alarm_uuid,
                # A falsy alarm uuid means the alarm was not created.
                "status": bool(alarm_uuid),
            },
        }
        # TODO: add producer
        # self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)

    def update_alarm(self, update_alarm_info):
        """Update an already created alarm.

        Returns whatever the plugin's ``reconfigure_alarm`` returns for
        ``update_alarm_info``; callers treat it as the alarm uuid.
        """
        return self.mon_plugin.reconfigure_alarm(update_alarm_info)

    def publish_update_alarm_status(self, alarm_uuid, update_alarm_info):
        """Build the update-alarm response for the producer.

        ``update_alarm_info`` must contain
        ``["alarm_creation_request"]["correlation_id"]``; a ``KeyError`` is
        raised otherwise. Nothing is sent yet because the producer is not
        wired in; the method returns ``None``.
        """
        topic = 'alarm_response'
        msg_key = 'update_alarm_response'
        response_msg = {
            "schema_version": 1.0,
            "schema_type": "update_alarm_response",
            "alarm_update_response": {
                "correlation_id": update_alarm_info["alarm_creation_request"]["correlation_id"],
                "alarm_uuid": alarm_uuid,
                # A falsy alarm uuid means the alarm was not updated.
                "status": bool(alarm_uuid),
            },
        }
        # TODO: add producer
        # self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)

    def publish_metrics_data_status(self, metrics_data):
        """Prepare publishing of the requested metric data.

        Only the topic and message key are set up; nothing is sent yet
        because the producer is not wired in. Returns ``None``.
        """
        topic = 'metric_response'
        msg_key = 'read_metric_data_response'
        # TODO: add producer
        # self.producer.publish(key=msg_key, value=json.dumps(metrics_data), topic=topic)
111
112 #For testing
113 #log.basicConfig(filename='mon_vrops_log.log',level=log.DEBUG)
114 #plugin_rcvr = PluginReceiver()
115 #plugin_rcvr.consume()
116