import yaml
+from osm_mon.core.settings import Config
+
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.INFO)
log = logging.getLogger(__name__)
-
sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..')))
from kafka import KafkaConsumer
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager
-# Initialize servers
-if "BROKER_URI" in os.environ:
- server = {'server': os.getenv("BROKER_URI")}
-else:
- server = {'server': 'localhost:9092'}
+cfg = Config.instance()
+cfg.read_environ()
# Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(bootstrap_servers=server['server'],
+common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
group_id="mon-consumer")
try:
credentials = database_manager.get_credentials(vim_uuid)
return credentials.type
- except Exception as exc:
+ except Exception:
log.exception("Error getting vim_type: ")
return None
values = json.loads(message.value)
except ValueError:
values = yaml.safe_load(message.value)
- # Check the message topic
- if message.topic == "metric_request":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
-
- if vim_type == "openstack":
- log.info("This message is for the OpenStack plugin.")
- openstack_metrics.metric_calls(message)
- elif vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
- cloudwatch_metrics.metric_calls(message, aws_conn)
-
- elif vim_type == "vmware":
- log.info("This metric_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
-
- elif message.topic == "alarm_request":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
- if vim_type == "openstack":
- log.info("This message is for the OpenStack plugin.")
- openstack_alarms.alarming(message)
-
- elif vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
- cloudwatch_alarms.alarm_calls(message, aws_conn)
-
- elif vim_type == "vmware":
- log.info("This alarm_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
-
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
-
- elif message.topic == "vim_account":
+ if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
auth_manager.store_auth_credentials(values)
if message.key == "delete":
auth_manager.delete_auth_credentials(values)
- # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
- elif message.topic == "access_credentials":
+ else:
# Check the vim desired by the message
vim_type = get_vim_type(values['vim_uuid'])
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ if message.topic == "metric_request":
+ openstack_metrics.metric_calls(message)
+ if message.topic == "alarm_request":
+ openstack_alarms.alarming(message)
- if vim_type == "aws":
+ elif vim_type == "aws":
log.info("This message is for the CloudWatch plugin.")
- aws_access_credentials.access_credential_calls(message)
+ aws_conn = aws_connection.setEnvironment()
+ if message.topic == "metric_request":
+ cloudwatch_metrics.metric_calls(message, aws_conn)
+ if message.topic == "alarm_request":
+ cloudwatch_alarms.alarm_calls(message, aws_conn)
+ if message.topic == "access_credentials":
+ aws_access_credentials.access_credential_calls(message)
elif vim_type == "vmware":
- log.info("This access_credentials message is for the vROPs plugin.")
+ log.info("This metric_request message is for the vROPs plugin.")
vrops_rcvr.consume(message)
else:
log.debug("vim_type is misconfigured or unsupported; %s",
vim_type)
- else:
- log.info("This topic is not relevant to any of the MON plugins.")
-
-
- except Exception as exc:
- log.exception("Exception: %s")
+ except Exception:
+ log.exception("Exception processing message: ")