Adds support for OSMMON_DATABASE_COMMONKEY to decrypt vim passwords
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import time
26 from json import JSONDecodeError
27
28 import six
29 import yaml
30
31 from osm_mon.common.common_db_client import CommonDbClient
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.message_bus.consumer import Consumer
35 from osm_mon.core.message_bus.producer import Producer
36 from osm_mon.core.settings import Config
37 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
38 from osm_mon.plugins.CloudWatch.connection import Connection
39 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
40 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
41 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
42 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
43 from osm_mon.plugins.vRealiseOps import plugin_receiver
44
45 cfg = Config.instance()
46
47 logging.basicConfig(stream=sys.stdout,
48 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
49 datefmt='%m/%d/%Y %I:%M:%S %p',
50 level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
51 log = logging.getLogger(__name__)
52
53 kafka_logger = logging.getLogger('kafka')
54 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
55
56
57 class CommonConsumer:
58
59 def __init__(self):
60 self.auth_manager = AuthManager()
61 self.database_manager = DatabaseManager()
62 self.database_manager.create_tables()
63
64 # Create OpenStack alarming and metric instances
65 self.openstack_metrics = metric_handler.OpenstackMetricHandler()
66 self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
67
68 # Create CloudWatch alarm and metric instances
69 self.cloudwatch_alarms = plugin_alarms()
70 self.cloudwatch_metrics = plugin_metrics()
71 self.aws_connection = Connection()
72 self.aws_access_credentials = AccessCredentials()
73
74 # Create vROps plugin_receiver class instance
75 self.vrops_rcvr = plugin_receiver.PluginReceiver()
76
77 log.info("Connecting to MongoDB...")
78 self.common_db = CommonDbClient()
79 log.info("Connection successful.")
80
81 def get_vim_type(self, vim_uuid):
82 """Get the vim type that is required by the message."""
83 credentials = self.database_manager.get_credentials(vim_uuid)
84 return credentials.type
85
86 def run(self):
87 common_consumer = Consumer("mon-consumer")
88
89 topics = ['metric_request', 'alarm_request', 'vim_account']
90 common_consumer.subscribe(topics)
91 retries = 1
92 max_retries = 5
93 while True:
94 try:
95 common_consumer.poll()
96 common_consumer.seek_to_end()
97 break
98 except Exception:
99 log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
100 log.error("Retry number %d of %d", retries, max_retries)
101 if retries >= max_retries:
102 log.error("Achieved max number of retries. Logging exception and exiting...")
103 log.exception("Exception: ")
104 return
105 retries = retries + 1
106 time.sleep(2)
107
108 log.info("Listening for messages...")
109 for message in common_consumer:
110 self.consume_message(message)
111
112 def consume_message(self, message):
113 log.info("Message arrived: %s", message)
114 try:
115 try:
116 values = json.loads(message.value)
117 except JSONDecodeError:
118 values = yaml.safe_load(message.value)
119
120 response = None
121
122 if message.topic == "vim_account":
123 if message.key == "create" or message.key == "edit":
124 values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
125 values['schema_version'],
126 values['_id'])
127 self.auth_manager.store_auth_credentials(values)
128 if message.key == "delete":
129 self.auth_manager.delete_auth_credentials(values)
130
131 else:
132 # Get ns_id from message
133 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
134 contains_list = False
135 list_index = None
136 for k, v in six.iteritems(values):
137 if isinstance(v, dict):
138 if 'ns_id' in v:
139 contains_list = True
140 list_index = k
141 break
142 if not contains_list and 'ns_id' in values:
143 ns_id = values['ns_id']
144 else:
145 ns_id = values[list_index]['ns_id']
146
147 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
148
149 # Check the vim desired by the message
150 vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
151 vim_uuid = vnfr['vim-account-id']
152
153 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
154 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
155 vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
156 if contains_list:
157 values[list_index]['resource_uuid'] = vdur['vim-id']
158 else:
159 values['resource_uuid'] = vdur['vim-id']
160 message = message._replace(value=json.dumps(values))
161
162 vim_type = self.get_vim_type(vim_uuid)
163
164 if vim_type == "openstack":
165 log.info("This message is for the OpenStack plugin.")
166 if message.topic == "metric_request":
167 response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
168 if message.topic == "alarm_request":
169 response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
170
171 elif vim_type == "aws":
172 log.info("This message is for the CloudWatch plugin.")
173 aws_conn = self.aws_connection.setEnvironment()
174 if message.topic == "metric_request":
175 response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
176 if message.topic == "alarm_request":
177 response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
178
179 elif vim_type == "vmware":
180 log.info("This metric_request message is for the vROPs plugin.")
181 if message.topic == "metric_request":
182 response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
183 if message.topic == "alarm_request":
184 response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
185
186 else:
187 log.debug("vim_type is misconfigured or unsupported; %s",
188 vim_type)
189 if response:
190 self._publish_response(message.topic, message.key, response)
191
192 except Exception:
193 log.exception("Exception processing message: ")
194
195 def _publish_response(self, topic: str, key: str, msg: dict):
196 topic = topic.replace('request', 'response')
197 key = key.replace('request', 'response')
198 producer = Producer()
199 producer.send(topic=topic, key=key, value=json.dumps(msg))
200 producer.flush(timeout=5)
201 producer.close()
202
203
204 if __name__ == '__main__':
205 CommonConsumer().run()