Adds granularity support in OpenStack vim config
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
21
22 import json
23 import logging
24 import os
25 import sys
26
27 import yaml
28
29 from osm_mon.core.settings import Config
30
31 logging.basicConfig(stream=sys.stdout,
32 format='%(asctime)s %(message)s',
33 datefmt='%m/%d/%Y %I:%M:%S %p',
34 level=logging.INFO)
35 log = logging.getLogger(__name__)
36
37 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..')))
38
39 from kafka import KafkaConsumer
40
41 from osm_mon.plugins.OpenStack.Aodh import alarming
42 from osm_mon.plugins.OpenStack.Gnocchi import metrics
43
44 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
45 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
46 from osm_mon.plugins.CloudWatch.connection import Connection
47 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
48
49 from osm_mon.plugins.vRealiseOps import plugin_receiver
50
51 from osm_mon.core.auth import AuthManager
52 from osm_mon.core.database import DatabaseManager
53
54 cfg = Config.instance()
55 cfg.read_environ()
56
57 # Initialize consumers for alarms and metrics
58 common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
59 key_deserializer=bytes.decode,
60 value_deserializer=bytes.decode,
61 group_id="mon-consumer")
62
63 auth_manager = AuthManager()
64 database_manager = DatabaseManager()
65 database_manager.create_tables()
66
67 # Create OpenStack alarming and metric instances
68 openstack_metrics = metrics.Metrics()
69 openstack_alarms = alarming.Alarming()
70
71 # Create CloudWatch alarm and metric instances
72 cloudwatch_alarms = plugin_alarms()
73 cloudwatch_metrics = plugin_metrics()
74 aws_connection = Connection()
75 aws_access_credentials = AccessCredentials()
76
77 # Create vROps plugin_receiver class instance
78 vrops_rcvr = plugin_receiver.PluginReceiver()
79
80
81 def get_vim_type(vim_uuid):
82 """Get the vim type that is required by the message."""
83 try:
84 credentials = database_manager.get_credentials(vim_uuid)
85 return credentials.type
86 except Exception:
87 log.exception("Error getting vim_type: ")
88 return None
89
90
91 # Define subscribe the consumer for the plugins
92 topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
93 # TODO: Remove access_credentials
94 common_consumer.subscribe(topics)
95
96 log.info("Listening for alarm_request and metric_request messages")
97 for message in common_consumer:
98 log.info("Message arrived: %s", message)
99 try:
100 try:
101 values = json.loads(message.value)
102 except ValueError:
103 values = yaml.safe_load(message.value)
104
105 if message.topic == "vim_account":
106 if message.key == "create" or message.key == "edit":
107 auth_manager.store_auth_credentials(values)
108 if message.key == "delete":
109 auth_manager.delete_auth_credentials(values)
110
111 else:
112 # Check the vim desired by the message
113 vim_type = get_vim_type(values['vim_uuid'])
114 if vim_type == "openstack":
115 log.info("This message is for the OpenStack plugin.")
116 if message.topic == "metric_request":
117 openstack_metrics.metric_calls(message)
118 if message.topic == "alarm_request":
119 openstack_alarms.alarming(message)
120
121 elif vim_type == "aws":
122 log.info("This message is for the CloudWatch plugin.")
123 aws_conn = aws_connection.setEnvironment()
124 if message.topic == "metric_request":
125 cloudwatch_metrics.metric_calls(message, aws_conn)
126 if message.topic == "alarm_request":
127 cloudwatch_alarms.alarm_calls(message, aws_conn)
128 if message.topic == "access_credentials":
129 aws_access_credentials.access_credential_calls(message)
130
131 elif vim_type == "vmware":
132 log.info("This metric_request message is for the vROPs plugin.")
133 vrops_rcvr.consume(message)
134
135 else:
136 log.debug("vim_type is misconfigured or unsupported; %s",
137 vim_type)
138
139 except Exception:
140 log.exception("Exception processing message: ")