b2677d877f2f5ad027252f12d3088209cabd6081
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
25 from json
import JSONDecodeError
30 from osm_mon
.common
.common_db_client
import CommonDbClient
31 from osm_mon
.core
.auth
import AuthManager
32 from osm_mon
.core
.database
import DatabaseManager
33 from osm_mon
.core
.message_bus
.consumer
import Consumer
34 from osm_mon
.core
.message_bus
.producer
import Producer
35 from osm_mon
.core
.settings
import Config
36 from osm_mon
.plugins
.CloudWatch
.access_credentials
import AccessCredentials
37 from osm_mon
.plugins
.CloudWatch
.connection
import Connection
38 from osm_mon
.plugins
.CloudWatch
.plugin_alarm
import plugin_alarms
39 from osm_mon
.plugins
.CloudWatch
.plugin_metric
import plugin_metrics
40 from osm_mon
.plugins
.OpenStack
.Aodh
import alarm_handler
41 from osm_mon
.plugins
.OpenStack
.Gnocchi
import metric_handler
42 from osm_mon
.plugins
.vRealiseOps
import plugin_receiver
# Shared MON settings object; it supplies the log levels used below.
cfg = Config.instance()

# Root logging: everything goes to stdout at the configured MON log level.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL),
)
log = logging.getLogger(__name__)

# The 'kafka' logger gets its own level and its own stdout handler.
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)
kafka_logger.addHandler(kafka_handler)
# The root logger already writes to stdout (basicConfig above). Without this,
# each kafka record would be emitted twice: once by kafka_handler and once
# more after propagating to the root handler.
kafka_logger.propagate = False
63 self
.auth_manager
= AuthManager()
64 self
.database_manager
= DatabaseManager()
65 self
.database_manager
.create_tables()
67 # Create OpenStack alarming and metric instances
68 self
.openstack_metrics
= metric_handler
.OpenstackMetricHandler()
69 self
.openstack_alarms
= alarm_handler
.OpenstackAlarmHandler()
71 # Create CloudWatch alarm and metric instances
72 self
.cloudwatch_alarms
= plugin_alarms()
73 self
.cloudwatch_metrics
= plugin_metrics()
74 self
.aws_connection
= Connection()
75 self
.aws_access_credentials
= AccessCredentials()
77 # Create vROps plugin_receiver class instance
78 self
.vrops_rcvr
= plugin_receiver
.PluginReceiver()
80 log
.info("Connecting to MongoDB...")
81 self
.common_db
= CommonDbClient()
82 log
.info("Connection successful.")
def get_vim_type(self, vim_uuid):
    """Return the type of the VIM account identified by ``vim_uuid``.

    The account record is fetched through
    ``self.database_manager.get_credentials`` and its ``type`` attribute is
    handed back unchanged.
    """
    return self.database_manager.get_credentials(vim_uuid).type
90 common_consumer
= Consumer("mon-consumer")
92 topics
= ['metric_request', 'alarm_request', 'vim_account']
93 common_consumer
.subscribe(topics
)
94 common_consumer
.poll()
95 common_consumer
.seek_to_end()
97 log
.info("Listening for messages...")
98 for message
in common_consumer
:
99 self
.consume_message(message
)
101 def consume_message(self
, message
):
102 log
.info("Message arrived: %s", message
)
105 values
= json
.loads(message
.value
)
106 except JSONDecodeError
:
107 values
= yaml
.safe_load(message
.value
)
111 if message
.topic
== "vim_account":
112 if message
.key
== "create" or message
.key
== "edit":
113 self
.auth_manager
.store_auth_credentials(values
)
114 if message
.key
== "delete":
115 self
.auth_manager
.delete_auth_credentials(values
)
118 # Get ns_id from message
# TODO: Standardize all message models so we no longer have to work out where certain fields are located
120 contains_list
= False
122 for k
, v
in six
.iteritems(values
):
123 if isinstance(v
, dict):
128 if not contains_list
and 'ns_id' in values
:
129 ns_id
= values
['ns_id']
131 ns_id
= values
[list_index
]['ns_id']
133 vnf_index
= values
[list_index
]['vnf_member_index'] if contains_list
else values
['vnf_member_index']
135 # Check the vim desired by the message
136 vnfr
= self
.common_db
.get_vnfr(ns_id
, vnf_index
)
137 vim_uuid
= vnfr
['vim-account-id']
139 if (contains_list
and 'vdu_name' in values
[list_index
]) or 'vdu_name' in values
:
140 vdu_name
= values
[list_index
]['vdu_name'] if contains_list
else values
['vdu_name']
141 vdur
= self
.common_db
.get_vdur(ns_id
, vnf_index
, vdu_name
)
143 values
[list_index
]['resource_uuid'] = vdur
['vim-id']
145 values
['resource_uuid'] = vdur
['vim-id']
146 message
= message
._replace
(value
=json
.dumps(values
))
148 vim_type
= self
.get_vim_type(vim_uuid
)
150 if vim_type
== "openstack":
151 log
.info("This message is for the OpenStack plugin.")
152 if message
.topic
== "metric_request":
153 response
= self
.openstack_metrics
.handle_request(message
.key
, values
, vim_uuid
)
154 if message
.topic
== "alarm_request":
155 response
= self
.openstack_alarms
.handle_message(message
.key
, values
, vim_uuid
)
157 elif vim_type
== "aws":
158 log
.info("This message is for the CloudWatch plugin.")
159 aws_conn
= self
.aws_connection
.setEnvironment()
160 if message
.topic
== "metric_request":
161 response
= self
.cloudwatch_metrics
.metric_calls(message
.key
, values
, aws_conn
)
162 if message
.topic
== "alarm_request":
163 response
= self
.cloudwatch_alarms
.alarm_calls(message
.key
, values
, aws_conn
)
165 elif vim_type
== "vmware":
166 log
.info("This metric_request message is for the vROPs plugin.")
167 if message
.topic
== "metric_request":
168 response
= self
.vrops_rcvr
.handle_metric_requests(message
.key
, values
, vim_uuid
)
169 if message
.topic
== "alarm_request":
170 response
= self
.vrops_rcvr
.handle_alarm_requests(message
.key
, values
, vim_uuid
)
173 log
.debug("vim_type is misconfigured or unsupported; %s",
176 self
._publish
_response
(message
.topic
, message
.key
, response
)
179 log
.exception("Exception processing message: ")
def _publish_response(self, topic: str, key: str, msg: dict):
    """Send ``msg`` on the response counterpart of a request topic/key.

    Every occurrence of ``request`` in ``topic`` and in ``key`` is replaced
    with ``response``; ``msg`` is serialised with ``json.dumps`` and passed
    to the ``send`` method of a newly created ``Producer``.
    """
    topic, key = (name.replace('request', 'response') for name in (topic, key))
    producer = Producer()
    payload = json.dumps(msg)
    producer.send(topic=topic, key=key, value=payload)
# Script entry point: when executed directly (not imported), build the
# consumer and call its run() method.
if __name__ == '__main__':
    CommonConsumer().run()