Merge "Updates (AWS Plugin) code with common producer/consumer"
[osm/MON.git] / osm_mon / core / message_bus / common_consumer
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import os
26
27 sys.path.append("/root/MON")
28 sys.path.append("../../plugins/CloudWatch")
29
30 logging.basicConfig(filename='MON_plugins.log',
31                     format='%(asctime)s %(message)s',
32                     datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
33                     level=logging.INFO)
34 log = logging.getLogger(__name__)
35
36 from kafka import KafkaConsumer
37 from kafka.errors import KafkaError
38
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.common import Common
41 from osm_mon.plugins.OpenStack.Gnocchi import metrics
42
43 from plugin_alarm import plugin_alarms
44 from plugin_metric import plugin_metrics
45
46 # Initialize servers
47 server = {'server': 'localhost:9092'}
48
49 # Initialize consumers for alarms and metrics
50 common_consumer = KafkaConsumer(bootstrap_servers=server['server'])
51
52 # Create OpenStack alarming and metric instances
53 auth_token = None
54 openstack_auth = Common()
55 openstack_metrics = metrics.Metrics()
56 openstack_alarms = alarming.Alarming()
57
58 # Create CloudWatch alarm and metric instances
59 cloudwatch_alarms = plugin_alarms()
60 cloudwatch_metrics = plugin_metrics()
61
62 def get_vim_type(message):
63     """Get the vim type that is required by the message."""
64     try:
65         return json.loads(message.value)["vim_type"].lower()
66     except Exception as exc:
67         log.warn("vim_type is not configured correctly; %s", exc)
68     return None
69
70 # Define subscribe the consumer for the plugins
71 topics = ['metric_request', 'alarm_request', 'access_credentials']
72 common_consumer.subscribe(topics)
73
74 try:
75     log.info("Listening for alarm_request and metric_request messages")
76     for message in common_consumer:
77         # Check the message topic
78         if message.topic == "metric_request":
79             # Check the vim desired by the message
80             vim_type = get_vim_type(message)
81             
82             if vim_type == "openstack":
83                 log.info("This message is for the OpenStack plugin.")
84                 openstack_metrics.metric_calls(
85                     message, openstack_auth, auth_token)
86
87             elif vim_type == "aws":
88                 cloudwatch_metrics.metric_calls(message)
89                 log.info("This message is for the CloudWatch plugin.")
90
91             elif vim_type == "vrops":
92                 log.info("This message is for the vROPs plugin.")
93
94             else:   
95                 log.debug("vim_type is misconfigured or unsupported; %s",
96                           vim_type)
97
98         elif message.topic == "alarm_request":
99             # Check the vim desired by the message
100             vim_type = get_vim_type(message)
101             if vim_type == "openstack":
102                 log.info("This message is for the OpenStack plugin.")
103                 openstack_alarms.alarming(message, openstack_auth, auth_token)
104
105             elif vim_type == "aws":
106                 cloudwatch_alarms.alarm_calls(message)
107                 log.info("This message is for the CloudWatch plugin.")
108
109             elif vim_type == "vrops":
110                 log.info("This message is for the vROPs plugin.")
111
112             else:
113                 log.debug("vim_type is misconfigured or unsupported; %s",
114                           vim_type)
115
116         elif message.topic == "access_credentials":
117             # Check the vim desired by the message
118             vim_type = get_vim_type(message)
119             if vim_type == "openstack":
120                 log.info("This message is for the OpenStack plugin.")
121                 auth_token = openstack_auth._authenticate(message=message)
122
123             elif vim_type == "aws":
124                 #TODO Access credentials later
125                 log.info("This message is for the CloudWatch plugin.")
126
127             elif vim_type == "vrops":
128                 log.info("This message is for the vROPs plugin.")
129
130             else:
131                 log.debug("vim_type is misconfigured or unsupported; %s",
132                           vim_type)
133
134         else:
135             log.info("This topic is not relevant to any of the MON plugins.")
136
137
138 except KafkaError as exc:
139     log.warn("Exception: %s", exc)