3af2d4abd61654499f078005af6b23819561997d
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import threading
26
27 import six
28 import yaml
29 from kafka import KafkaConsumer
30 from osm_common import dbmongo
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.settings import Config
35 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
36 from osm_mon.plugins.CloudWatch.connection import Connection
37 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
38 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41 from osm_mon.plugins.vRealiseOps import plugin_receiver
42
cfg = Config.instance()

# Root logging: everything goes to stdout at the level configured in
# OSMMON_LOG_LEVEL.
logging.basicConfig(
    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL),
    datefmt='%m/%d/%Y %I:%M:%S %p',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout,
)
log = logging.getLogger(__name__)

# The 'kafka' logger gets its own level (OSMMON_KAFKA_LOG_LEVEL) and its own
# stdout handler so its verbosity can be tuned separately from MON's.
# NOTE(review): records from 'kafka' presumably still propagate to the root
# handler installed by basicConfig() above, so they may be printed twice --
# confirm whether that is intended.
kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)

kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
kafka_logger.addHandler(kafka_handler)
57
58
class CommonConsumer:
    """Kafka consumer that forwards MON requests to the matching VIM plugin.

    Messages on the ``vim_account`` topic update the stored VIM credentials.
    Messages on any other subscribed topic are enriched with the VIM resource
    id (when they name a VDU) and handed to the OpenStack, CloudWatch or vROps
    plugin according to the type of the VIM account that hosts the VNF.
    """

    def __init__(self):
        """Instantiate the plugin handlers and connect to the common database."""
        cfg = Config.instance()

        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()

        # Create OpenStack alarming and metric instances
        self.openstack_metrics = metrics.Metrics()
        self.openstack_alarms = alarming.Alarming()

        # Create CloudWatch alarm and metric instances
        self.cloudwatch_alarms = plugin_alarms()
        self.cloudwatch_metrics = plugin_metrics()
        self.aws_connection = Connection()
        self.aws_access_credentials = AccessCredentials()

        # Create vROps plugin_receiver class instance
        self.vrops_rcvr = plugin_receiver.PluginReceiver()

        log.info("Connecting to MongoDB...")
        self.common_db = dbmongo.DbMongo()
        # MONGO_URI is parsed as "<host>:<port>".
        common_db_uri = cfg.MONGO_URI.split(':')
        self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
        log.info("Connection successful.")

    def get_vim_type(self, vim_uuid):
        """Get the vim type that is required by the message."""
        credentials = self.database_manager.get_credentials(vim_uuid)
        return credentials.type

    def get_vdur(self, nsr_id, member_index, vdu_name):
        """Return the vdur entry named ``vdu_name`` from the matching VNF record.

        :param nsr_id: network service record id used to look up the VNF record.
        :param member_index: member VNF index used to look up the VNF record.
        :param vdu_name: value compared against each vdur's ``name`` field.
        :raises ValueError: if no vdur of the VNF record has that name.
        """
        vnfr = self.get_vnfr(nsr_id, member_index)
        for vdur in vnfr['vdur']:
            if vdur['name'] == vdu_name:
                return vdur
        # Interpolate explicitly: exceptions, unlike logging calls, do not
        # apply %-formatting to extra positional arguments.
        raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s'
                         % (nsr_id, member_index, vdu_name))

    def get_vnfr(self, nsr_id, member_index):
        """Fetch one record from ``vnfrs`` by ns record id and member VNF index.

        ``member_index`` is converted to ``str`` before it is used in the query.
        """
        vnfr = self.common_db.get_one("vnfrs",
                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
        return vnfr

    def run(self):
        """Subscribe to the MON topics and process each message in a new thread."""
        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                        key_deserializer=bytes.decode,
                                        value_deserializer=bytes.decode,
                                        group_id="mon-consumer",
                                        session_timeout_ms=60000,
                                        heartbeat_interval_ms=20000)

        topics = ['metric_request', 'alarm_request', 'vim_account']
        common_consumer.subscribe(topics)

        log.info("Listening for messages...")
        for message in common_consumer:
            t = threading.Thread(target=self.consume_message, args=(message,))
            t.start()

    def consume_message(self, message):
        """Process one Kafka message; any failure is logged, never raised.

        The message value is parsed as JSON, falling back to YAML. For
        ``vim_account`` messages the key selects the credential operation. For
        every other topic the VNF record is looked up to find the VIM account,
        ``resource_uuid`` is filled in when the request names a VDU, and the
        message is dispatched to the plugin for that VIM type.
        """
        log.info("Message arrived: %s", message)
        try:
            try:
                values = json.loads(message.value)
            except ValueError:
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key in ("create", "edit"):
                    self.auth_manager.store_auth_credentials(values)
                if message.key == "delete":
                    self.auth_manager.delete_auth_credentials(values)
                return

            # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
            payload = self._find_request_payload(values)
            ns_id = payload['ns_id']
            vnf_index = payload['vnf_member_index']

            # Check the vim desired by the message
            vnfr = self.get_vnfr(ns_id, vnf_index)
            vim_uuid = vnfr['vim-account-id']

            # vdu_name is read from the same dict that carries ns_id, so the
            # membership test and the lookup can never disagree.
            if 'vdu_name' in payload:
                vdur = self.get_vdur(ns_id, vnf_index, payload['vdu_name'])
                # payload is either ``values`` itself or a dict nested in it,
                # so this update is visible when ``values`` is re-serialized.
                payload['resource_uuid'] = vdur['vim-id']
                message = message._replace(value=json.dumps(values))

            vim_type = self.get_vim_type(vim_uuid)
            self._dispatch_to_plugin(message, vim_type, vim_uuid)

        except Exception:
            log.exception("Exception processing message: ")

    @staticmethod
    def _find_request_payload(values):
        """Return the dict that carries ``ns_id``: a nested dict first, else ``values``.

        :raises ValueError: if ``ns_id`` is found neither in a nested dict nor
            at the top level of ``values``.
        """
        for candidate in values.values():
            if isinstance(candidate, dict) and 'ns_id' in candidate:
                return candidate
        if 'ns_id' in values:
            return values
        raise ValueError('ns_id not found in message: %s' % (values,))

    def _dispatch_to_plugin(self, message, vim_type, vim_uuid):
        """Hand ``message`` to the plugin that serves ``vim_type``."""
        if vim_type == "openstack":
            log.info("This message is for the OpenStack plugin.")
            if message.topic == "metric_request":
                self.openstack_metrics.metric_calls(message, vim_uuid)
            if message.topic == "alarm_request":
                self.openstack_alarms.alarming(message, vim_uuid)

        elif vim_type == "aws":
            log.info("This message is for the CloudWatch plugin.")
            aws_conn = self.aws_connection.setEnvironment()
            if message.topic == "metric_request":
                self.cloudwatch_metrics.metric_calls(message, aws_conn)
            if message.topic == "alarm_request":
                self.cloudwatch_alarms.alarm_calls(message, aws_conn)
            if message.topic == "access_credentials":
                self.aws_access_credentials.access_credential_calls(message)

        elif vim_type == "vmware":
            log.info("This metric_request message is for the vROPs plugin.")
            self.vrops_rcvr.consume(message, vim_uuid)

        else:
            log.debug("vim_type is misconfigured or unsupported; %s",
                      vim_type)
195
196
if __name__ == '__main__':
    # Script entry point: build the consumer and enter its Kafka polling loop.
    consumer = CommonConsumer()
    consumer.run()