Adds support for vdu_name, ns_id and vnf_member_index
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25
26 import six
27 import yaml
28
29 from kafka import KafkaConsumer
30
31 from osm_mon.core.settings import Config
32 from osm_mon.plugins.OpenStack.Aodh import alarming
33 from osm_mon.plugins.OpenStack.Gnocchi import metrics
34
35 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
36 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
37 from osm_mon.plugins.CloudWatch.connection import Connection
38 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
39
40 from osm_mon.plugins.vRealiseOps import plugin_receiver
41
42 from osm_mon.core.auth import AuthManager
43 from osm_mon.core.database import DatabaseManager
44
45 from osm_common import dbmongo
46
47 logging.basicConfig(stream=sys.stdout,
48 format='%(asctime)s %(message)s',
49 datefmt='%m/%d/%Y %I:%M:%S %p',
50 level=logging.INFO)
51 log = logging.getLogger(__name__)
52
53
54 def get_vim_type(db_manager, vim_uuid):
55 """Get the vim type that is required by the message."""
56 credentials = db_manager.get_credentials(vim_uuid)
57 return credentials.type
58
59
60 def get_vdur(common_db, nsr_id, member_index, vdu_name):
61 vnfr = get_vnfr(common_db, nsr_id, member_index)
62 for vdur in vnfr['vdur']:
63 if vdur['vdu-id-ref'] == vdu_name:
64 return vdur
65 raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
66
67
68 def get_vnfr(common_db, nsr_id, member_index):
69 vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
70 return vnfr
71
72
73 def main():
74 cfg = Config.instance()
75 cfg.read_environ()
76
77 auth_manager = AuthManager()
78 database_manager = DatabaseManager()
79 database_manager.create_tables()
80
81 # Create OpenStack alarming and metric instances
82 openstack_metrics = metrics.Metrics()
83 openstack_alarms = alarming.Alarming()
84
85 # Create CloudWatch alarm and metric instances
86 cloudwatch_alarms = plugin_alarms()
87 cloudwatch_metrics = plugin_metrics()
88 aws_connection = Connection()
89 aws_access_credentials = AccessCredentials()
90
91 # Create vROps plugin_receiver class instance
92 vrops_rcvr = plugin_receiver.PluginReceiver()
93
94 common_db = dbmongo.DbMongo()
95 common_db_uri = cfg.MONGO_URI.split(':')
96 common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
97
98 # Initialize consumers for alarms and metrics
99 common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
100 key_deserializer=bytes.decode,
101 value_deserializer=bytes.decode,
102 group_id="mon-consumer")
103
104 # Define subscribe the consumer for the plugins
105 topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
106 # TODO: Remove access_credentials
107 common_consumer.subscribe(topics)
108
109 log.info("Listening for alarm_request and metric_request messages")
110 for message in common_consumer:
111 log.info("Message arrived: %s", message)
112 try:
113 try:
114 values = json.loads(message.value)
115 except ValueError:
116 values = yaml.safe_load(message.value)
117
118 if message.topic == "vim_account":
119 if message.key == "create" or message.key == "edit":
120 auth_manager.store_auth_credentials(values)
121 if message.key == "delete":
122 auth_manager.delete_auth_credentials(values)
123
124 else:
125 # Get ns_id from message
126 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
127 contains_list = False
128 list_index = None
129 ns_id = None
130 for k, v in six.iteritems(values):
131 if isinstance(v, dict):
132 if 'ns_id' in v:
133 ns_id = v['ns_id']
134 contains_list = True
135 list_index = k
136 if not contains_list and 'ns_id' in values:
137 ns_id = values['ns_id']
138
139 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
140
141 # Check the vim desired by the message
142 vnfr = get_vnfr(common_db, ns_id, vnf_index)
143 vim_uuid = vnfr['vim-account-id']
144 vim_type = get_vim_type(database_manager, vim_uuid)
145
146 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
147 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
148 vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
149 if contains_list:
150 values[list_index]['resource_uuid'] = vdur['vim-id']
151 else:
152 values['resource_uuid'] = vdur['vim-id']
153 message = message._replace(value=json.dumps(values))
154
155 if vim_type == "openstack":
156 log.info("This message is for the OpenStack plugin.")
157 if message.topic == "metric_request":
158 openstack_metrics.metric_calls(message, vim_uuid)
159 if message.topic == "alarm_request":
160 openstack_alarms.alarming(message, vim_uuid)
161
162 elif vim_type == "aws":
163 log.info("This message is for the CloudWatch plugin.")
164 aws_conn = aws_connection.setEnvironment()
165 if message.topic == "metric_request":
166 cloudwatch_metrics.metric_calls(message, aws_conn)
167 if message.topic == "alarm_request":
168 cloudwatch_alarms.alarm_calls(message, aws_conn)
169 if message.topic == "access_credentials":
170 aws_access_credentials.access_credential_calls(message)
171
172 elif vim_type == "vmware":
173 log.info("This metric_request message is for the vROPs plugin.")
174 vrops_rcvr.consume(message)
175
176 else:
177 log.debug("vim_type is misconfigured or unsupported; %s",
178 vim_type)
179
180 except Exception:
181 log.exception("Exception processing message: ")
182
183
184 if __name__ == '__main__':
185 main()