97e37fae2ec2b1d35eaed7bee043280d7eb3cb57
# Copyright 2017 Intel Research and Development Ireland Limited
# *************************************************************
# This file is part of OSM Monitoring module
# All Rights Reserved to Intel Corporation

# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
"""A common KafkaConsumer for all MON plugins."""
# Route log records to stdout, prefixing each message with a timestamp.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    # NOTE(review): the closing argument of this call was not visible in the
    # reviewed text. INFO is assumed because this module reports its progress
    # through log.info() -- confirm against the repository.
    level=logging.INFO,
)
log = logging.getLogger(__name__)
# Additionally forward this module's records to a Logstash TCP endpoint.
log.addHandler(logstash.TCPLogstashHandler('dockerelk_logstash_1', 5000, version=1))

# Extend the import path with the location four '..' components up from this
# file's real path; this runs before the osm_mon imports that follow.
sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..')))
41 from kafka
import KafkaConsumer
43 from osm_mon
.plugins
.OpenStack
.Aodh
import alarming
44 from osm_mon
.plugins
.OpenStack
.Gnocchi
import metrics
46 from osm_mon
.plugins
.CloudWatch
.plugin_alarm
import plugin_alarms
47 from osm_mon
.plugins
.CloudWatch
.plugin_metric
import plugin_metrics
48 from osm_mon
.plugins
.CloudWatch
.connection
import Connection
49 from osm_mon
.plugins
.CloudWatch
.access_credentials
import AccessCredentials
51 from osm_mon
.plugins
.vRealiseOps
import plugin_receiver
53 from osm_mon
.core
.auth
import AuthManager
54 from osm_mon
.core
.database
import DatabaseManager
# Kafka broker address: taken from the BROKER_URI environment variable when
# it is present, otherwise the local default broker is used.
server = {'server': os.environ.get("BROKER_URI", 'localhost:9092')}
# One Kafka consumer is shared by every plugin. Message keys and values
# arrive as bytes and are decoded to str by the deserializers.
common_consumer = KafkaConsumer(
    bootstrap_servers=server['server'],
    key_deserializer=bytes.decode,
    value_deserializer=bytes.decode,
    group_id="mon-consumer",
)

# Credential handling and database access used while dispatching messages;
# create_tables() is invoked once here, before any message is consumed.
auth_manager = AuthManager()
database_manager = DatabaseManager()
database_manager.create_tables()

# OpenStack plugin objects: Gnocchi metrics and Aodh alarming
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()

# CloudWatch plugin objects: alarms, metrics, connection and credentials
cloudwatch_alarms = plugin_alarms()
cloudwatch_metrics = plugin_metrics()
aws_connection = Connection()
aws_access_credentials = AccessCredentials()

# vROps plugin receiver object
vrops_rcvr = plugin_receiver.PluginReceiver()
def get_vim_type(vim_uuid):
    """Get the vim type that is required by the message.

    Looks up the credentials stored for ``vim_uuid`` through the module-level
    ``database_manager`` and returns their ``type`` attribute.

    :param vim_uuid: identifier passed to ``database_manager.get_credentials``
    :return: the ``type`` of the stored credentials, or ``None`` when the
        lookup fails for any reason (the failure is logged with its traceback)
    """
    try:
        credentials = database_manager.get_credentials(vim_uuid)
        return credentials.type
    except Exception:
        # Best effort: callers compare the result against known vim types, so
        # a failed lookup is reported here and surfaces to them as None.
        log.exception("Error getting vim_type: ")
    return None
# Subscribe the shared consumer to every topic handled by the plugins
topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
# TODO: Remove access_credentials
common_consumer.subscribe(topics)

log.info("Listening for alarm_request and metric_request messages")
for message in common_consumer:
    log.info("Message arrived: %s", message)
    try:
        # The payload is parsed as JSON first; input that json rejects is
        # handed to the YAML parser instead.
        # NOTE(review): the guard lines around these two parses were not
        # visible in the reviewed text. ValueError is what json.loads raises
        # for malformed input -- confirm against the repository.
        try:
            values = json.loads(message.value)
        except ValueError:
            values = yaml.safe_load(message.value)

        # Check the message topic
        if message.topic == "metric_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_metrics.metric_calls(message)
            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_metrics.metric_calls(message, aws_conn)
            elif vim_type == "vmware":
                log.info("This metric_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)
            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "alarm_request":
            # Check the vim desired by the message
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "openstack":
                log.info("This message is for the OpenStack plugin.")
                openstack_alarms.alarming(message)
            elif vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_conn = aws_connection.setEnvironment()
                cloudwatch_alarms.alarm_calls(message, aws_conn)
            elif vim_type == "vmware":
                log.info("This alarm_request message is for the vROPs plugin.")
                vrops_rcvr.consume(message)
            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        elif message.topic == "vim_account":
            # The message key selects the credential operation; keys other
            # than create/edit/delete are ignored.
            if message.key == "create" or message.key == "edit":
                auth_manager.store_auth_credentials(values)
            elif message.key == "delete":
                auth_manager.delete_auth_credentials(values)

        # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
        elif message.topic == "access_credentials":
            # Check the vim desired by the message
            vim_type = get_vim_type(values['vim_uuid'])

            if vim_type == "aws":
                log.info("This message is for the CloudWatch plugin.")
                aws_access_credentials.access_credential_calls(message)
            elif vim_type == "vmware":
                log.info("This access_credentials message is for the vROPs plugin.")
                vrops_rcvr.consume(message)
            else:
                log.debug("vim_type is misconfigured or unsupported; %s",
                          vim_type)

        else:
            log.info("This topic is not relevant to any of the MON plugins.")

    except Exception as exc:
        # Top-level boundary: a failure while handling one message is logged
        # with its traceback and the loop moves on to the next message. The
        # exception is passed as the argument for the %s placeholder, which
        # would otherwise be logged as a literal "%s".
        log.exception("Exception: %s", exc)