Adds OSMMON_VCA_USER and adds timeout and max.poll.interval to collector
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import time
26 from json import JSONDecodeError
27
28 import six
29 import yaml
30
31 from osm_mon.common.common_db_client import CommonDbClient
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.message_bus.consumer import Consumer
35 from osm_mon.core.message_bus.producer import Producer
36 from osm_mon.core.settings import Config
37 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
38 from osm_mon.plugins.CloudWatch.connection import Connection
39 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
40 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
41 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
42 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
43 from osm_mon.plugins.vRealiseOps import plugin_receiver
44
45 cfg = Config.instance()
46
47 logging.basicConfig(stream=sys.stdout,
48 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
49 datefmt='%m/%d/%Y %I:%M:%S %p',
50 level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
51 log = logging.getLogger(__name__)
52
53 kafka_logger = logging.getLogger('kafka')
54 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
55
56
57 class CommonConsumer:
58
59 def __init__(self):
60 self.auth_manager = AuthManager()
61 self.database_manager = DatabaseManager()
62 self.database_manager.create_tables()
63
64 # Create OpenStack alarming and metric instances
65 self.openstack_metrics = metric_handler.OpenstackMetricHandler()
66 self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
67
68 # Create CloudWatch alarm and metric instances
69 self.cloudwatch_alarms = plugin_alarms()
70 self.cloudwatch_metrics = plugin_metrics()
71 self.aws_connection = Connection()
72 self.aws_access_credentials = AccessCredentials()
73
74 # Create vROps plugin_receiver class instance
75 self.vrops_rcvr = plugin_receiver.PluginReceiver()
76
77 log.info("Connecting to MongoDB...")
78 self.common_db = CommonDbClient()
79 log.info("Connection successful.")
80
81 def get_vim_type(self, vim_uuid):
82 """Get the vim type that is required by the message."""
83 credentials = self.database_manager.get_credentials(vim_uuid)
84 return credentials.type
85
86 def run(self):
87 common_consumer = Consumer("mon-consumer")
88
89 topics = ['metric_request', 'alarm_request', 'vim_account']
90 common_consumer.subscribe(topics)
91 retries = 1
92 max_retries = 5
93 while True:
94 try:
95 common_consumer.poll()
96 common_consumer.seek_to_end()
97 break
98 except Exception:
99 log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
100 log.error("Retry number %d of %d", retries, max_retries)
101 if retries >= max_retries:
102 log.error("Achieved max number of retries. Logging exception and exiting...")
103 log.exception("Exception: ")
104 return
105 retries = retries + 1
106 time.sleep(2)
107
108 log.info("Listening for messages...")
109 for message in common_consumer:
110 self.consume_message(message)
111
112 def consume_message(self, message):
113 log.info("Message arrived: %s", message)
114 try:
115 try:
116 values = json.loads(message.value)
117 except JSONDecodeError:
118 values = yaml.safe_load(message.value)
119
120 response = None
121
122 if message.topic == "vim_account":
123 if message.key == "create" or message.key == "edit":
124 self.auth_manager.store_auth_credentials(values)
125 if message.key == "delete":
126 self.auth_manager.delete_auth_credentials(values)
127
128 else:
129 # Get ns_id from message
130 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
131 contains_list = False
132 list_index = None
133 for k, v in six.iteritems(values):
134 if isinstance(v, dict):
135 if 'ns_id' in v:
136 contains_list = True
137 list_index = k
138 break
139 if not contains_list and 'ns_id' in values:
140 ns_id = values['ns_id']
141 else:
142 ns_id = values[list_index]['ns_id']
143
144 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
145
146 # Check the vim desired by the message
147 vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
148 vim_uuid = vnfr['vim-account-id']
149
150 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
151 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
152 vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
153 if contains_list:
154 values[list_index]['resource_uuid'] = vdur['vim-id']
155 else:
156 values['resource_uuid'] = vdur['vim-id']
157 message = message._replace(value=json.dumps(values))
158
159 vim_type = self.get_vim_type(vim_uuid)
160
161 if vim_type == "openstack":
162 log.info("This message is for the OpenStack plugin.")
163 if message.topic == "metric_request":
164 response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
165 if message.topic == "alarm_request":
166 response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
167
168 elif vim_type == "aws":
169 log.info("This message is for the CloudWatch plugin.")
170 aws_conn = self.aws_connection.setEnvironment()
171 if message.topic == "metric_request":
172 response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
173 if message.topic == "alarm_request":
174 response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
175
176 elif vim_type == "vmware":
177 log.info("This metric_request message is for the vROPs plugin.")
178 if message.topic == "metric_request":
179 response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
180 if message.topic == "alarm_request":
181 response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
182
183 else:
184 log.debug("vim_type is misconfigured or unsupported; %s",
185 vim_type)
186 if response:
187 self._publish_response(message.topic, message.key, response)
188
189 except Exception:
190 log.exception("Exception processing message: ")
191
192 def _publish_response(self, topic: str, key: str, msg: dict):
193 topic = topic.replace('request', 'response')
194 key = key.replace('request', 'response')
195 producer = Producer()
196 producer.send(topic=topic, key=key, value=json.dumps(msg))
197 producer.flush(timeout=5)
198 producer.close()
199
200
201 if __name__ == '__main__':
202 CommonConsumer().run()