import json
import logging
-import os
import sys
-import yaml
+import six
import yaml
-import logstash
-
-logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging.INFO)
-log = logging.getLogger(__name__)
-log.addHandler(logstash.TCPLogstashHandler('dockerelk_logstash_1', 5000, version=1))
-
-
-sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..')))
from kafka import KafkaConsumer
+from osm_mon.core.settings import Config
from osm_mon.plugins.OpenStack.Aodh import alarming
from osm_mon.plugins.OpenStack.Gnocchi import metrics
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager
-# Initialize servers
-if "BROKER_URI" in os.environ:
- server = {'server': os.getenv("BROKER_URI")}
-else:
- server = {'server': 'localhost:9092'}
+from osm_common import dbmongo
-# Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(bootstrap_servers=server['server'],
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
-auth_manager = AuthManager()
-database_manager = DatabaseManager()
-database_manager.create_tables()
+logging.basicConfig(stream=sys.stdout,
+ format='%(asctime)s %(message)s',
+ datefmt='%m/%d/%Y %I:%M:%S %p',
+ level=logging.INFO)
+log = logging.getLogger(__name__)
-# Create OpenStack alarming and metric instances
-openstack_metrics = metrics.Metrics()
-openstack_alarms = alarming.Alarming()
-# Create CloudWatch alarm and metric instances
-cloudwatch_alarms = plugin_alarms()
-cloudwatch_metrics = plugin_metrics()
-aws_connection = Connection()
-aws_access_credentials = AccessCredentials()
+def get_vim_type(db_manager, vim_uuid):
+ """Get the vim type that is required by the message."""
+ credentials = db_manager.get_credentials(vim_uuid)
+ return credentials.type
-# Create vROps plugin_receiver class instance
-vrops_rcvr = plugin_receiver.PluginReceiver()
+def get_vdur(common_db, nsr_id, member_index, vdu_name):
+ vnfr = get_vnfr(common_db, nsr_id, member_index)
+ for vdur in vnfr['vdur']:
+ if vdur['vdu-id-ref'] == vdu_name:
+ return vdur
+ raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
-def get_vim_type(vim_uuid):
- """Get the vim type that is required by the message."""
- try:
- credentials = database_manager.get_credentials(vim_uuid)
- return credentials.type
- except Exception as exc:
- log.exception("Error getting vim_type: ")
- return None
-
-
-# Define subscribe the consumer for the plugins
-topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-# TODO: Remove access_credentials
-common_consumer.subscribe(topics)
-
-log.info("Listening for alarm_request and metric_request messages")
-for message in common_consumer:
- log.info("Message arrived: %s", message)
- try:
- try:
- values = json.loads(message.value)
- except ValueError:
- values = yaml.safe_load(message.value)
- # Check the message topic
- if message.topic == "metric_request":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
-
- if vim_type == "openstack":
- log.info("This message is for the OpenStack plugin.")
- openstack_metrics.metric_calls(message)
- elif vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
- cloudwatch_metrics.metric_calls(message, aws_conn)
-
- elif vim_type == "vmware":
- log.info("This metric_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
+def get_vnfr(common_db, nsr_id, member_index):
+ vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+ return vnfr
- elif message.topic == "alarm_request":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
- if vim_type == "openstack":
- log.info("This message is for the OpenStack plugin.")
- openstack_alarms.alarming(message)
- elif vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
- cloudwatch_alarms.alarm_calls(message, aws_conn)
+def main():
+ cfg = Config.instance()
+ cfg.read_environ()
- elif vim_type == "vmware":
- log.info("This alarm_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
+ auth_manager = AuthManager()
+ database_manager = DatabaseManager()
+ database_manager.create_tables()
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
+ # Create OpenStack alarming and metric instances
+ openstack_metrics = metrics.Metrics()
+ openstack_alarms = alarming.Alarming()
- elif message.topic == "vim_account":
- if message.key == "create" or message.key == "edit":
- auth_manager.store_auth_credentials(values)
- if message.key == "delete":
- auth_manager.delete_auth_credentials(values)
+ # Create CloudWatch alarm and metric instances
+ cloudwatch_alarms = plugin_alarms()
+ cloudwatch_metrics = plugin_metrics()
+ aws_connection = Connection()
+ aws_access_credentials = AccessCredentials()
- # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
- elif message.topic == "access_credentials":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
+ # Create vROps plugin_receiver class instance
+ vrops_rcvr = plugin_receiver.PluginReceiver()
- if vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_access_credentials.access_credential_calls(message)
+ common_db = dbmongo.DbMongo()
+ common_db_uri = cfg.MONGO_URI.split(':')
+ common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
- elif vim_type == "vmware":
- log.info("This access_credentials message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
+ # Initialize consumers for alarms and metrics
+ common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer")
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
+ # Define subscribe the consumer for the plugins
+ topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
+ # TODO: Remove access_credentials
+ common_consumer.subscribe(topics)
- else:
- log.info("This topic is not relevant to any of the MON plugins.")
+ log.info("Listening for alarm_request and metric_request messages")
+ for message in common_consumer:
+ log.info("Message arrived: %s", message)
+ try:
+ try:
+ values = json.loads(message.value)
+ except ValueError:
+ values = yaml.safe_load(message.value)
+ if message.topic == "vim_account":
+ if message.key == "create" or message.key == "edit":
+ auth_manager.store_auth_credentials(values)
+ if message.key == "delete":
+ auth_manager.delete_auth_credentials(values)
- except Exception as exc:
- log.exception("Exception: %s")
+ else:
+ # Get ns_id from message
+ # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
+ contains_list = False
+ list_index = None
+ for k, v in six.iteritems(values):
+ if isinstance(v, dict):
+ if 'ns_id' in v:
+ contains_list = True
+ list_index = k
+ break
+ if not contains_list and 'ns_id' in values:
+ ns_id = values['ns_id']
+ else:
+ ns_id = values[list_index]['ns_id']
+
+ vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
+
+ # Check the vim desired by the message
+ vnfr = get_vnfr(common_db, ns_id, vnf_index)
+ vim_uuid = vnfr['vim-account-id']
+ vim_type = get_vim_type(database_manager, vim_uuid)
+
+ if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
+ vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
+ vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
+ if contains_list:
+ values[list_index]['resource_uuid'] = vdur['vim-id']
+ else:
+ values['resource_uuid'] = vdur['vim-id']
+ message = message._replace(value=json.dumps(values))
+
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ if message.topic == "metric_request":
+ openstack_metrics.metric_calls(message, vim_uuid)
+ if message.topic == "alarm_request":
+ openstack_alarms.alarming(message, vim_uuid)
+
+ elif vim_type == "aws":
+ log.info("This message is for the CloudWatch plugin.")
+ aws_conn = aws_connection.setEnvironment()
+ if message.topic == "metric_request":
+ cloudwatch_metrics.metric_calls(message, aws_conn)
+ if message.topic == "alarm_request":
+ cloudwatch_alarms.alarm_calls(message, aws_conn)
+ if message.topic == "access_credentials":
+ aws_access_credentials.access_credential_calls(message)
+
+ elif vim_type == "vmware":
+ log.info("This metric_request message is for the vROPs plugin.")
+ vrops_rcvr.consume(message)
+
+ else:
+ log.debug("vim_type is misconfigured or unsupported; %s",
+ vim_type)
+
+ except Exception:
+ log.exception("Exception processing message: ")
+
+
+if __name__ == '__main__':
+ main()