Minor bugs fix
[osm/MON.git] / osm_mon / core / message_bus / common_consumer
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import os
26
27 sys.path.append("/root/MON")
28
29 logging.basicConfig(filename='MON_plugins.log',
30                     format='%(asctime)s %(message)s',
31                     datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
32                     level=logging.INFO)
33 log = logging.getLogger(__name__)
34
35 from kafka import KafkaConsumer
36 from kafka.errors import KafkaError
37
38 from osm_mon.plugins.OpenStack.Aodh import alarming
39 from osm_mon.plugins.OpenStack.common import Common
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41
42 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
43 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
44 from osm_mon.plugins.CloudWatch.connection import Connection
45 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
46
47 from osm_mon.plugins.vRealiseOps import plugin_receiver
48
49 # Initialize servers
50 if "BROKER_URI" in os.environ:
51     server = {'server': os.getenv("BROKER_URI")}
52 else:
53     server = {'server': 'localhost:9092'}
54
55
56
57 # Initialize consumers for alarms and metrics
58 common_consumer = KafkaConsumer(bootstrap_servers=server['server'])
59
60 # Create OpenStack alarming and metric instances
61 auth_token = None
62 openstack_auth = Common()
63 openstack_metrics = metrics.Metrics()
64 openstack_alarms = alarming.Alarming()
65
66 # Create CloudWatch alarm and metric instances
67 cloudwatch_alarms = plugin_alarms()
68 cloudwatch_metrics = plugin_metrics()
69 aws_connection = Connection()
70 aws_access_credentials = AccessCredentials()
71
72 #Create vROps plugin_receiver class instance
73 vrops_rcvr = plugin_receiver.PluginReceiver()
74
75 def get_vim_type(message):
76     """Get the vim type that is required by the message."""
77     try:
78         return json.loads(message.value)["vim_type"].lower()
79     except Exception as exc:
80         log.warn("vim_type is not configured correctly; %s", exc)
81     return None
82
83 # Define subscribe the consumer for the plugins
84 topics = ['metric_request', 'alarm_request', 'access_credentials']
85 common_consumer.subscribe(topics)
86
87 try:
88     log.info("Listening for alarm_request and metric_request messages")
89     for message in common_consumer:
90         # Check the message topic
91         if message.topic == "metric_request":
92             # Check the vim desired by the message
93             vim_type = get_vim_type(message)
94
95             if vim_type == "openstack":
96                 log.info("This message is for the OpenStack plugin.")
97                 openstack_metrics.metric_calls(
98                     message, openstack_auth, auth_token)
99
100             elif vim_type == "aws":
101                 log.info("This message is for the CloudWatch plugin.")
102                 aws_conn = aws_connection.setEnvironment()
103                 cloudwatch_metrics.metric_calls(message,aws_conn)
104
105             elif vim_type == "vmware":
106                 log.info("This metric_request message is for the vROPs plugin.")
107                 vrops_rcvr.consume(message)
108
109             else:
110                 log.debug("vim_type is misconfigured or unsupported; %s",
111                           vim_type)
112
113         elif message.topic == "alarm_request":
114             # Check the vim desired by the message
115             vim_type = get_vim_type(message)
116             if vim_type == "openstack":
117                 log.info("This message is for the OpenStack plugin.")
118                 openstack_alarms.alarming(message, openstack_auth, auth_token)
119
120             elif vim_type == "aws":
121                 log.info("This message is for the CloudWatch plugin.")
122                 aws_conn = aws_connection.setEnvironment()
123                 cloudwatch_alarms.alarm_calls(message, aws_conn)
124
125             elif vim_type == "vmware":
126                 log.info("This alarm_request message is for the vROPs plugin.")
127                 vrops_rcvr.consume(message)
128
129             else:
130                 log.debug("vim_type is misconfigured or unsupported; %s",
131                           vim_type)
132
133         elif message.topic == "access_credentials":
134             # Check the vim desired by the message
135             vim_type = get_vim_type(message)
136             if vim_type == "openstack":
137                 log.info("This message is for the OpenStack plugin.")
138                 auth_token = openstack_auth._authenticate(message=message)
139
140             elif vim_type == "aws":
141                 log.info("This message is for the CloudWatch plugin.")
142                 aws_access_credentials.access_credential_calls(message) 
143
144             elif vim_type == "vmware":
145                 log.info("This access_credentials message is for the vROPs plugin.")
146                 vrops_rcvr.consume(message)
147
148             else:
149                 log.debug("vim_type is misconfigured or unsupported; %s",
150                           vim_type)
151
152         else:
153             log.info("This topic is not relevant to any of the MON plugins.")
154
155
156 except KafkaError as exc:
157     log.warn("Exception: %s", exc)