Refactor common_db client code
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import threading
26
27 import six
28 import yaml
29 from kafka import KafkaConsumer
30
31 from osm_mon.common.common_db_client import CommonDbClient
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.settings import Config
35 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
36 from osm_mon.plugins.CloudWatch.connection import Connection
37 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
38 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41 from osm_mon.plugins.vRealiseOps import plugin_receiver
42
# Shared MON settings object; also read by CommonConsumer.run().
cfg = Config.instance()

# Single layout shared by the root handler and the Kafka handler below.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Root logging: write to stdout at the level configured for MON.
logging.basicConfig(
    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL),
    format=_LOG_FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    stream=sys.stdout,
)
log = logging.getLogger(__name__)

# The 'kafka' logger gets its own level and its own stdout handler.
# NOTE(review): this logger still propagates to the root logger, so its
# records are presumably written to stdout twice (once per handler) -
# confirm whether that is intended.
kafka_formatter = logging.Formatter(_LOG_FORMAT)
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
kafka_logger.addHandler(kafka_handler)
57
58
class CommonConsumer:
    """Kafka consumer that hands MON requests to the plugin of the target VIM.

    Messages on the ``vim_account`` topic update the stored VIM credentials.
    Messages on any other subscribed topic are resolved to a VIM account
    through the common database and forwarded to the OpenStack, CloudWatch
    or vROps handler created in ``__init__``.
    """

    def __init__(self):
        """Create the managers, the plugin handlers and the common DB client."""
        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()

        # Create OpenStack alarming and metric instances
        self.openstack_metrics = metrics.Metrics()
        self.openstack_alarms = alarming.Alarming()

        # Create CloudWatch alarm and metric instances
        self.cloudwatch_alarms = plugin_alarms()
        self.cloudwatch_metrics = plugin_metrics()
        self.aws_connection = Connection()
        self.aws_access_credentials = AccessCredentials()

        # Create vROps plugin_receiver class instance
        self.vrops_rcvr = plugin_receiver.PluginReceiver()

        log.info("Connecting to MongoDB...")
        self.common_db = CommonDbClient()
        log.info("Connection successful.")

    def get_vim_type(self, vim_uuid):
        """Return the ``type`` of the credentials stored for *vim_uuid*.

        NOTE(review): assumes ``get_credentials`` always returns an object
        with a ``type`` attribute - confirm what it returns for unknown ids.
        """
        credentials = self.database_manager.get_credentials(vim_uuid)
        return credentials.type

    def run(self):
        """Subscribe to the MON topics and handle each message in a new thread."""
        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                        key_deserializer=bytes.decode,
                                        value_deserializer=bytes.decode,
                                        group_id="mon-consumer",
                                        session_timeout_ms=60000,
                                        heartbeat_interval_ms=20000)

        topics = ['metric_request', 'alarm_request', 'vim_account']
        common_consumer.subscribe(topics)

        log.info("Listening for messages...")
        for message in common_consumer:
            # One thread per message, so a slow plugin call does not hold up
            # the consumer loop.
            worker = threading.Thread(target=self.consume_message, args=(message,))
            worker.start()

    def consume_message(self, message):
        """Process one Kafka record.

        The record value is parsed as JSON, falling back to YAML. Records on
        the ``vim_account`` topic create/edit or delete stored credentials
        depending on the record key. For every other topic the request fields
        are located, the VNF record is looked up to find the VIM account, the
        VDU name (when given) is resolved to a ``resource_uuid`` that is
        written back into the message, and the message is passed to the
        plugin matching the VIM type.

        Any exception raised while processing is logged and not propagated.

        :param message: record providing ``topic``, ``key``, ``value`` and
            ``_replace`` (a namedtuple-style record).
        """
        log.info("Message arrived: %s", message)
        try:
            try:
                values = json.loads(message.value)
            except ValueError:
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key in ("create", "edit"):
                    self.auth_manager.store_auth_credentials(values)
                elif message.key == "delete":
                    self.auth_manager.delete_auth_credentials(values)
                return

            # All request fields are read from one and the same dict, so a
            # stray top-level key can no longer send the lookup to the wrong
            # place.
            payload = self._locate_payload(values)
            ns_id = payload['ns_id']
            vnf_index = payload['vnf_member_index']

            # Check the vim desired by the message
            vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
            vim_uuid = vnfr['vim-account-id']

            if 'vdu_name' in payload:
                vdur = self.common_db.get_vdur(ns_id, vnf_index, payload['vdu_name'])
                # payload is part of values, so this update is included in
                # the re-serialised message below.
                payload['resource_uuid'] = vdur['vim-id']
                message = message._replace(value=json.dumps(values))

            vim_type = self.get_vim_type(vim_uuid)
            self._dispatch_to_plugin(message, vim_type, vim_uuid)

        except Exception:
            log.exception("Exception processing message: ")

    @staticmethod
    def _locate_payload(values):
        """Return the dict inside *values* that carries the request fields.

        The first nested dict containing ``ns_id`` wins; otherwise *values*
        itself is returned when it contains ``ns_id``. The returned object is
        the original dict, not a copy.

        :raises ValueError: if *values* is not a dict or no ``ns_id`` is found.
        """
        # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
        if not isinstance(values, dict):
            raise ValueError("Message payload must be a mapping, got %s" % type(values).__name__)
        for candidate in values.values():
            if isinstance(candidate, dict) and 'ns_id' in candidate:
                return candidate
        if 'ns_id' in values:
            return values
        raise ValueError("Message does not contain an 'ns_id' field")

    def _dispatch_to_plugin(self, message, vim_type, vim_uuid):
        """Forward *message* to the plugin handler that serves *vim_type*."""
        if vim_type == "openstack":
            log.info("This message is for the OpenStack plugin.")
            if message.topic == "metric_request":
                self.openstack_metrics.metric_calls(message, vim_uuid)
            if message.topic == "alarm_request":
                self.openstack_alarms.alarming(message, vim_uuid)

        elif vim_type == "aws":
            log.info("This message is for the CloudWatch plugin.")
            aws_conn = self.aws_connection.setEnvironment()
            if message.topic == "metric_request":
                self.cloudwatch_metrics.metric_calls(message, aws_conn)
            if message.topic == "alarm_request":
                self.cloudwatch_alarms.alarm_calls(message, aws_conn)
            # NOTE(review): run() does not subscribe to "access_credentials",
            # so this branch looks unreachable from there - confirm.
            if message.topic == "access_credentials":
                self.aws_access_credentials.access_credential_calls(message)

        elif vim_type == "vmware":
            log.info("This metric_request message is for the vROPs plugin.")
            self.vrops_rcvr.consume(message, vim_uuid)

        else:
            log.debug("vim_type is misconfigured or unsupported; %s",
                      vim_type)
180
181
if __name__ == '__main__':
    # Script entry point: build the consumer and start listening.
    consumer = CommonConsumer()
    consumer.run()