e79e98adc5c3dbdbe48595fdde0ce9a3650e438d
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import os
25 import sys
26
# Send log records to stdout, prefixed with a timestamp, at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    format='%(asctime)s %(message)s',
    stream=sys.stdout,
)
log = logging.getLogger(__name__)

# Append the directory four path components above this file to sys.path
# (presumably so the ``osm_mon`` package imports that follow can resolve
# when this file is run directly -- confirm against the deployment layout).
sys.path.append(
    os.path.abspath(
        os.path.join(os.path.realpath(__file__),
                     os.pardir, os.pardir, os.pardir, os.pardir)))
34
35 from kafka import KafkaConsumer
36
37 from osm_mon.plugins.OpenStack.Aodh import alarming
38 from osm_mon.plugins.OpenStack.Gnocchi import metrics
39
40 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
41 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
42 from osm_mon.plugins.CloudWatch.connection import Connection
43 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
44
45 from osm_mon.plugins.vRealiseOps import plugin_receiver
46
47 from osm_mon.core.auth import AuthManager
48 from osm_mon.core.database import DatabaseManager
49
# Kafka broker address: taken from BROKER_URI when it is set in the
# environment, otherwise a broker on localhost is used.
server = {'server': os.getenv("BROKER_URI", 'localhost:9092')}

# One consumer shared by all plugins; message keys and values are decoded
# from bytes to text before they are handed to the handlers.
common_consumer = KafkaConsumer(
    group_id="mon-consumer",
    bootstrap_servers=server['server'],
    value_deserializer=bytes.decode,
    key_deserializer=bytes.decode,
)

# Credential store and database; the tables are created up front.
auth_manager = AuthManager()
database_manager = DatabaseManager()
database_manager.create_tables()

# OpenStack plugin instances (metrics and alarming)
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()

# CloudWatch plugin instances (alarms, metrics, connection, credentials)
cloudwatch_alarms = plugin_alarms()
cloudwatch_metrics = plugin_metrics()
aws_connection = Connection()
aws_access_credentials = AccessCredentials()

# vROps plugin receiver instance
vrops_rcvr = plugin_receiver.PluginReceiver()
78
79
def get_vim_type(message):
    """Get the vim type that is required by the message.

    Args:
        message: consumed record whose ``value`` attribute holds a JSON
            document; the ``vim_type`` entry of that document is read.

    Returns:
        The ``vim_type`` entry converted to lower case, or ``None`` when the
        payload cannot be parsed or does not hold a usable ``vim_type``.
    """
    try:
        return json.loads(message.value)["vim_type"].lower()
    except Exception as exc:
        # Best effort: any malformed payload (invalid JSON, missing key,
        # non-string value) is reported and treated as "no vim type".
        # Logger.warn is a deprecated alias, so Logger.warning is used.
        log.warning("vim_type is not configured correctly; %s", exc)
        return None
87
88
# Subscribe the consumer to the topics handled by the plugins
# TODO: Remove access_credentials
topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
common_consumer.subscribe(topics)

log.info("Listening for alarm_request and metric_request messages")
for message in common_consumer:
    log.info("Message arrived: %s", message)
    try:
        # Parsed payload; used directly by the vim_account handling below.
        values = json.loads(message.value)

        # Dispatch on the message topic first, then on the vim type.
        if message.topic == "metric_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_metrics.metric_calls(message)

            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_metrics.metric_calls(message, aws_conn)

            elif vim_type == "vmware":
                log.info("This metric_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "alarm_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_alarms.alarming(message)

            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_alarms.alarm_calls(message, aws_conn)

            elif vim_type == "vmware":
                log.info("This alarm_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "vim_account":
            # The message key selects the operation on the stored credentials.
            if message.key in ("create", "edit"):
                auth_manager.store_auth_credentials(values)
            elif message.key == "delete":
                auth_manager.delete_auth_credentials(values)

        # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
        elif message.topic == "access_credentials":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_access_credentials.access_credential_calls(message)

            elif vim_type == "vmware":
                log.info("This access_credentials message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        else:
            log.info("This topic is not relevant to any of the MON plugins.")

    except Exception as exc:
        # Log the failure with its traceback and move on to the next message,
        # so that one bad message does not stop the consumer loop. The
        # exception is passed as the argument for the %s placeholder.
        log.exception("Exception: %s", exc)