1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
import logging
import sys

# Extend the module search path ahead of the plugins.* imports that follow.
# NOTE(review): the location is hard-coded - confirm it matches the deployment.
sys.path.append("/root/MON")

# Append timestamped records to the plugin log file.
# NOTE(review): the final argument of this call was not visible in the reviewed
# text; INFO is used so that the log.info() status messages emitted by this
# script are recorded - confirm against the intended configuration.
logging.basicConfig(filename='MON_plugins.log',
                    format='%(asctime)s %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
                    level=logging.INFO)

# Module-level logger used by every statement in this script.
log = logging.getLogger(__name__)
import json

from kafka import KafkaConsumer
from kafka.errors import KafkaError

from plugins.OpenStack.Aodh import alarming
from plugins.OpenStack.common import Common
from plugins.OpenStack.Gnocchi import metrics
# Address of the Kafka broker the shared consumer connects to
server = {'server': 'localhost:9092'}

# One consumer, in the 'osm_mon' group, shared by all MON plugins
common_consumer = KafkaConsumer(
    bootstrap_servers=server['server'],
    group_id='osm_mon',
)

# OpenStack helpers: authentication, metric handling and alarm handling
openstack_auth = Common()
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()
def get_vim_type(message):
    """Get the vim type that is required by the message.

    Args:
        message: consumed record whose ``value`` attribute holds a JSON
            document containing a ``vim_type`` string entry.

    Returns:
        The lower-cased ``vim_type`` value, or ``None`` when it cannot be
        extracted (a warning describing the failure is logged).
    """
    try:
        return json.loads(message.value)["vim_type"].lower()
    except Exception as exc:
        # Deliberately broad: a malformed message must never stop the
        # consumer loop that calls this helper.
        log.warning("vim_type is not configured correctly; %s", exc)
    return None
# Subscribe the consumer to the topics handled by the plugins
topics = ['metric_request', 'alarm_request', 'access_credentials']
common_consumer.subscribe(topics)

# Token returned by the most recent OpenStack access_credentials message.
# Initialised up front so that a metric or alarm request arriving before any
# credentials cannot raise NameError and stop the consumer.
# NOTE(review): such a request is forwarded with a token of None - confirm
# that the OpenStack plugins cope with that.
auth_token = None

try:
    log.info("Listening for alarm_request and metric_request messages")
    for message in common_consumer:
        # Check the message topic
        if message.topic not in topics:
            log.info("This topic is not relevant to any of the MON plugins.")
            continue

        # Check the vim desired by the message
        vim_type = get_vim_type(message)
        if vim_type == "openstack":
            log.info("This message is for the OpenStack plugin.")
            if message.topic == "metric_request":
                openstack_metrics.metric_calls(
                    message, openstack_auth, auth_token)
            elif message.topic == "alarm_request":
                openstack_alarms.alarming(
                    message, openstack_auth, auth_token)
            else:
                # access_credentials: keep the token for later requests
                auth_token = openstack_auth._authenticate(message=message)

        elif vim_type == "cloudwatch":
            log.info("This message is for the CloudWatch plugin.")

        elif vim_type == "vrops":
            log.info("This message is for the vROPs plugin.")

        else:
            log.debug("vim_type is misconfigured or unsupported; %s",
                      vim_type)

except KafkaError as exc:
    log.warning("Exception: %s", exc)