import six
import yaml
from kafka import KafkaConsumer
-from osm_common import dbmongo
+from osm_mon.common.common_db_client import CommonDbClient
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager
from osm_mon.core.settings import Config
from osm_mon.plugins.OpenStack.Gnocchi import metrics
from osm_mon.plugins.vRealiseOps import plugin_receiver
+cfg = Config.instance()
+
logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging.INFO)
+ level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
class CommonConsumer:
self.vrops_rcvr = plugin_receiver.PluginReceiver()
log.info("Connecting to MongoDB...")
- self.common_db = dbmongo.DbMongo()
- common_db_uri = cfg.MONGO_URI.split(':')
- self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
+ self.common_db = CommonDbClient()
log.info("Connection successful.")
- # Initialize consumers for alarms and metrics
- self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- self.common_consumer.subscribe(topics)
-
def get_vim_type(self, vim_uuid):
"""Get the vim type that is required by the message."""
credentials = self.database_manager.get_credentials(vim_uuid)
return credentials.type
- def get_vdur(self, nsr_id, member_index, vdu_name):
- vnfr = self.get_vnfr(nsr_id, member_index)
- for vdur in vnfr['vdur']:
- if vdur['vdu-id-ref'] == vdu_name:
- return vdur
- raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
- vdu_name)
+ def run(self):
+ common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer",
+ session_timeout_ms=60000,
+ heartbeat_interval_ms=20000)
- def get_vnfr(self, nsr_id, member_index):
- vnfr = self.common_db.get_one(table="vnfrs",
- filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
+ topics = ['metric_request', 'alarm_request', 'vim_account']
+ common_consumer.subscribe(topics)
- def run(self):
log.info("Listening for messages...")
- for message in self.common_consumer:
+ for message in common_consumer:
t = threading.Thread(target=self.consume_message, args=(message,))
t.start()
vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
# Check the vim desired by the message
- vnfr = self.get_vnfr(ns_id, vnf_index)
+ vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
vim_uuid = vnfr['vim-account-id']
if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
- vdur = self.get_vdur(ns_id, vnf_index, vdu_name)
+ vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
if contains_list:
values[list_index]['resource_uuid'] = vdur['vim-id']
else:
elif vim_type == "vmware":
log.info("This metric_request message is for the vROPs plugin.")
- self.vrops_rcvr.consume(message)
+ self.vrops_rcvr.consume(message, vim_uuid)
else:
log.debug("vim_type is misconfigured or unsupported; %s",