import json
import logging
import sys
+import time
+from json import JSONDecodeError
import six
import yaml
-from kafka import KafkaConsumer
-
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.message_bus.producer import Producer
from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
-
+from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
+from osm_mon.plugins.CloudWatch.connection import Connection
from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-
+from osm_mon.plugins.OpenStack.Aodh import alarm_handler
+from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
from osm_mon.plugins.vRealiseOps import plugin_receiver
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
-
-from osm_common import dbmongo
+cfg = Config.instance()
logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging.INFO)
+ level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-def get_vim_type(db_manager, vim_uuid):
- """Get the vim type that is required by the message."""
- credentials = db_manager.get_credentials(vim_uuid)
- return credentials.type
+class CommonConsumer:
-def get_vdur(common_db, nsr_id, member_index, vdu_name):
- vnfr = get_vnfr(common_db, nsr_id, member_index)
- for vdur in vnfr['vdur']:
- if vdur['vdu-id-ref'] == vdu_name:
- return vdur
- raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
+ def __init__(self):
+ self.auth_manager = AuthManager()
+ self.database_manager = DatabaseManager()
+ self.database_manager.create_tables()
+ # Create OpenStack alarming and metric instances
+ self.openstack_metrics = metric_handler.OpenstackMetricHandler()
+ self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
-def get_vnfr(common_db, nsr_id, member_index):
- vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
+ # Create CloudWatch alarm and metric instances
+ self.cloudwatch_alarms = plugin_alarms()
+ self.cloudwatch_metrics = plugin_metrics()
+ self.aws_connection = Connection()
+ self.aws_access_credentials = AccessCredentials()
+ # Create vROps plugin_receiver class instance
+ self.vrops_rcvr = plugin_receiver.PluginReceiver()
-def main():
- cfg = Config.instance()
- cfg.read_environ()
+ log.info("Connecting to MongoDB...")
+ self.common_db = CommonDbClient()
+ log.info("Connection successful.")
- auth_manager = AuthManager()
- database_manager = DatabaseManager()
- database_manager.create_tables()
+ def get_vim_type(self, vim_uuid):
+ """Get the vim type that is required by the message."""
+ credentials = self.database_manager.get_credentials(vim_uuid)
+ return credentials.type
- # Create OpenStack alarming and metric instances
- openstack_metrics = metrics.Metrics()
- openstack_alarms = alarming.Alarming()
+ def run(self):
+ common_consumer = Consumer("mon-consumer")
- # Create CloudWatch alarm and metric instances
- cloudwatch_alarms = plugin_alarms()
- cloudwatch_metrics = plugin_metrics()
- aws_connection = Connection()
- aws_access_credentials = AccessCredentials()
-
- # Create vROps plugin_receiver class instance
- vrops_rcvr = plugin_receiver.PluginReceiver()
-
- common_db = dbmongo.DbMongo()
- common_db_uri = cfg.MONGO_URI.split(':')
- common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
-
- # Initialize consumers for alarms and metrics
- common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- common_consumer.subscribe(topics)
-
- log.info("Listening for alarm_request and metric_request messages")
- for message in common_consumer:
+ topics = ['metric_request', 'alarm_request', 'vim_account']
+ common_consumer.subscribe(topics)
+ retries = 1
+ max_retries = 5
+ while True:
+ try:
+ common_consumer.poll()
+ common_consumer.seek_to_end()
+ break
+ except Exception:
+ log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
+ log.error("Retry number %d of %d", retries, max_retries)
+ if retries >= max_retries:
+ log.error("Achieved max number of retries. Logging exception and exiting...")
+ log.exception("Exception: ")
+ return
+ retries = retries + 1
+ time.sleep(2)
+
+ log.info("Listening for messages...")
+ for message in common_consumer:
+ self.consume_message(message)
+
+ def consume_message(self, message):
log.info("Message arrived: %s", message)
try:
try:
values = json.loads(message.value)
- except ValueError:
+ except JSONDecodeError:
values = yaml.safe_load(message.value)
+ response = None
+
if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
- auth_manager.store_auth_credentials(values)
+ values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
+ values['schema_version'],
+ values['_id'])
+ self.auth_manager.store_auth_credentials(values)
if message.key == "delete":
- auth_manager.delete_auth_credentials(values)
+ self.auth_manager.delete_auth_credentials(values)
else:
# Get ns_id from message
# TODO: Standardize all message models to avoid the need of figuring out where are certain fields
contains_list = False
list_index = None
- ns_id = None
for k, v in six.iteritems(values):
if isinstance(v, dict):
if 'ns_id' in v:
- ns_id = v['ns_id']
contains_list = True
list_index = k
+ break
if not contains_list and 'ns_id' in values:
ns_id = values['ns_id']
+ else:
+ ns_id = values[list_index]['ns_id']
vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
# Check the vim desired by the message
- vnfr = get_vnfr(common_db, ns_id, vnf_index)
+ vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
vim_uuid = vnfr['vim-account-id']
- vim_type = get_vim_type(database_manager, vim_uuid)
if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
- vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
+ vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
if contains_list:
values[list_index]['resource_uuid'] = vdur['vim-id']
else:
values['resource_uuid'] = vdur['vim-id']
message = message._replace(value=json.dumps(values))
+ vim_type = self.get_vim_type(vim_uuid)
+
if vim_type == "openstack":
log.info("This message is for the OpenStack plugin.")
if message.topic == "metric_request":
- openstack_metrics.metric_calls(message, vim_uuid)
+ response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
if message.topic == "alarm_request":
- openstack_alarms.alarming(message, vim_uuid)
+ response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
elif vim_type == "aws":
log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
+ aws_conn = self.aws_connection.setEnvironment()
if message.topic == "metric_request":
- cloudwatch_metrics.metric_calls(message, aws_conn)
+ response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
if message.topic == "alarm_request":
- cloudwatch_alarms.alarm_calls(message, aws_conn)
- if message.topic == "access_credentials":
- aws_access_credentials.access_credential_calls(message)
+ response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
elif vim_type == "vmware":
log.info("This metric_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
+ if message.topic == "metric_request":
+ response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
+ if message.topic == "alarm_request":
+ response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
else:
log.debug("vim_type is misconfigured or unsupported; %s",
vim_type)
+ if response:
+ self._publish_response(message.topic, message.key, response)
except Exception:
log.exception("Exception processing message: ")
+ def _publish_response(self, topic: str, key: str, msg: dict):
+ topic = topic.replace('request', 'response')
+ key = key.replace('request', 'response')
+ producer = Producer()
+ producer.send(topic=topic, key=key, value=json.dumps(msg))
+ producer.flush(timeout=5)
+ producer.close()
+
if __name__ == '__main__':
- main()
+ CommonConsumer().run()