Adds vdu, ns, threshold and operation info to alarm notification
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import sys
25
26 import six
27 import yaml
28
29 from kafka import KafkaConsumer
30
31 from osm_mon.core.settings import Config
32 from osm_mon.plugins.OpenStack.Aodh import alarming
33 from osm_mon.plugins.OpenStack.Gnocchi import metrics
34
35 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
36 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
37 from osm_mon.plugins.CloudWatch.connection import Connection
38 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
39
40 from osm_mon.plugins.vRealiseOps import plugin_receiver
41
42 from osm_mon.core.auth import AuthManager
43 from osm_mon.core.database import DatabaseManager
44
45 from osm_common import dbmongo
46
# Configure the root logger: timestamp-prefixed records written to stdout,
# with the threshold set to INFO.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    format='%(asctime)s %(message)s',
    stream=sys.stdout,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
52
53
def get_vim_type(db_manager, vim_uuid):
    """Return the ``type`` attribute of the credentials stored for *vim_uuid*.

    The credentials object is obtained from ``db_manager.get_credentials``.
    """
    return db_manager.get_credentials(vim_uuid).type
58
59
def get_vdur(common_db, nsr_id, member_index, vdu_name):
    """Return the vdur entry of a VNF record that refers to *vdu_name*.

    The VNF record is looked up with ``get_vnfr(common_db, nsr_id,
    member_index)``; its ``'vdur'`` list is then scanned for the first entry
    whose ``'vdu-id-ref'`` equals *vdu_name*.

    :param common_db: database handle passed through to ``get_vnfr``
    :param nsr_id: network service record id
    :param member_index: member VNF index within the network service
    :param vdu_name: value to match against each entry's ``'vdu-id-ref'``
    :return: the matching vdur entry
    :raises ValueError: if no entry of the record matches *vdu_name*
    """
    vnfr = get_vnfr(common_db, nsr_id, member_index)
    for vdur in vnfr['vdur']:
        if vdur['vdu-id-ref'] == vdu_name:
            return vdur
    # Exceptions do not interpolate logging-style arguments, so the message
    # has to be formatted explicitly before it is raised.
    raise ValueError(
        'vdur not found for nsr-id %s, member_index %s and vdu_name %s'
        % (nsr_id, member_index, vdu_name))
66
67
def get_vnfr(common_db, nsr_id, member_index):
    """Fetch one record from the ``vnfrs`` table via ``common_db.get_one``.

    The record is selected by network service record id and member VNF
    index; the index is converted to a string before it is used in the
    filter.
    """
    query = {
        "nsr-id-ref": nsr_id,
        "member-vnf-index-ref": str(member_index),
    }
    return common_db.get_one(table="vnfrs", filter=query)
71
72
def _find_ns_payload(values):
    """Return the dict inside *values* that carries the ``'ns_id'`` field.

    The first nested dict value containing ``'ns_id'`` wins; otherwise
    *values* itself is returned when it has ``'ns_id'`` at the top level.

    :raises ValueError: if ``'ns_id'`` is found at neither level
    """
    for _, candidate in six.iteritems(values):
        if isinstance(candidate, dict) and 'ns_id' in candidate:
            return candidate
    if 'ns_id' in values:
        return values
    raise ValueError('ns_id not found in message: %s' % (values,))


def main():
    """Consume MON requests from Kafka and dispatch them to the VIM plugins.

    Subscribes to the ``metric_request``, ``alarm_request``,
    ``access_credentials`` and ``vim_account`` topics and then loops over
    incoming messages:

    * ``vim_account`` messages store (key ``create``/``edit``) or delete
      (key ``delete``) credentials through the ``AuthManager``.
    * Any other message is resolved to a VIM account through the VNF record
      matching its ``ns_id`` and ``vnf_member_index``; when it also carries a
      ``vdu_name``, the ``vim-id`` of the matching vdur is written into the
      message as ``resource_uuid``. The message is then handed to the
      OpenStack, CloudWatch or vROps plugin according to the VIM type.

    Any exception raised while handling a single message is logged and the
    loop moves on to the next message.
    """
    cfg = Config.instance()
    cfg.read_environ()

    auth_manager = AuthManager()
    database_manager = DatabaseManager()
    database_manager.create_tables()

    # Create OpenStack alarming and metric instances
    openstack_metrics = metrics.Metrics()
    openstack_alarms = alarming.Alarming()

    # Create CloudWatch alarm and metric instances
    cloudwatch_alarms = plugin_alarms()
    cloudwatch_metrics = plugin_metrics()
    aws_connection = Connection()
    aws_access_credentials = AccessCredentials()

    # Create vROps plugin_receiver class instance
    vrops_rcvr = plugin_receiver.PluginReceiver()

    # MONGO_URI is split on ':' and read as host and port.
    common_db = dbmongo.DbMongo()
    common_db_uri = cfg.MONGO_URI.split(':')
    common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})

    # Initialize consumers for alarms and metrics
    common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                    key_deserializer=bytes.decode,
                                    value_deserializer=bytes.decode,
                                    group_id="mon-consumer")

    # Subscribe the consumer to the topics handled by the plugins
    topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
    # TODO: Remove access_credentials
    common_consumer.subscribe(topics)

    log.info("Listening for alarm_request and metric_request messages")
    for message in common_consumer:
        log.info("Message arrived: %s", message)
        try:
            # Payloads are parsed as JSON first, with YAML as the fallback.
            try:
                values = json.loads(message.value)
            except ValueError:
                values = yaml.safe_load(message.value)

            if message.topic == "vim_account":
                if message.key == "create" or message.key == "edit":
                    auth_manager.store_auth_credentials(values)
                if message.key == "delete":
                    auth_manager.delete_auth_credentials(values)

            else:
                # Get ns_id from message
                # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
                # All request fields are read from (and written to) the same
                # dict, whether that is a nested section or the top level.
                payload = _find_ns_payload(values)
                ns_id = payload['ns_id']
                vnf_index = payload['vnf_member_index']

                # Check the vim desired by the message
                vnfr = get_vnfr(common_db, ns_id, vnf_index)
                vim_uuid = vnfr['vim-account-id']
                vim_type = get_vim_type(database_manager, vim_uuid)

                if 'vdu_name' in payload:
                    vdur = get_vdur(common_db, ns_id, vnf_index, payload['vdu_name'])
                    # payload is part of values, so the re-serialized message
                    # below includes the resolved resource_uuid.
                    payload['resource_uuid'] = vdur['vim-id']
                    message = message._replace(value=json.dumps(values))

                if vim_type == "openstack":
                    log.info("This message is for the OpenStack plugin.")
                    if message.topic == "metric_request":
                        openstack_metrics.metric_calls(message, vim_uuid)
                    if message.topic == "alarm_request":
                        openstack_alarms.alarming(message, vim_uuid)

                elif vim_type == "aws":
                    log.info("This message is for the CloudWatch plugin.")
                    aws_conn = aws_connection.setEnvironment()
                    if message.topic == "metric_request":
                        cloudwatch_metrics.metric_calls(message, aws_conn)
                    if message.topic == "alarm_request":
                        cloudwatch_alarms.alarm_calls(message, aws_conn)
                    if message.topic == "access_credentials":
                        aws_access_credentials.access_credential_calls(message)

                elif vim_type == "vmware":
                    log.info("This metric_request message is for the vROPs plugin.")
                    vrops_rcvr.consume(message)

                else:
                    log.debug("vim_type is misconfigured or unsupported; %s",
                              vim_type)

        except Exception:
            # Top-level boundary: one bad message must not stop the consumer.
            log.exception("Exception processing message: ")
183
184
# Run the consumer loop only when this module is executed as a script.
if __name__ == '__main__':
    main()