Merge "Set node to run on label docker"
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import time
26 from json import JSONDecodeError
27
28 import six
29 import yaml
30
31 from osm_mon.common.common_db_client import CommonDbClient
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.message_bus.consumer import Consumer
35 from osm_mon.core.message_bus.producer import Producer
36 from osm_mon.core.settings import Config
37 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
38 from osm_mon.plugins.CloudWatch.connection import Connection
39 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
40 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
41 from osm_mon.plugins.OpenStack.Aodh import alarm_handler
42 from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
43 from osm_mon.plugins.vRealiseOps import plugin_receiver
44
# MON configuration object; it supplies the log levels used below.
cfg = Config.instance()

# Root logging: everything goes to stdout at the configured MON log level.
logging.basicConfig(stream=sys.stdout,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)

# The 'kafka' logger (presumably the one used by the Kafka client library) gets
# its own level and its own stdout handler.
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)
kafka_logger.addHandler(kafka_handler)
# basicConfig() above already attached a stdout handler to the root logger.
# Without this, each record accepted by kafka_logger would also propagate to
# that root handler and be printed twice.
kafka_logger.propagate = False
59
60
class CommonConsumer:
    """Consume MON requests from the message bus and hand them to the VIM plugins.

    The consumer listens on the ``metric_request``, ``alarm_request`` and
    ``vim_account`` topics. ``vim_account`` messages update the stored VIM
    credentials; metric/alarm requests are routed to the OpenStack, CloudWatch
    or vROps plugin according to the type of the VIM account the target VNF
    runs on, and any plugin response is published back on the bus.
    """

    def __init__(self):
        """Create the managers, the plugin handlers and the common DB client."""
        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()

        # Create OpenStack alarming and metric instances
        self.openstack_metrics = metric_handler.OpenstackMetricHandler()
        self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()

        # Create CloudWatch alarm and metric instances
        self.cloudwatch_alarms = plugin_alarms()
        self.cloudwatch_metrics = plugin_metrics()
        self.aws_connection = Connection()
        self.aws_access_credentials = AccessCredentials()

        # Create vROps plugin_receiver class instance
        self.vrops_rcvr = plugin_receiver.PluginReceiver()

        log.info("Connecting to MongoDB...")
        self.common_db = CommonDbClient()
        log.info("Connection successful.")

    def get_vim_type(self, vim_uuid):
        """Get the vim type that is required by the message.

        :param vim_uuid: identifier of the VIM account to look up.
        :return: the ``type`` attribute of the credentials stored for that account.
        """
        credentials = self.database_manager.get_credentials(vim_uuid)
        return credentials.type

    def run(self):
        """Subscribe to the request topics and process every incoming message.

        The initial poll/seek is attempted up to five times, two seconds apart.
        If the last attempt also fails, the exception is logged and the method
        returns without consuming anything.
        """
        common_consumer = Consumer("mon-consumer")

        topics = ['metric_request', 'alarm_request', 'vim_account']
        common_consumer.subscribe(topics)

        max_retries = 5
        for attempt in range(1, max_retries + 1):
            try:
                common_consumer.poll()
                common_consumer.seek_to_end()
                break
            except Exception:
                log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
                log.error("Retry number %d of %d", attempt, max_retries)
                if attempt >= max_retries:
                    log.error("Achieved max number of retries. Logging exception and exiting...")
                    log.exception("Exception: ")
                    return
                time.sleep(2)

        log.info("Listening for messages...")
        for message in common_consumer:
            self.consume_message(message)

    def consume_message(self, message):
        """Process a single message taken from the bus.

        The message value is parsed as JSON, falling back to YAML. Any
        exception raised while handling the message is logged and swallowed so
        that one bad message does not stop the consumer loop.

        :param message: consumed record exposing ``topic``, ``key``, ``value``
            and ``_replace``.
        """
        log.info("Message arrived: %s", message)
        try:
            try:
                values = json.loads(message.value)
            except JSONDecodeError:
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key in ("create", "edit"):
                    self.auth_manager.store_auth_credentials(values)
                elif message.key == "delete":
                    self.auth_manager.delete_auth_credentials(values)
                # vim_account messages never produce a response.
                return

            # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
            # Resolve the dict holding the request fields once, so that every
            # field below is read from (and written to) the same place.
            payload = self._find_request_payload(values)
            ns_id = payload['ns_id']
            vnf_index = payload['vnf_member_index']

            # Check the vim desired by the message
            vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
            vim_uuid = vnfr['vim-account-id']

            if 'vdu_name' in payload:
                vdur = self.common_db.get_vdur(ns_id, vnf_index, payload['vdu_name'])
                # payload is the same object as the (possibly nested) dict in
                # values, so the plugins see the resolved resource id.
                payload['resource_uuid'] = vdur['vim-id']
                message = message._replace(value=json.dumps(values))

            vim_type = self.get_vim_type(vim_uuid)
            response = self._dispatch_request(vim_type, message, values, vim_uuid)
            if response:
                self._publish_response(message.topic, message.key, response)

        except Exception:
            # Top-level boundary for a single message: log and keep consuming.
            log.exception("Exception processing message: ")

    @staticmethod
    def _find_request_payload(values):
        """Return the dict inside ``values`` that carries the ``ns_id`` field.

        The first nested dict value containing ``ns_id`` wins; otherwise
        ``values`` itself is returned when it has ``ns_id`` at the top level.

        :param values: decoded message body.
        :return: the nested dict or ``values`` itself (never a copy).
        :raises ValueError: if ``values`` is not a dict or no ``ns_id`` is found.
        """
        if not isinstance(values, dict):
            raise ValueError("Message body is not a mapping: %r" % (values,))
        for nested in values.values():
            if isinstance(nested, dict) and 'ns_id' in nested:
                return nested
        if 'ns_id' in values:
            return values
        raise ValueError("Message body does not contain an 'ns_id' field")

    def _dispatch_request(self, vim_type, message, values, vim_uuid):
        """Route a metric/alarm request to the plugin matching ``vim_type``.

        :param vim_type: VIM type string ("openstack", "aws" or "vmware").
        :param message: consumed record; only ``topic`` and ``key`` are read.
        :param values: decoded (and possibly enriched) message body.
        :param vim_uuid: identifier of the VIM account the request targets.
        :return: whatever the plugin returned, or None when no plugin handled it.
        """
        response = None

        if vim_type == "openstack":
            log.info("This message is for the OpenStack plugin.")
            if message.topic == "metric_request":
                response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
            elif message.topic == "alarm_request":
                response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)

        elif vim_type == "aws":
            log.info("This message is for the CloudWatch plugin.")
            aws_conn = self.aws_connection.setEnvironment()
            if message.topic == "metric_request":
                response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
            elif message.topic == "alarm_request":
                response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)

        elif vim_type == "vmware":
            # This branch serves both metric and alarm requests.
            log.info("This message is for the vROPs plugin.")
            if message.topic == "metric_request":
                response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
            elif message.topic == "alarm_request":
                response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)

        else:
            log.debug("vim_type is misconfigured or unsupported; %s",
                      vim_type)

        return response

    def _publish_response(self, topic: str, key: str, msg: dict):
        """Publish a plugin response on the bus.

        The response topic and key are derived from the request ones by
        replacing ``request`` with ``response``.

        :param topic: topic the request arrived on.
        :param key: key of the request message.
        :param msg: response body; it is serialized with ``json.dumps``.
        """
        topic = topic.replace('request', 'response')
        key = key.replace('request', 'response')
        producer = Producer()
        try:
            producer.send(topic=topic, key=key, value=json.dumps(msg))
            producer.flush()
        finally:
            # Always release the producer, even when send/flush fails.
            producer.close()
202 producer.close()
203
204
# Script entry point: build the consumer and start processing messages.
if __name__ == '__main__':
    consumer = CommonConsumer()
    consumer.run()