Replaces vdu-id-ref for name during VDU lookup
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25 import threading
26
27 import six
28 import yaml
29 from kafka import KafkaConsumer
30 from osm_common import dbmongo
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.settings import Config
35 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
36 from osm_mon.plugins.CloudWatch.connection import Connection
37 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
38 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
39 from osm_mon.plugins.OpenStack.Aodh import alarming
40 from osm_mon.plugins.OpenStack.Gnocchi import metrics
41 from osm_mon.plugins.vRealiseOps import plugin_receiver
42
43 logging.basicConfig(stream=sys.stdout,
44 format='%(asctime)s %(message)s',
45 datefmt='%m/%d/%Y %I:%M:%S %p',
46 level=logging.INFO)
47 log = logging.getLogger(__name__)
48
49
50 class CommonConsumer:
51
52 def __init__(self):
53 cfg = Config.instance()
54
55 self.auth_manager = AuthManager()
56 self.database_manager = DatabaseManager()
57 self.database_manager.create_tables()
58
59 # Create OpenStack alarming and metric instances
60 self.openstack_metrics = metrics.Metrics()
61 self.openstack_alarms = alarming.Alarming()
62
63 # Create CloudWatch alarm and metric instances
64 self.cloudwatch_alarms = plugin_alarms()
65 self.cloudwatch_metrics = plugin_metrics()
66 self.aws_connection = Connection()
67 self.aws_access_credentials = AccessCredentials()
68
69 # Create vROps plugin_receiver class instance
70 self.vrops_rcvr = plugin_receiver.PluginReceiver()
71
72 log.info("Connecting to MongoDB...")
73 self.common_db = dbmongo.DbMongo()
74 common_db_uri = cfg.MONGO_URI.split(':')
75 self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
76 log.info("Connection successful.")
77
78 # Initialize consumers for alarms and metrics
79 self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
80 key_deserializer=bytes.decode,
81 value_deserializer=bytes.decode,
82 group_id="mon-consumer")
83
84 # Define subscribe the consumer for the plugins
85 topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
86 # TODO: Remove access_credentials
87 self.common_consumer.subscribe(topics)
88
89 def get_vim_type(self, vim_uuid):
90 """Get the vim type that is required by the message."""
91 credentials = self.database_manager.get_credentials(vim_uuid)
92 return credentials.type
93
94 def get_vdur(self, nsr_id, member_index, vdu_name):
95 vnfr = self.get_vnfr(nsr_id, member_index)
96 for vdur in vnfr['vdur']:
97 if vdur['name'] == vdu_name:
98 return vdur
99 raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
100 vdu_name)
101
102 def get_vnfr(self, nsr_id, member_index):
103 vnfr = self.common_db.get_one(table="vnfrs",
104 filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
105 return vnfr
106
107 def run(self):
108 log.info("Listening for messages...")
109 for message in self.common_consumer:
110 t = threading.Thread(target=self.consume_message, args=(message,))
111 t.start()
112
113 def consume_message(self, message):
114 log.info("Message arrived: %s", message)
115 try:
116 try:
117 values = json.loads(message.value)
118 except ValueError:
119 values = yaml.safe_load(message.value)
120
121 if message.topic == "vim_account":
122 if message.key == "create" or message.key == "edit":
123 self.auth_manager.store_auth_credentials(values)
124 if message.key == "delete":
125 self.auth_manager.delete_auth_credentials(values)
126
127 else:
128 # Get ns_id from message
129 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
130 contains_list = False
131 list_index = None
132 for k, v in six.iteritems(values):
133 if isinstance(v, dict):
134 if 'ns_id' in v:
135 contains_list = True
136 list_index = k
137 break
138 if not contains_list and 'ns_id' in values:
139 ns_id = values['ns_id']
140 else:
141 ns_id = values[list_index]['ns_id']
142
143 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
144
145 # Check the vim desired by the message
146 vnfr = self.get_vnfr(ns_id, vnf_index)
147 vim_uuid = vnfr['vim-account-id']
148
149 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
150 vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
151 vdur = self.get_vdur(ns_id, vnf_index, vdu_name)
152 if contains_list:
153 values[list_index]['resource_uuid'] = vdur['vim-id']
154 else:
155 values['resource_uuid'] = vdur['vim-id']
156 message = message._replace(value=json.dumps(values))
157
158 vim_type = self.get_vim_type(vim_uuid)
159
160 if vim_type == "openstack":
161 log.info("This message is for the OpenStack plugin.")
162 if message.topic == "metric_request":
163 self.openstack_metrics.metric_calls(message, vim_uuid)
164 if message.topic == "alarm_request":
165 self.openstack_alarms.alarming(message, vim_uuid)
166
167 elif vim_type == "aws":
168 log.info("This message is for the CloudWatch plugin.")
169 aws_conn = self.aws_connection.setEnvironment()
170 if message.topic == "metric_request":
171 self.cloudwatch_metrics.metric_calls(message, aws_conn)
172 if message.topic == "alarm_request":
173 self.cloudwatch_alarms.alarm_calls(message, aws_conn)
174 if message.topic == "access_credentials":
175 self.aws_access_credentials.access_credential_calls(message)
176
177 elif vim_type == "vmware":
178 log.info("This metric_request message is for the vROPs plugin.")
179 self.vrops_rcvr.consume(message,vim_uuid)
180
181 else:
182 log.debug("vim_type is misconfigured or unsupported; %s",
183 vim_type)
184
185 except Exception:
186 log.exception("Exception processing message: ")
187
188
189 if __name__ == '__main__':
190 CommonConsumer().run()