Adds the OSMMON_LOG_LEVEL env var to configure the log level
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import threading
26
27 import six
28 import yaml
29 from kafka import KafkaConsumer
30 from osm_common import dbmongo
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.settings import Config
35 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
36 from osm_mon.plugins.CloudWatch.connection import Connection
37 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
38 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41 from osm_mon.plugins.vRealiseOps import plugin_receiver
42
# Module-wide settings object; the log level below is read from it.
cfg = Config.instance()

# Send log records to stdout; the threshold comes from the OSMMON_LOG_LEVEL
# setting, translated by logging.getLevelName().
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL),
)

# Logger used by everything in this module.
log = logging.getLogger(__name__)
50
51
class CommonConsumer:
    """Consume MON requests from Kafka and route them to the matching VIM plugin.

    The consumer subscribes to the ``metric_request``, ``alarm_request``,
    ``access_credentials`` and ``vim_account`` topics. ``vim_account`` messages
    are handled by the auth manager; every other message is resolved to a VIM
    account through its VNF record and handed to the OpenStack, CloudWatch or
    vROps plugin according to the VIM type.
    """

    def __init__(self):
        """Create the plugin handlers, connect to MongoDB and subscribe to Kafka."""
        cfg = Config.instance()

        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()

        # Create OpenStack alarming and metric instances
        self.openstack_metrics = metrics.Metrics()
        self.openstack_alarms = alarming.Alarming()

        # Create CloudWatch alarm and metric instances
        self.cloudwatch_alarms = plugin_alarms()
        self.cloudwatch_metrics = plugin_metrics()
        self.aws_connection = Connection()
        self.aws_access_credentials = AccessCredentials()

        # Create vROps plugin_receiver class instance
        self.vrops_rcvr = plugin_receiver.PluginReceiver()

        log.info("Connecting to MongoDB...")
        self.common_db = dbmongo.DbMongo()
        # MONGO_URI is split on ':' and read as "<host>:<port>".
        common_db_uri = cfg.MONGO_URI.split(':')
        self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
        log.info("Connection successful.")

        # Initialize consumers for alarms and metrics
        self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                             key_deserializer=bytes.decode,
                                             value_deserializer=bytes.decode,
                                             group_id="mon-consumer")

        # Subscribe the consumer to the topics used by the plugins
        topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
        # TODO: Remove access_credentials
        self.common_consumer.subscribe(topics)

    def get_vim_type(self, vim_uuid):
        """Get the vim type that is required by the message."""
        credentials = self.database_manager.get_credentials(vim_uuid)
        return credentials.type

    def get_vdur(self, nsr_id, member_index, vdu_name):
        """Return the vdur entry named ``vdu_name`` from the matching VNF record.

        :param nsr_id: network service record id used to look up the VNF record
        :param member_index: member VNF index used to look up the VNF record
        :param vdu_name: value compared against each vdur's ``name`` field
        :raises ValueError: if no vdur in the record has that name
        """
        vnfr = self.get_vnfr(nsr_id, member_index)
        for vdur in vnfr['vdur']:
            if vdur['name'] == vdu_name:
                return vdur
        # Interpolate explicitly: exceptions do not apply %-formatting to
        # their arguments the way logging calls do.
        raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s'
                         % (nsr_id, member_index, vdu_name))

    def get_vnfr(self, nsr_id, member_index):
        """Fetch one record from the ``vnfrs`` collection of the common database.

        The record is selected by ``nsr-id-ref`` and by ``member-vnf-index-ref``
        (the member index is converted to a string for the query).
        """
        vnfr = self.common_db.get_one("vnfrs",
                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
        return vnfr

    def run(self):
        """Iterate over incoming messages, handling each one in its own thread."""
        log.info("Listening for messages...")
        for message in self.common_consumer:
            t = threading.Thread(target=self.consume_message, args=(message,))
            t.start()

    @staticmethod
    def _find_request_payload(values):
        """Return the dict inside ``values`` that carries the ``ns_id`` field.

        A nested dict value containing ``ns_id`` takes precedence; otherwise
        ``values`` itself is returned when it has ``ns_id`` at the top level.

        :raises ValueError: if ``ns_id`` is found in neither place
        """
        # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
        for candidate in values.values():
            if isinstance(candidate, dict) and 'ns_id' in candidate:
                return candidate
        if 'ns_id' in values:
            return values
        raise ValueError('ns_id not found in message: %s' % (values,))

    def consume_message(self, message):
        """Handle a single Kafka message.

        ``vim_account`` messages store or delete credentials through the auth
        manager. For any other topic the VNF record is looked up to find the
        VIM account, ``resource_uuid`` is filled in from the vdur when the
        request names a ``vdu_name``, and the message is passed to the plugin
        matching the VIM type. Any exception is logged and swallowed here.
        """
        log.info("Message arrived: %s", message)
        try:
            # Payloads are tried as JSON first, with YAML as the fallback.
            try:
                values = json.loads(message.value)
            except ValueError:
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key == "create" or message.key == "edit":
                    self.auth_manager.store_auth_credentials(values)
                elif message.key == "delete":
                    self.auth_manager.delete_auth_credentials(values)
                return

            # Locate the dict holding ns_id; it is the same object that lives
            # inside ``values``, so updating it updates the outgoing message.
            payload = self._find_request_payload(values)
            ns_id = payload['ns_id']
            vnf_index = payload['vnf_member_index']

            # Check the vim desired by the message
            vnfr = self.get_vnfr(ns_id, vnf_index)
            vim_uuid = vnfr['vim-account-id']

            if 'vdu_name' in payload:
                vdur = self.get_vdur(ns_id, vnf_index, payload['vdu_name'])
                payload['resource_uuid'] = vdur['vim-id']
                # Re-serialize so the plugins receive the enriched payload.
                message = message._replace(value=json.dumps(values))

            vim_type = self.get_vim_type(vim_uuid)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                if message.topic == "metric_request":
                    self.openstack_metrics.metric_calls(message, vim_uuid)
                if message.topic == "alarm_request":
                    self.openstack_alarms.alarming(message, vim_uuid)

            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = self.aws_connection.setEnvironment()
                if message.topic == "metric_request":
                    self.cloudwatch_metrics.metric_calls(message, aws_conn)
                if message.topic == "alarm_request":
                    self.cloudwatch_alarms.alarm_calls(message, aws_conn)
                if message.topic == "access_credentials":
                    self.aws_access_credentials.access_credential_calls(message)

            elif vim_type == "vmware":
                log.info("This metric_request message is for the vROPs plugin.")
                self.vrops_rcvr.consume(message, vim_uuid)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        except Exception:
            # Runs in a worker thread: log the failure instead of letting the
            # exception escape unreported.
            log.exception("Exception processing message: ")
189
190
# Script entry point: build the consumer and block on its message loop.
if __name__ == '__main__':
    consumer = CommonConsumer()
    consumer.run()