Removes threading from common_consumer
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 from json import JSONDecodeError
26
27 import six
28 import yaml
29
30 from osm_mon.common.common_db_client import CommonDbClient
31 from osm_mon.core.auth import AuthManager
32 from osm_mon.core.database import DatabaseManager
33 from osm_mon.core.message_bus.consumer import Consumer
34 from osm_mon.core.message_bus.producer import Producer
35 from osm_mon.core.settings import Config
36 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
37 from osm_mon.plugins.CloudWatch.connection import Connection
38 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
39 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
40 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
41 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
42 from osm_mon.plugins.vRealiseOps import plugin_receiver
43
44 cfg = Config.instance()
45
46 logging.basicConfig(stream=sys.stdout,
47 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
48 datefmt='%m/%d/%Y %I:%M:%S %p',
49 level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
50 log = logging.getLogger(__name__)
51
52 kafka_logger = logging.getLogger('kafka')
53 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
54 kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
55 kafka_handler = logging.StreamHandler(sys.stdout)
56 kafka_handler.setFormatter(kafka_formatter)
57 kafka_logger.addHandler(kafka_handler)
58
59
60 class CommonConsumer:
61
62 def __init__(self):
63 self.auth_manager = AuthManager()
64 self.database_manager = DatabaseManager()
65 self.database_manager.create_tables()
66
67 # Create OpenStack alarming and metric instances
68 self.openstack_metrics = metric_handler.OpenstackMetricHandler()
69 self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
70
71 # Create CloudWatch alarm and metric instances
72 self.cloudwatch_alarms = plugin_alarms()
73 self.cloudwatch_metrics = plugin_metrics()
74 self.aws_connection = Connection()
75 self.aws_access_credentials = AccessCredentials()
76
77 # Create vROps plugin_receiver class instance
78 self.vrops_rcvr = plugin_receiver.PluginReceiver()
79
80 log.info("Connecting to MongoDB...")
81 self.common_db = CommonDbClient()
82 log.info("Connection successful.")
83
84 def get_vim_type(self, vim_uuid):
85 """Get the vim type that is required by the message."""
86 credentials = self.database_manager.get_credentials(vim_uuid)
87 return credentials.type
88
89 def run(self):
90 common_consumer = Consumer("mon-consumer")
91
92 topics = ['metric_request', 'alarm_request', 'vim_account']
93 common_consumer.subscribe(topics)
94
95 log.info("Listening for messages...")
96 for message in common_consumer:
97 self.consume_message(message)
98
99 def consume_message(self, message):
100 log.info("Message arrived: %s", message)
101 try:
102 try:
103 values = json.loads(message.value)
104 except JSONDecodeError:
105 values = yaml.safe_load(message.value)
106
107 response = None
108
109 if message.topic == "vim_account":
110 if message.key == "create" or message.key == "edit":
111 self.auth_manager.store_auth_credentials(values)
112 if message.key == "delete":
113 self.auth_manager.delete_auth_credentials(values)
114
115 else:
116 # Get ns_id from message
117 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
118 contains_list = False
119 list_index = None
120 for k, v in six.iteritems(values):
121 if isinstance(v, dict):
122 if 'ns_id' in v:
123 contains_list = True
124 list_index = k
125 break
126 if not contains_list and 'ns_id' in values:
127 ns_id = values['ns_id']
128 else:
129 ns_id = values[list_index]['ns_id']
130
131 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
132
133 # Check the vim desired by the message
134 vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
135 vim_uuid = vnfr['vim-account-id']
136
137 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
138 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
139 vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
140 if contains_list:
141 values[list_index]['resource_uuid'] = vdur['vim-id']
142 else:
143 values['resource_uuid'] = vdur['vim-id']
144 message = message._replace(value=json.dumps(values))
145
146 vim_type = self.get_vim_type(vim_uuid)
147
148 if vim_type == "openstack":
149 log.info("This message is for the OpenStack plugin.")
150 if message.topic == "metric_request":
151 response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
152 if message.topic == "alarm_request":
153 response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
154
155 elif vim_type == "aws":
156 log.info("This message is for the CloudWatch plugin.")
157 aws_conn = self.aws_connection.setEnvironment()
158 if message.topic == "metric_request":
159 response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
160 if message.topic == "alarm_request":
161 response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
162
163 elif vim_type == "vmware":
164 log.info("This metric_request message is for the vROPs plugin.")
165 if message.topic == "metric_request":
166 response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
167 if message.topic == "alarm_request":
168 response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
169
170 else:
171 log.debug("vim_type is misconfigured or unsupported; %s",
172 vim_type)
173 if response:
174 self._publish_response(message.topic, message.key, response)
175
176 except Exception:
177 log.exception("Exception processing message: ")
178
179 def _publish_response(self, topic: str, key: str, msg: dict):
180 topic = topic.replace('request', 'response')
181 key = key.replace('request', 'response')
182 producer = Producer()
183 producer.send(topic=topic, key=key, value=json.dumps(msg))
184 producer.flush()
185 producer.close()
186
187
188 if __name__ == '__main__':
189 CommonConsumer().run()