dc5816e9b19ff505d990d018d1c1fc0ee4e9810c
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
29 from kafka
import KafkaConsumer
31 from osm_mon
.core
.settings
import Config
32 from osm_mon
.plugins
.OpenStack
.Aodh
import alarming
33 from osm_mon
.plugins
.OpenStack
.Gnocchi
import metrics
35 from osm_mon
.plugins
.CloudWatch
.plugin_alarm
import plugin_alarms
36 from osm_mon
.plugins
.CloudWatch
.plugin_metric
import plugin_metrics
37 from osm_mon
.plugins
.CloudWatch
.connection
import Connection
38 from osm_mon
.plugins
.CloudWatch
.access_credentials
import AccessCredentials
40 from osm_mon
.plugins
.vRealiseOps
import plugin_receiver
42 from osm_mon
.core
.auth
import AuthManager
43 from osm_mon
.core
.database
import DatabaseManager
45 from osm_common
import dbmongo
47 logging
.basicConfig(stream
=sys
.stdout
,
48 format
='%(asctime)s %(message)s',
49 datefmt
='%m/%d/%Y %I:%M:%S %p',
51 log
= logging
.getLogger(__name__
)
def get_vim_type(db_manager, vim_uuid):
    """Return the type of the VIM account identified by ``vim_uuid``.

    The credentials record is fetched with
    ``db_manager.get_credentials(vim_uuid)`` and its ``type`` attribute is
    handed back unchanged.
    """
    return db_manager.get_credentials(vim_uuid).type
def get_vdur(common_db, nsr_id, member_index, vdu_name):
    """Return the VDU record of a VNF whose ``vdu-id-ref`` equals ``vdu_name``.

    The VNF record is obtained with
    ``get_vnfr(common_db, nsr_id, member_index)`` and its ``'vdur'`` list is
    scanned in order; the first matching entry is returned.

    Raises:
        ValueError: if no entry in the VNF record's ``'vdur'`` list matches
            ``vdu_name``.
    """
    vnfr = get_vnfr(common_db, nsr_id, member_index)
    for vdur in vnfr['vdur']:
        if vdur['vdu-id-ref'] == vdu_name:
            return vdur
    # Interpolate explicitly: unlike logging calls, exception constructors
    # do not apply %-formatting to extra positional arguments.
    raise ValueError(
        'vdur not found for nsr-id %s, member_index %s and vdu_name %s'
        % (nsr_id, member_index, vdu_name))
def get_vnfr(common_db, nsr_id, member_index):
    """Look up one VNF record in the ``vnfrs`` table of ``common_db``.

    The record is selected by ``nsr-id-ref == nsr_id`` and
    ``member-vnf-index-ref == str(member_index)``; the member index is
    converted to a string before it is used in the filter.

    Returns:
        Whatever ``common_db.get_one`` returns for that filter.
    """
    query = {
        "nsr-id-ref": nsr_id,
        "member-vnf-index-ref": str(member_index),
    }
    # Return the lookup result: callers subscript the value of this call.
    return common_db.get_one(table="vnfrs", filter=query)
74 cfg
= Config
.instance()
77 auth_manager
= AuthManager()
78 database_manager
= DatabaseManager()
79 database_manager
.create_tables()
81 # Create OpenStack alarming and metric instances
82 openstack_metrics
= metrics
.Metrics()
83 openstack_alarms
= alarming
.Alarming()
85 # Create CloudWatch alarm and metric instances
86 cloudwatch_alarms
= plugin_alarms()
87 cloudwatch_metrics
= plugin_metrics()
88 aws_connection
= Connection()
89 aws_access_credentials
= AccessCredentials()
91 # Create vROps plugin_receiver class instance
92 vrops_rcvr
= plugin_receiver
.PluginReceiver()
94 common_db
= dbmongo
.DbMongo()
95 common_db_uri
= cfg
.MONGO_URI
.split(':')
96 common_db
.db_connect({'host': common_db_uri
[0], 'port': int(common_db_uri
[1]), 'name': 'osm'})
98 # Initialize consumers for alarms and metrics
99 common_consumer
= KafkaConsumer(bootstrap_servers
=cfg
.BROKER_URI
,
100 key_deserializer
=bytes
.decode
,
101 value_deserializer
=bytes
.decode
,
102 group_id
="mon-consumer")
104 # Subscribe the consumer to the topics used by the plugins
105 topics
= ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
106 # TODO: Remove access_credentials
107 common_consumer
.subscribe(topics
)
109 log
.info("Listening for alarm_request and metric_request messages")
110 for message
in common_consumer
:
111 log
.info("Message arrived: %s", message
)
114 values
= json
.loads(message
.value
)
116 values
= yaml
.safe_load(message
.value
)
118 if message
.topic
== "vim_account":
119 if message
.key
== "create" or message
.key
== "edit":
120 auth_manager
.store_auth_credentials(values
)
121 if message
.key
== "delete":
122 auth_manager
.delete_auth_credentials(values
)
125 # Get ns_id from message
126 # TODO: Standardize all message models to avoid having to figure out where certain fields are
127 contains_list
= False
129 for k
, v
in six
.iteritems(values
):
130 if isinstance(v
, dict):
135 if not contains_list
and 'ns_id' in values
:
136 ns_id
= values
['ns_id']
138 ns_id
= values
[list_index
]['ns_id']
140 vnf_index
= values
[list_index
]['vnf_member_index'] if contains_list
else values
['vnf_member_index']
142 # Check the vim desired by the message
143 vnfr
= get_vnfr(common_db
, ns_id
, vnf_index
)
144 vim_uuid
= vnfr
['vim-account-id']
145 vim_type
= get_vim_type(database_manager
, vim_uuid
)
147 if (contains_list
and 'vdu_name' in values
[list_index
]) or 'vdu_name' in values
:
148 vdu_name
= values
[list_index
]['vdu_name'] if contains_list
else values
['vdu_name']
149 vdur
= get_vdur(common_db
, ns_id
, vnf_index
, vdu_name
)
151 values
[list_index
]['resource_uuid'] = vdur
['vim-id']
153 values
['resource_uuid'] = vdur
['vim-id']
154 message
= message
._replace
(value
=json
.dumps(values
))
156 if vim_type
== "openstack":
157 log
.info("This message is for the OpenStack plugin.")
158 if message
.topic
== "metric_request":
159 openstack_metrics
.metric_calls(message
, vim_uuid
)
160 if message
.topic
== "alarm_request":
161 openstack_alarms
.alarming(message
, vim_uuid
)
163 elif vim_type
== "aws":
164 log
.info("This message is for the CloudWatch plugin.")
165 aws_conn
= aws_connection
.setEnvironment()
166 if message
.topic
== "metric_request":
167 cloudwatch_metrics
.metric_calls(message
, aws_conn
)
168 if message
.topic
== "alarm_request":
169 cloudwatch_alarms
.alarm_calls(message
, aws_conn
)
170 if message
.topic
== "access_credentials":
171 aws_access_credentials
.access_credential_calls(message
)
173 elif vim_type
== "vmware":
174 log
.info("This metric_request message is for the vROPs plugin.")
175 vrops_rcvr
.consume(message
)
178 log
.debug("vim_type is misconfigured or unsupported; %s",
182 log
.exception("Exception processing message: ")
185 if __name__
== '__main__':