Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25
26 sys.path.append("/root/MON")
27
28 logging.basicConfig(filename='MON_plugins.log',
29 format='%(asctime)s %(message)s',
30 datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
31 level=logging.INFO)
32 log = logging.getLogger(__name__)
33
34 from kafka import KafkaConsumer
35 from kafka.errors import KafkaError
36
37 from plugins.OpenStack.Aodh import alarming
38 from plugins.OpenStack.common import Common
39 from plugins.OpenStack.Gnocchi import metrics
40
41
42 # Initialize servers
43 server = {'server': 'localhost:9092'}
44
45 # Initialize consumers for alarms and metrics
46 common_consumer = KafkaConsumer(group_id='osm_mon',
47 bootstrap_servers=server['server'])
48
49 # Create OpenStack alarming and metric instances
50 auth_token = None
51 openstack_auth = Common()
52 openstack_metrics = metrics.Metrics()
53 openstack_alarms = alarming.Alarming()
54
55
56 def get_vim_type(message):
57 """Get the vim type that is required by the message."""
58 try:
59 return json.loads(message.value)["vim_type"].lower()
60 except Exception as exc:
61 log.warn("vim_type is not configured correctly; %s", exc)
62 return None
63
64 # Define subscribe the consumer for the plugins
65 topics = ['metric_request', 'alarm_request', 'access_credentials']
66 common_consumer.subscribe(topics)
67
68 try:
69 log.info("Listening for alarm_request and metric_request messages")
70 for message in common_consumer:
71 # Check the message topic
72 if message.topic == "metric_request":
73 # Check the vim desired by the message
74 vim_type = get_vim_type(message)
75 if vim_type == "openstack":
76 log.info("This message is for the OpenStack plugin.")
77 openstack_metrics.metric_calls(
78 message, openstack_auth, auth_token)
79
80 elif vim_type == "cloudwatch":
81 log.info("This message is for the CloudWatch plugin.")
82
83 elif vim_type == "vrops":
84 log.info("This message is for the vROPs plugin.")
85
86 else:
87 log.debug("vim_type is misconfigured or unsupported; %s",
88 vim_type)
89
90 elif message.topic == "alarm_request":
91 # Check the vim desired by the message
92 vim_type = get_vim_type(message)
93 if vim_type == "openstack":
94 log.info("This message is for the OpenStack plugin.")
95 openstack_alarms.alarming(message, openstack_auth, auth_token)
96
97 elif vim_type == "cloudwatch":
98 log.info("This message is for the CloudWatch plugin.")
99
100 elif vim_type == "vrops":
101 log.info("This message is for the vROPs plugin.")
102
103 else:
104 log.debug("vim_type is misconfigured or unsupported; %s",
105 vim_type)
106
107 elif message.topic == "access_credentials":
108 # Check the vim desired by the message
109 vim_type = get_vim_type(message)
110 if vim_type == "openstack":
111 log.info("This message is for the OpenStack plugin.")
112 auth_token = openstack_auth._authenticate(message=message)
113
114 elif vim_type == "cloudwatch":
115 log.info("This message is for the CloudWatch plugin.")
116
117 elif vim_type == "vrops":
118 log.info("This message is for the vROPs plugin.")
119
120 else:
121 log.debug("vim_type is misconfigured or unsupported; %s",
122 vim_type)
123
124 else:
125 log.info("This topic is not relevant to any of the MON plugins.")
126
127
128 except KafkaError as exc:
129 log.warn("Exception: %s", exc)