Adds use of CustomCollector in Prometheus exporter
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 from json import JSONDecodeError
26
27 import six
28 import yaml
29
30 from osm_mon.common.common_db_client import CommonDbClient
31 from osm_mon.core.auth import AuthManager
32 from osm_mon.core.database import DatabaseManager
33 from osm_mon.core.message_bus.consumer import Consumer
34 from osm_mon.core.message_bus.producer import Producer
35 from osm_mon.core.settings import Config
36 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
37 from osm_mon.plugins.CloudWatch.connection import Connection
38 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
39 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
40 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
41 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
42 from osm_mon.plugins.vRealiseOps import plugin_receiver
43
44 cfg = Config.instance()
45
46 logging.basicConfig(stream=sys.stdout,
47 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
48 datefmt='%m/%d/%Y %I:%M:%S %p',
49 level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
50 log = logging.getLogger(__name__)
51
52 kafka_logger = logging.getLogger('kafka')
53 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
54 kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
55 kafka_handler = logging.StreamHandler(sys.stdout)
56 kafka_handler.setFormatter(kafka_formatter)
57 kafka_logger.addHandler(kafka_handler)
58
59
60 class CommonConsumer:
61
62 def __init__(self):
63 self.auth_manager = AuthManager()
64 self.database_manager = DatabaseManager()
65 self.database_manager.create_tables()
66
67 # Create OpenStack alarming and metric instances
68 self.openstack_metrics = metric_handler.OpenstackMetricHandler()
69 self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
70
71 # Create CloudWatch alarm and metric instances
72 self.cloudwatch_alarms = plugin_alarms()
73 self.cloudwatch_metrics = plugin_metrics()
74 self.aws_connection = Connection()
75 self.aws_access_credentials = AccessCredentials()
76
77 # Create vROps plugin_receiver class instance
78 self.vrops_rcvr = plugin_receiver.PluginReceiver()
79
80 log.info("Connecting to MongoDB...")
81 self.common_db = CommonDbClient()
82 log.info("Connection successful.")
83
84 def get_vim_type(self, vim_uuid):
85 """Get the vim type that is required by the message."""
86 credentials = self.database_manager.get_credentials(vim_uuid)
87 return credentials.type
88
89 def run(self):
90 common_consumer = Consumer("mon-consumer")
91
92 topics = ['metric_request', 'alarm_request', 'vim_account']
93 common_consumer.subscribe(topics)
94 common_consumer.poll()
95 common_consumer.seek_to_end()
96
97 log.info("Listening for messages...")
98 for message in common_consumer:
99 self.consume_message(message)
100
101 def consume_message(self, message):
102 log.info("Message arrived: %s", message)
103 try:
104 try:
105 values = json.loads(message.value)
106 except JSONDecodeError:
107 values = yaml.safe_load(message.value)
108
109 response = None
110
111 if message.topic == "vim_account":
112 if message.key == "create" or message.key == "edit":
113 self.auth_manager.store_auth_credentials(values)
114 if message.key == "delete":
115 self.auth_manager.delete_auth_credentials(values)
116
117 else:
118 # Get ns_id from message
119 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
120 contains_list = False
121 list_index = None
122 for k, v in six.iteritems(values):
123 if isinstance(v, dict):
124 if 'ns_id' in v:
125 contains_list = True
126 list_index = k
127 break
128 if not contains_list and 'ns_id' in values:
129 ns_id = values['ns_id']
130 else:
131 ns_id = values[list_index]['ns_id']
132
133 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
134
135 # Check the vim desired by the message
136 vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
137 vim_uuid = vnfr['vim-account-id']
138
139 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
140 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
141 vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
142 if contains_list:
143 values[list_index]['resource_uuid'] = vdur['vim-id']
144 else:
145 values['resource_uuid'] = vdur['vim-id']
146 message = message._replace(value=json.dumps(values))
147
148 vim_type = self.get_vim_type(vim_uuid)
149
150 if vim_type == "openstack":
151 log.info("This message is for the OpenStack plugin.")
152 if message.topic == "metric_request":
153 response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
154 if message.topic == "alarm_request":
155 response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
156
157 elif vim_type == "aws":
158 log.info("This message is for the CloudWatch plugin.")
159 aws_conn = self.aws_connection.setEnvironment()
160 if message.topic == "metric_request":
161 response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
162 if message.topic == "alarm_request":
163 response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
164
165 elif vim_type == "vmware":
166 log.info("This metric_request message is for the vROPs plugin.")
167 if message.topic == "metric_request":
168 response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
169 if message.topic == "alarm_request":
170 response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
171
172 else:
173 log.debug("vim_type is misconfigured or unsupported; %s",
174 vim_type)
175 if response:
176 self._publish_response(message.topic, message.key, response)
177
178 except Exception:
179 log.exception("Exception processing message: ")
180
181 def _publish_response(self, topic: str, key: str, msg: dict):
182 topic = topic.replace('request', 'response')
183 key = key.replace('request', 'response')
184 producer = Producer()
185 producer.send(topic=topic, key=key, value=json.dumps(msg))
186 producer.flush()
187 producer.close()
188
189
190 if __name__ == '__main__':
191 CommonConsumer().run()