Adds yaml format in OpenStack plugin
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import os
25 import sys
26 import yaml
27
28 import yaml
29 import logstash
30
# Root logging: timestamped records at INFO and above, written to stdout.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    format='%(asctime)s %(message)s',
)
log = logging.getLogger(__name__)
# This module's logger additionally gets a Logstash TCP handler
# (presumably host 'dockerelk_logstash_1', port 5000 -- confirm deployment).
log.addHandler(
    logstash.TCPLogstashHandler('dockerelk_logstash_1', 5000, version=1))
37
38
# Append the directory four levels above this file's resolved path to
# sys.path so the ``osm_mon`` package imports below can be resolved.
_package_parent = os.path.abspath(
    os.path.join(os.path.realpath(__file__), *(['..'] * 4)))
sys.path.append(_package_parent)
40
41 from kafka import KafkaConsumer
42
43 from osm_mon.plugins.OpenStack.Aodh import alarming
44 from osm_mon.plugins.OpenStack.Gnocchi import metrics
45
46 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
47 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
48 from osm_mon.plugins.CloudWatch.connection import Connection
49 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
50
51 from osm_mon.plugins.vRealiseOps import plugin_receiver
52
53 from osm_mon.core.auth import AuthManager
54 from osm_mon.core.database import DatabaseManager
55
# Kafka broker address: taken from the BROKER_URI environment variable when
# it is set, otherwise the local default broker.
server = {'server': os.getenv("BROKER_URI", 'localhost:9092')}
61
# One Kafka consumer shared by all plugins; record keys and values are
# decoded from bytes with ``bytes.decode``.
common_consumer = KafkaConsumer(
    bootstrap_servers=server['server'],
    group_id="mon-consumer",
    key_deserializer=bytes.decode,
    value_deserializer=bytes.decode,
)

# Credential handling: the database tables are created at start-up, before
# any message is consumed.
auth_manager = AuthManager()
database_manager = DatabaseManager()
database_manager.create_tables()

# OpenStack plugin handlers (Gnocchi metrics, Aodh alarms).
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()

# CloudWatch plugin handlers plus the connection / credential helpers.
cloudwatch_alarms = plugin_alarms()
cloudwatch_metrics = plugin_metrics()
aws_connection = Connection()
aws_access_credentials = AccessCredentials()

# vROps plugin receiver.
vrops_rcvr = plugin_receiver.PluginReceiver()
84
85
def get_vim_type(vim_uuid):
    """Return the VIM type stored for ``vim_uuid``.

    The credentials are fetched through ``database_manager`` and their
    ``type`` attribute is returned. If the lookup raises, the traceback is
    logged and ``None`` is returned instead.
    """
    try:
        vim_type = database_manager.get_credentials(vim_uuid).type
    except Exception:
        log.exception("Error getting vim_type: ")
        vim_type = None
    return vim_type
94
95
# Subscribe the shared consumer to every topic handled on behalf of the
# plugins.
topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
# TODO: Remove access_credentials
common_consumer.subscribe(topics)

log.info("Listening for alarm_request and metric_request messages")
for message in common_consumer:
    # NOTE(review): the whole record is logged, including vim_account
    # payloads that are passed to auth_manager as credentials -- confirm no
    # secrets reach the log handlers.
    log.info("Message arrived: %s", message)
    try:
        # Payloads are parsed as JSON first; anything json rejects is parsed
        # as YAML instead.
        try:
            values = json.loads(message.value)
        except ValueError:
            values = yaml.safe_load(message.value)

        # Route on the topic, then on the VIM type resolved from the
        # message's vim_uuid. Plugins receive the raw Kafka message.
        if message.topic == "metric_request":
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_metrics.metric_calls(message)

            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_metrics.metric_calls(message, aws_conn)

            elif vim_type == "vmware":
                log.info("This metric_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                # Logged at WARNING: the configured level is INFO, so a
                # DEBUG record here would never be emitted and the dropped
                # message would go unnoticed.
                log.warning("vim_type is misconfigured or unsupported; %s",
                            vim_type)

        elif message.topic == "alarm_request":
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_alarms.alarming(message)

            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_alarms.alarm_calls(message, aws_conn)

            elif vim_type == "vmware":
                log.info("This alarm_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.warning("vim_type is misconfigured or unsupported; %s",
                            vim_type)

        elif message.topic == "vim_account":
            # The message key selects the credential operation; other keys
            # are ignored.
            if message.key == "create" or message.key == "edit":
                auth_manager.store_auth_credentials(values)
            elif message.key == "delete":
                auth_manager.delete_auth_credentials(values)

        # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
        elif message.topic == "access_credentials":
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_access_credentials.access_credential_calls(message)

            elif vim_type == "vmware":
                log.info("This access_credentials message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.warning("vim_type is misconfigured or unsupported; %s",
                            vim_type)

        else:
            log.info("This topic is not relevant to any of the MON plugins.")

    except Exception as exc:
        # Keep consuming after a bad message: log the failure with its
        # traceback and move on to the next record.
        log.exception("Exception: %s", exc)