1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
27 sys.path.append("/root/MON")
29 logging.basicConfig(filename='MON_plugins.log',
30 format='%(asctime)s %(message)s',
31 datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
33 log = logging.getLogger(__name__)
35 from kafka import KafkaConsumer
36 from kafka.errors import KafkaError
38 from osm_mon.plugins.OpenStack.Aodh import alarming
39 from osm_mon.plugins.OpenStack.common import Common
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
42 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
43 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
45 from osm_mon.plugins.vRealiseOps import plugin_receiver
# Address of the Kafka broker that the shared consumer connects to
server = {"server": "localhost:9092"}

# A single consumer instance serves the alarm and metric plugins alike
common_consumer = KafkaConsumer(bootstrap_servers=server["server"])

# OpenStack side: shared Common helper plus the Gnocchi metric and
# Aodh alarm handlers
openstack_auth = Common()
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()

# CloudWatch side: one handler for alarms and one for metrics
cloudwatch_alarms = plugin_alarms()
cloudwatch_metrics = plugin_metrics()

# vROps side: a single PluginReceiver instance
vrops_rcvr = plugin_receiver.PluginReceiver()
def get_vim_type(message):
    """Get the vim type that is required by the message.

    Args:
        message: An object whose ``value`` attribute holds a JSON document
            (``str`` or ``bytes``) containing a ``"vim_type"`` key.

    Returns:
        The ``"vim_type"`` entry converted to lower case, or ``None`` when
        it cannot be extracted from the message. A warning is logged in
        the latter case.
    """
    try:
        return json.loads(message.value)["vim_type"].lower()
    except Exception as exc:
        # Kept deliberately broad: whatever is wrong with the payload
        # (invalid JSON, missing key, non-string value, no ``value``
        # attribute), the caller receives None instead of an exception.
        log.warning("vim_type is not configured correctly; %s", exc)
    return None
# Subscribe the shared consumer to every topic the plugins act on
topics = ['metric_request', 'alarm_request', 'access_credentials']
common_consumer.subscribe(topics)

# The OpenStack token is only assigned once an access_credentials message
# has been handled. Bind the name first so that a metric or alarm request
# arriving earlier is dispatched with None instead of raising NameError,
# which the KafkaError handler below would not catch.
auth_token = None

# Dispatch each consumed message, first by topic and then by the vim_type
# named in its payload, to the matching plugin. A KafkaError raised while
# consuming ends the loop and is logged.
try:
    log.info("Listening for alarm_request and metric_request messages")
    for message in common_consumer:
        # Check the message topic
        if message.topic == "metric_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_metrics.metric_calls(
                    message, openstack_auth, auth_token)

            elif vim_type == "aws":
                # Log before dispatching, as the other branches do, so the
                # routing decision is recorded even if the plugin raises.
                log.info("This message is for the CloudWatch plugin.")
                cloudwatch_metrics.metric_calls(message)

            elif vim_type == "vmware":
                log.info("This metric_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "alarm_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_alarms.alarming(message, openstack_auth, auth_token)

            elif vim_type == "aws":
                # Log before dispatching, consistent with the other branches.
                log.info("This message is for the CloudWatch plugin.")
                cloudwatch_alarms.alarm_calls(message)

            elif vim_type == "vmware":
                log.info("This alarm_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "access_credentials":
            # Check the vim desired by the message
            vim_type = get_vim_type(message)

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                # Keep the result for later metric and alarm requests
                auth_token = openstack_auth._authenticate(message=message)

            elif vim_type == "aws":
                # TODO: Access credentials later
                log.info("This message is for the CloudWatch plugin.")

            elif vim_type == "vmware":
                log.info("This access_credentials message is for the vROPs plugin.")
                vrops_rcvr.consume(message)

            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        else:
            log.info("This topic is not relevant to any of the MON plugins.")

except KafkaError as exc:
    log.warning("Exception: %s", exc)