ENV OS_DEFAULT_GRANULARITY 300
ENV REQUEST_TIMEOUT 10
ENV OSMMON_LOG_LEVEL INFO
+ENV OSMMON_KAFKA_LOG_LEVEL INFO
EXPOSE 8662
level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
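# A minimal sketch of the effect (illustrative; the level values here are
# assumptions): child loggers such as 'kafka.conn' and 'kafka.coordinator'
# inherit the 'kafka' level, so with OSMMON_KAFKA_LOG_LEVEL=WARNING and
# OSMMON_LOG_LEVEL=DEBUG the kafka-python client is capped while MON's own
# modules stay verbose:
#
#   log.debug("emitted: MON debug line")
#   logging.getLogger('kafka.conn').debug("suppressed by the WARNING cap")
#   logging.getLogger('kafka.conn').warning("still emitted")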
class CommonConsumer:
self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
log.info("Connection successful.")
- # Initialize consumers for alarms and metrics
- self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- self.common_consumer.subscribe(topics)
-
def get_vim_type(self, vim_uuid):
"""Get the vim type that is required by the message."""
credentials = self.database_manager.get_credentials(vim_uuid)
return credentials.type
def run(self):
+ common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer")
+
+ topics = ['metric_request', 'alarm_request', 'vim_account']
+ common_consumer.subscribe(topics)
+
log.info("Listening for messages...")
- for message in self.common_consumer:
+ for message in common_consumer:
t = threading.Thread(target=self.consume_message, args=(message,))
t.start()
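# The new topic list drops 'access_credentials', resolving the TODO in the
# removed block above. Each record is handed to its own worker thread, so the
# handler must not touch the non-thread-safe KafkaConsumer. A hedged sketch of
# such a handler (consume_message itself is not shown in this hunk):
#
#   def consume_message(self, message):
#       payload = json.loads(message.value)
#       if message.topic == 'metric_request':
#           # dispatch on self.get_vim_type(...) to the matching plugin
#           ...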
self.producer = kaf(
key_serializer=str.encode,
value_serializer=str.encode,
- bootstrap_servers=broker, api_version=(0, 10))
+ bootstrap_servers=broker, api_version=(0, 10, 1))
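# 'kaf' is presumably kafka-python's KafkaProducer, aliased at an import not
# shown in this excerpt. Pinning api_version skips the client's broker version
# probe at startup; a standalone sketch (the broker address is assumed):
#
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers='kafka:9092',
#                            key_serializer=str.encode,
#                            value_serializer=str.encode,
#                            api_version=(0, 10, 1))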
def publish(self, key, value, topic=None):
"""Send the required message on the Kafka message bus."""
try:
future = self.producer.send(topic=topic, key=key, value=value)
- record_metadata = future.get(timeout=10)
+ future.get(timeout=10)
except Exception:
logging.exception("Error publishing to {} topic." .format(topic))
raise
- try:
- logging.debug("TOPIC:", record_metadata.topic)
- logging.debug("PARTITION:", record_metadata.partition)
- logging.debug("OFFSET:", record_metadata.offset)
- except KafkaError:
- pass
def publish_alarm_request(self, key, message):
"""Publish an alarm request."""
CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type),
CfgParam('REQUEST_TIMEOUT', 10, int),
CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
+ CfgParam('OSMMON_KAFKA_LOG_LEVEL', "INFO", six.text_type),
]
_config_dict = {cfg.key: cfg for cfg in _configuration}
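# A hedged sketch of how a parameter is resolved (the surrounding Config class
# is elided and the 'default' field name is an assumption): each key falls
# back to its declared default unless overridden in the environment, e.g. by
# the ENV lines in the Dockerfile above.
#
#   import os
#   level = os.environ.get('OSMMON_KAFKA_LOG_LEVEL',
#                          _config_dict['OSMMON_KAFKA_LOG_LEVEL'].default)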
level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
from osm_mon.core.database import DatabaseManager