Adds OSMMON_KAFKA_LOG_LEVEL env var
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import threading
26
27 import six
28 import yaml
29 from kafka import KafkaConsumer
30 from osm_common import dbmongo
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.settings import Config
35 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
36 from osm_mon.plugins.CloudWatch.connection import Connection
37 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
38 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41 from osm_mon.plugins.vRealiseOps import plugin_receiver
42
cfg = Config.instance()

# Root logging: everything goes to stdout at the level configured through
# OSMMON_LOG_LEVEL.
logging.basicConfig(stream=sys.stdout,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)

# The 'kafka' logger hierarchy gets its own level (OSMMON_KAFKA_LOG_LEVEL)
# and its own stdout handler, independent of the root level above.
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)
kafka_logger.addHandler(kafka_handler)
# The root logger already has a stdout handler (basicConfig above). Without
# this, every kafka record would be emitted twice: once by kafka_handler and
# once more by the root handler after propagation.
kafka_logger.propagate = False
57
58
class CommonConsumer:
    """Kafka consumer that routes MON requests to the matching VIM plugin.

    Records are read from the ``metric_request``, ``alarm_request`` and
    ``vim_account`` topics. ``vim_account`` records update the stored VIM
    credentials. Any other record is resolved to a VNF record in the common
    database, optionally enriched with the VIM id of the referenced VDU, and
    handed to the OpenStack, CloudWatch or vROps plugin depending on the
    type of the VIM account.
    """

    def __init__(self):
        """Instantiate the plugin handlers and connect to the common DB."""
        cfg = Config.instance()

        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()

        # Create OpenStack alarming and metric instances
        self.openstack_metrics = metrics.Metrics()
        self.openstack_alarms = alarming.Alarming()

        # Create CloudWatch alarm and metric instances
        self.cloudwatch_alarms = plugin_alarms()
        self.cloudwatch_metrics = plugin_metrics()
        self.aws_connection = Connection()
        self.aws_access_credentials = AccessCredentials()

        # Create vROps plugin_receiver class instance
        self.vrops_rcvr = plugin_receiver.PluginReceiver()

        log.info("Connecting to MongoDB...")
        self.common_db = dbmongo.DbMongo()
        # MONGO_URI is read as "<host>:<port>".
        common_db_uri = cfg.MONGO_URI.split(':')
        self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
        log.info("Connection successful.")

    def get_vim_type(self, vim_uuid):
        """Get the vim type that is required by the message."""
        credentials = self.database_manager.get_credentials(vim_uuid)
        return credentials.type

    def get_vdur(self, nsr_id, member_index, vdu_name):
        """Return the VDU record named ``vdu_name`` of the given VNF.

        :param nsr_id: id of the network service record.
        :param member_index: member VNF index inside the network service.
        :param vdu_name: value of the ``name`` field of the wanted vdur.
        :raises ValueError: if the VNF record has no vdur with that name.
        """
        vnfr = self.get_vnfr(nsr_id, member_index)
        for vdur in vnfr['vdur']:
            if vdur['name'] == vdu_name:
                return vdur
        # Exceptions do not interpolate logging-style arguments, so the
        # message has to be formatted explicitly.
        raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s'
                         % (nsr_id, member_index, vdu_name))

    def get_vnfr(self, nsr_id, member_index):
        """Fetch the VNF record for ``nsr_id``/``member_index`` from the common DB.

        The member index is converted to a string for the lookup.
        """
        vnfr = self.common_db.get_one("vnfrs",
                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
        return vnfr

    def run(self):
        """Subscribe to the MON topics and process records until interrupted.

        Each record is handled in its own thread by ``consume_message``.
        """
        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                        key_deserializer=bytes.decode,
                                        value_deserializer=bytes.decode,
                                        group_id="mon-consumer")

        topics = ['metric_request', 'alarm_request', 'vim_account']
        common_consumer.subscribe(topics)

        log.info("Listening for messages...")
        for message in common_consumer:
            t = threading.Thread(target=self.consume_message, args=(message,))
            t.start()

    @staticmethod
    def _find_ns_payload(values):
        """Return the dict inside ``values`` that carries the ``ns_id`` field.

        A nested dict containing ``ns_id`` takes precedence over the top
        level; the returned object is the very dict stored in ``values``,
        so changes made to it are visible through ``values``.

        :raises ValueError: if ``ns_id`` is found in neither place.
        """
        # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
        for candidate in values.values():
            if isinstance(candidate, dict) and 'ns_id' in candidate:
                return candidate
        if 'ns_id' in values:
            return values
        raise ValueError('ns_id not found in message')

    def _dispatch(self, message, vim_type, vim_uuid):
        """Hand ``message`` to the plugin that serves ``vim_type``."""
        if vim_type == "openstack":
            log.info("This message is for the OpenStack plugin.")
            if message.topic == "metric_request":
                self.openstack_metrics.metric_calls(message, vim_uuid)
            if message.topic == "alarm_request":
                self.openstack_alarms.alarming(message, vim_uuid)

        elif vim_type == "aws":
            log.info("This message is for the CloudWatch plugin.")
            aws_conn = self.aws_connection.setEnvironment()
            if message.topic == "metric_request":
                self.cloudwatch_metrics.metric_calls(message, aws_conn)
            if message.topic == "alarm_request":
                self.cloudwatch_alarms.alarm_calls(message, aws_conn)
            # NOTE(review): run() does not subscribe to 'access_credentials',
            # so this branch is only reachable if a caller feeds such a
            # record in directly.
            if message.topic == "access_credentials":
                self.aws_access_credentials.access_credential_calls(message)

        elif vim_type == "vmware":
            log.info("This metric_request message is for the vROPs plugin.")
            self.vrops_rcvr.consume(message, vim_uuid)

        else:
            log.debug("vim_type is misconfigured or unsupported; %s",
                      vim_type)

    def consume_message(self, message):
        """Process one Kafka record.

        The record value is parsed as JSON, falling back to YAML.
        ``vim_account`` records create/edit or delete stored credentials
        according to the record key. For any other topic the VNF record is
        looked up from ``ns_id`` and ``vnf_member_index``; when a
        ``vdu_name`` sits next to them, its ``vim-id`` is written into the
        payload as ``resource_uuid`` before the record is passed to the
        plugin matching the VIM type.

        Any exception raised while processing is logged and not re-raised.
        """
        log.info("Message arrived: %s", message)
        try:
            try:
                values = json.loads(message.value)
            except ValueError:
                # Not valid JSON: try YAML instead.
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key == "create" or message.key == "edit":
                    self.auth_manager.store_auth_credentials(values)
                if message.key == "delete":
                    self.auth_manager.delete_auth_credentials(values)
                return

            # Locate the dict holding ns_id and read every related field
            # from that same dict.
            payload = self._find_ns_payload(values)
            ns_id = payload['ns_id']
            vnf_index = payload['vnf_member_index']

            # Check the vim desired by the message
            vnfr = self.get_vnfr(ns_id, vnf_index)
            vim_uuid = vnfr['vim-account-id']

            if 'vdu_name' in payload:
                vdur = self.get_vdur(ns_id, vnf_index, payload['vdu_name'])
                # payload is part of values, so this is serialised below.
                payload['resource_uuid'] = vdur['vim-id']
                message = message._replace(value=json.dumps(values))

            vim_type = self.get_vim_type(vim_uuid)
            self._dispatch(message, vim_type, vim_uuid)

        except Exception:
            log.exception("Exception processing message: ")
193
194
if __name__ == '__main__':
    # Script entry point: build the consumer, then start consuming messages.
    consumer = CommonConsumer()
    consumer.run()