e9811845de876f8d5e4a79721a154d6545d19647
[osm/MON.git] / osm_mon / core / message_bus / common_consumer
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import os
26
27 sys.path.insert(0, os.path.abspath('../'))
28
29 logging.basicConfig(filename='MON_plugins.log',
30                     format='%(asctime)s %(message)s',
31                     datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
32                     level=logging.INFO)
33 log = logging.getLogger(__name__)
34
35 from kafka import KafkaConsumer
36 from kafka.errors import KafkaError
37
38 from osm_mon.plugins.OpenStack.Aodh import alarming
39 from osm_mon.plugins.OpenStack.common import Common
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41
42
43 # Initialize servers
44 server = {'server': 'localhost:9092'}
45
46 # Initialize consumers for alarms and metrics
47 common_consumer = KafkaConsumer(group_id='osm_mon',
48                                 bootstrap_servers=server['server'])
49
50 # Create OpenStack alarming and metric instances
51 auth_token = None
52 openstack_auth = Common()
53 openstack_metrics = metrics.Metrics()
54 openstack_alarms = alarming.Alarming()
55
56
57 def get_vim_type(message):
58     """Get the vim type that is required by the message."""
59     try:
60         return json.loads(message.value)["vim_type"].lower()
61     except Exception as exc:
62         log.warn("vim_type is not configured correctly; %s", exc)
63     return None
64
65 # Define subscribe the consumer for the plugins
66 topics = ['metric_request', 'alarm_request', 'access_credentials']
67 common_consumer.subscribe(topics)
68
69 try:
70     log.info("Listening for alarm_request and metric_request messages")
71     for message in common_consumer:
72         # Check the message topic
73         if message.topic == "metric_request":
74             # Check the vim desired by the message
75             vim_type = get_vim_type(message)
76             if vim_type == "openstack":
77                 log.info("This message is for the OpenStack plugin.")
78                 openstack_metrics.metric_calls(
79                     message, openstack_auth, auth_token)
80
81             elif vim_type == "cloudwatch":
82                 log.info("This message is for the CloudWatch plugin.")
83
84             elif vim_type == "vrops":
85                 log.info("This message is for the vROPs plugin.")
86
87             else:
88                 log.debug("vim_type is misconfigured or unsupported; %s",
89                           vim_type)
90
91         elif message.topic == "alarm_request":
92             # Check the vim desired by the message
93             vim_type = get_vim_type(message)
94             if vim_type == "openstack":
95                 log.info("This message is for the OpenStack plugin.")
96                 openstack_alarms.alarming(message, openstack_auth, auth_token)
97
98             elif vim_type == "cloudwatch":
99                 log.info("This message is for the CloudWatch plugin.")
100
101             elif vim_type == "vrops":
102                 log.info("This message is for the vROPs plugin.")
103
104             else:
105                 log.debug("vim_type is misconfigured or unsupported; %s",
106                           vim_type)
107
108         elif message.topic == "access_credentials":
109             # Check the vim desired by the message
110             vim_type = get_vim_type(message)
111             if vim_type == "openstack":
112                 log.info("This message is for the OpenStack plugin.")
113                 auth_token = openstack_auth._authenticate(message=message)
114
115             elif vim_type == "cloudwatch":
116                 log.info("This message is for the CloudWatch plugin.")
117
118             elif vim_type == "vrops":
119                 log.info("This message is for the vROPs plugin.")
120
121             else:
122                 log.debug("vim_type is misconfigured or unsupported; %s",
123                           vim_type)
124
125         else:
126             log.info("This topic is not relevant to any of the MON plugins.")
127
128
129 except KafkaError as exc:
130     log.warn("Exception: %s", exc)