85e679f6a60cafa87bbd686d156f550d69088175
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 from json import JSONDecodeError
26
27 import six
28 import yaml
29
30 from osm_mon.common.common_db_client import CommonDbClient
31 from osm_mon.core.auth import AuthManager
32 from osm_mon.core.database import DatabaseManager
33 from osm_mon.core.message_bus.consumer import Consumer
34 from osm_mon.core.message_bus.producer import Producer
35 from osm_mon.core.settings import Config
36 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
37 from osm_mon.plugins.CloudWatch.connection import Connection
38 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
39 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
40 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
41 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
42 from osm_mon.plugins.vRealiseOps import plugin_receiver
43
44 cfg = Config.instance()
45
46 logging.basicConfig(stream=sys.stdout,
47 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
48 datefmt='%m/%d/%Y %I:%M:%S %p',
49 level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
50 log = logging.getLogger(__name__)
51
52 kafka_logger = logging.getLogger('kafka')
53 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
54 kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
55 kafka_handler = logging.StreamHandler(sys.stdout)
56 kafka_handler.setFormatter(kafka_formatter)
57 kafka_logger.addHandler(kafka_handler)
58
59
60 class CommonConsumer:
61
62 def __init__(self):
63 self.auth_manager = AuthManager()
64 self.database_manager = DatabaseManager()
65 self.database_manager.create_tables()
66
67 # Create OpenStack alarming and metric instances
68 self.openstack_metrics = metric_handler.OpenstackMetricHandler()
69 self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
70
71 # Create CloudWatch alarm and metric instances
72 self.cloudwatch_alarms = plugin_alarms()
73 self.cloudwatch_metrics = plugin_metrics()
74 self.aws_connection = Connection()
75 self.aws_access_credentials = AccessCredentials()
76
77 # Create vROps plugin_receiver class instance
78 self.vrops_rcvr = plugin_receiver.PluginReceiver()
79
80 log.info("Connecting to MongoDB...")
81 self.common_db = CommonDbClient()
82 log.info("Connection successful.")
83
84 def get_vim_type(self, vim_uuid):
85 """Get the vim type that is required by the message."""
86 credentials = self.database_manager.get_credentials(vim_uuid)
87 return credentials.type
88
89 def run(self):
90 common_consumer = Consumer("mon-consumer")
91
92 topics = ['metric_request', 'alarm_request', 'vim_account']
93 common_consumer.subscribe(topics)
94 retries = 1
95 max_retries = 5
96 while True:
97 try:
98 common_consumer.poll()
99 common_consumer.seek_to_end()
100 break
101 except Exception:
102 log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
103 log.error("Retry number %d of %d", retries, max_retries)
104 if retries >= max_retries:
105 log.error("Achieved max number of retries. Logging exception and exiting...")
106 log.exception("Exception: ")
107 return
108 retries = retries + 1
109
110 log.info("Listening for messages...")
111 for message in common_consumer:
112 self.consume_message(message)
113
114 def consume_message(self, message):
115 log.info("Message arrived: %s", message)
116 try:
117 try:
118 values = json.loads(message.value)
119 except JSONDecodeError:
120 values = yaml.safe_load(message.value)
121
122 response = None
123
124 if message.topic == "vim_account":
125 if message.key == "create" or message.key == "edit":
126 self.auth_manager.store_auth_credentials(values)
127 if message.key == "delete":
128 self.auth_manager.delete_auth_credentials(values)
129
130 else:
131 # Get ns_id from message
132 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
133 contains_list = False
134 list_index = None
135 for k, v in six.iteritems(values):
136 if isinstance(v, dict):
137 if 'ns_id' in v:
138 contains_list = True
139 list_index = k
140 break
141 if not contains_list and 'ns_id' in values:
142 ns_id = values['ns_id']
143 else:
144 ns_id = values[list_index]['ns_id']
145
146 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
147
148 # Check the vim desired by the message
149 vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
150 vim_uuid = vnfr['vim-account-id']
151
152 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
153 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
154 vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
155 if contains_list:
156 values[list_index]['resource_uuid'] = vdur['vim-id']
157 else:
158 values['resource_uuid'] = vdur['vim-id']
159 message = message._replace(value=json.dumps(values))
160
161 vim_type = self.get_vim_type(vim_uuid)
162
163 if vim_type == "openstack":
164 log.info("This message is for the OpenStack plugin.")
165 if message.topic == "metric_request":
166 response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
167 if message.topic == "alarm_request":
168 response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
169
170 elif vim_type == "aws":
171 log.info("This message is for the CloudWatch plugin.")
172 aws_conn = self.aws_connection.setEnvironment()
173 if message.topic == "metric_request":
174 response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
175 if message.topic == "alarm_request":
176 response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
177
178 elif vim_type == "vmware":
179 log.info("This metric_request message is for the vROPs plugin.")
180 if message.topic == "metric_request":
181 response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
182 if message.topic == "alarm_request":
183 response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
184
185 else:
186 log.debug("vim_type is misconfigured or unsupported; %s",
187 vim_type)
188 if response:
189 self._publish_response(message.topic, message.key, response)
190
191 except Exception:
192 log.exception("Exception processing message: ")
193
194 def _publish_response(self, topic: str, key: str, msg: dict):
195 topic = topic.replace('request', 'response')
196 key = key.replace('request', 'response')
197 producer = Producer()
198 producer.send(topic=topic, key=key, value=json.dumps(msg))
199 producer.flush()
200 producer.close()
201
202
203 if __name__ == '__main__':
204 CommonConsumer().run()