From 27784a805d77f11d049e9a16704e6977e6967e85 Mon Sep 17 00:00:00 2001
From: Benjamin Diaz
Date: Thu, 25 Oct 2018 14:54:35 -0300
Subject: [PATCH] Adds OSMMON_VCA_USER and adds timeout and max.poll.interval
 to collector consumer

Signed-off-by: Benjamin Diaz
Change-Id: Ib4db9874d69bd72d7267542d7a1f149cb44faf41
---
 docker/Dockerfile                           |   1 +
 osm_mon/cmd/mon_prometheus_exporter.py      |   3 -
 osm_mon/collector/collector.py              | 134 ++++++++++----------
 osm_mon/collector/prometheus_exporter.py    |  12 +-
 osm_mon/core/message_bus/common_consumer.py |   6 +-
 osm_mon/core/message_bus/consumer.py        |   5 +-
 osm_mon/core/settings.py                    |   1 +
 7 files changed, 79 insertions(+), 83 deletions(-)

diff --git a/docker/Dockerfile b/docker/Dockerfile
index be93541..2cf98cb 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -46,6 +46,7 @@ ENV OSMMON_LOG_LEVEL INFO
 ENV OSMMON_KAFKA_LOG_LEVEL INFO
 ENV OSMMON_VCA_HOST localhost
 ENV OSMMON_VCA_SECRET secret
+ENV OSMMON_VCA_USER admin
 
 EXPOSE 8662 8000
 
diff --git a/osm_mon/cmd/mon_prometheus_exporter.py b/osm_mon/cmd/mon_prometheus_exporter.py
index f89a28d..522bd2f 100644
--- a/osm_mon/cmd/mon_prometheus_exporter.py
+++ b/osm_mon/cmd/mon_prometheus_exporter.py
@@ -41,9 +41,6 @@ def main():
 
     kafka_logger = logging.getLogger('kafka')
     kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-    kafka_handler = logging.StreamHandler(sys.stdout)
-    kafka_handler.setFormatter(formatter)
-    kafka_logger.addHandler(kafka_handler)
     log = logging.getLogger(__name__)
     log.info("Starting MON Prometheus Exporter...")
 
diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py
index bf485ff..60af579 100644
--- a/osm_mon/collector/collector.py
+++ b/osm_mon/collector/collector.py
@@ -27,11 +27,12 @@ import re
 import uuid
 from string import ascii_lowercase
 
-from kafka import KafkaProducer, KafkaConsumer
 from n2vc.vnf import N2VC
 from prometheus_client.core import GaugeMetricFamily
 
 from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.message_bus.producer import Producer
 from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
@@ -42,16 +43,7 @@ class MonCollector:
         cfg = Config.instance()
         self.kafka_server = cfg.BROKER_URI
         self.common_db_client = CommonDbClient()
-        self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, secret=cfg.OSMMON_VCA_SECRET)
-        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
-                                      key_serializer=str.encode,
-                                      value_serializer=str.encode)
-        self.consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
-                                      key_deserializer=bytes.decode,
-                                      value_deserializer=bytes.decode,
-                                      consumer_timeout_ms=10000,
-                                      group_id='mon-collector-' + str(uuid.uuid4()))
-        self.consumer.subscribe(['metric_response'])
+        self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
 
     async def collect_metrics(self):
         """
@@ -63,65 +55,69 @@ class MonCollector:
         """
         # TODO(diazb): Remove dependencies on prometheus_client
         log.debug("collect_metrics")
+        producer = Producer()
+        consumer = Consumer('mon-collector-' + str(uuid.uuid4()),
+                            consumer_timeout_ms=10000,
+                            enable_auto_commit=False)
+        consumer.subscribe(['metric_response'])
         metrics = {}
-        try:
-            vnfrs = self.common_db_client.get_vnfrs()
-            vca_model_name = 'default'
-            for vnfr in vnfrs:
-                nsr_id = vnfr['nsr-id-ref']
-                vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id'])
-                for vdur in vnfr['vdur']:
-                    # This avoids errors when vdur records have not been completely filled
-                    if 'name' not in vdur:
-                        continue
-                    vdu = next(
-                        filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
-                    )
-                    vnf_member_index = vnfr['member-vnf-index-ref']
-                    vdu_name = vdur['name']
-                    if 'monitoring-param' in vdu:
-                        for param in vdu['monitoring-param']:
-                            metric_name = param['nfvi-metric']
-                            payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name,
-                                                                                vnf_member_index)
-                            self.producer.send(topic='metric_request', key='read_metric_data_request',
-                                               value=json.dumps(payload))
-                            self.producer.flush()
-                            for message in self.consumer:
-                                if message.key == 'read_metric_data_response':
-                                    content = json.loads(message.value)
-                                    if content['correlation_id'] == payload['correlation_id']:
-                                        if len(content['metrics_data']['metrics_series']):
-                                            metric_reading = content['metrics_data']['metrics_series'][-1]
-                                            if metric_name not in metrics.keys():
-                                                metrics[metric_name] = GaugeMetricFamily(
-                                                    metric_name,
-                                                    'OSM metric',
-                                                    labels=['ns_id', 'vnf_member_index', 'vdu_name']
-                                                )
-                                            metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name],
-                                                                            metric_reading)
-                                        break
-                    if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
-                        vnf_name_vca = await self._generate_vca_vdu_name(vdu_name)
-                        vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)
-                        log.debug('VNF Metrics: %s', vnf_metrics)
-                        for vnf_metric_list in vnf_metrics.values():
-                            for vnf_metric in vnf_metric_list:
-                                log.debug("VNF Metric: %s", vnf_metric)
-                                if vnf_metric['key'] not in metrics.keys():
-                                    metrics[vnf_metric['key']] = GaugeMetricFamily(
-                                        vnf_metric['key'],
-                                        'OSM metric',
-                                        labels=['ns_id', 'vnf_member_index', 'vdu_name']
-                                    )
-                                metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name],
-                                                                      float(vnf_metric['value']))
-            log.debug("metric.values = %s", metrics.values())
-            return metrics.values()
-        except Exception as e:
-            log.exception("Error collecting metrics")
-            raise e
+        vnfrs = self.common_db_client.get_vnfrs()
+        vca_model_name = 'default'
+        for vnfr in vnfrs:
+            nsr_id = vnfr['nsr-id-ref']
+            vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id'])
+            for vdur in vnfr['vdur']:
+                # This avoids errors when vdur records have not been completely filled
+                if 'name' not in vdur:
+                    continue
+                vdu = next(
+                    filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+                )
+                vnf_member_index = vnfr['member-vnf-index-ref']
+                vdu_name = vdur['name']
+                if 'monitoring-param' in vdu:
+                    for param in vdu['monitoring-param']:
+                        metric_name = param['nfvi-metric']
+                        payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name,
+                                                                           vnf_member_index)
+                        producer.send(topic='metric_request', key='read_metric_data_request',
+                                      value=json.dumps(payload))
+                        producer.flush(5)
+                        for message in consumer:
+                            if message.key == 'read_metric_data_response':
+                                content = json.loads(message.value)
+                                if content['correlation_id'] == payload['correlation_id']:
+                                    log.debug("Found read_metric_data_response with same correlation_id")
+                                    if len(content['metrics_data']['metrics_series']):
+                                        metric_reading = content['metrics_data']['metrics_series'][-1]
+                                        if metric_name not in metrics.keys():
+                                            metrics[metric_name] = GaugeMetricFamily(
+                                                metric_name,
+                                                'OSM metric',
+                                                labels=['ns_id', 'vnf_member_index', 'vdu_name']
+                                            )
+                                        metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name],
+                                                                        metric_reading)
+                                    break
+                if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
+                    vnf_name_vca = await self._generate_vca_vdu_name(vdu_name)
+                    vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)
+                    log.debug('VNF Metrics: %s', vnf_metrics)
+                    for vnf_metric_list in vnf_metrics.values():
+                        for vnf_metric in vnf_metric_list:
+                            log.debug("VNF Metric: %s", vnf_metric)
+                            if vnf_metric['key'] not in metrics.keys():
+                                metrics[vnf_metric['key']] = GaugeMetricFamily(
+                                    vnf_metric['key'],
+                                    'OSM metric',
+                                    labels=['ns_id', 'vnf_member_index', 'vdu_name']
+                                )
+                            metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name],
+                                                                  float(vnf_metric['value']))
+        consumer.close()
+        producer.close(5)
+        log.debug("metric.values = %s", metrics.values())
+        return metrics.values()
 
     @staticmethod
     async def _generate_vca_vdu_name(vdu_name) -> str:
diff --git a/osm_mon/collector/prometheus_exporter.py b/osm_mon/collector/prometheus_exporter.py
index d890337..58aee05 100644
--- a/osm_mon/collector/prometheus_exporter.py
+++ b/osm_mon/collector/prometheus_exporter.py
@@ -62,10 +62,14 @@
         mon_collector = MonCollector()
         cfg = Config.instance()
         while True:
-            log.debug('_run_collector_loop')
-            metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics())
-            self.custom_collector.metrics = metrics
-            time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+            try:
+                log.debug('_run_collector_loop')
+                metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics())
+                self.custom_collector.metrics = metrics
+                time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+            except Exception:
+                log.exception("Error collecting metrics")
+
 
 
 class CustomCollector(object):
diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py
index 3e1f745..e32fa2b 100755
--- a/osm_mon/core/message_bus/common_consumer.py
+++ b/osm_mon/core/message_bus/common_consumer.py
@@ -52,10 +52,6 @@ log = logging.getLogger(__name__)
 
 kafka_logger = logging.getLogger('kafka')
 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-kafka_handler = logging.StreamHandler(sys.stdout)
-kafka_handler.setFormatter(kafka_formatter)
-kafka_logger.addHandler(kafka_handler)
 
 
 class CommonConsumer:
@@ -198,7 +194,7 @@ class CommonConsumer:
        key = key.replace('request', 'response')
        producer = Producer()
        producer.send(topic=topic, key=key, value=json.dumps(msg))
-       producer.flush()
+       producer.flush(timeout=5)
        producer.close()
 
 
diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py
index 7936513..1ccf936 100644
--- a/osm_mon/core/message_bus/consumer.py
+++ b/osm_mon/core/message_bus/consumer.py
@@ -5,10 +5,11 @@ from osm_mon.core.settings import Config
 
 # noinspection PyAbstractClass
 class Consumer(KafkaConsumer):
-    def __init__(self, group_id):
+    def __init__(self, group_id, **kwargs):
         cfg = Config.instance()
         super().__init__(bootstrap_servers=cfg.BROKER_URI,
                          key_deserializer=bytes.decode,
                          value_deserializer=bytes.decode,
                          max_poll_interval_ms=900000,
-                         group_id=group_id)
+                         group_id=group_id,
+                         **kwargs)
diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py
index 8f5e8f5..978c957 100644
--- a/osm_mon/core/settings.py
+++ b/osm_mon/core/settings.py
@@ -70,6 +70,7 @@ class Config(object):
         CfgParam('OSMMON_COLLECTOR_INTERVAL', 10, int),
         CfgParam('OSMMON_VCA_HOST', "localhost", six.text_type),
         CfgParam('OSMMON_VCA_SECRET', "secret", six.text_type),
+        CfgParam('OSMMON_VCA_USER', "admin", six.text_type),
     ]
 
     _config_dict = {cfg.key: cfg for cfg in _configuration}
-- 
2.17.1
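Reviewer note, not part of the patch to apply: a minimal sketch of the per-run producer/consumer pattern that collect_metrics follows after this change. It assumes the osm_mon Producer/Consumer wrappers touched above, a reachable Kafka broker, and kafka-python semantics for consumer_timeout_ms and enable_auto_commit; the payload used here is a placeholder, not the real read_metric_data_request schema built by _generate_read_metric_payload.

    # Sketch of the collector's new per-run messaging pattern (see assumptions above).
    import json
    import uuid

    from osm_mon.core.message_bus.consumer import Consumer
    from osm_mon.core.message_bus.producer import Producer

    producer = Producer()
    # The **kwargs pass-through added to Consumer forwards these options to kafka-python.
    consumer = Consumer('mon-collector-' + str(uuid.uuid4()),
                        consumer_timeout_ms=10000,   # stop iterating after 10 s without messages
                        enable_auto_commit=False)    # offsets are not committed automatically
    consumer.subscribe(['metric_response'])

    # Placeholder payload for illustration only.
    payload = {'correlation_id': str(uuid.uuid4())}
    producer.send(topic='metric_request', key='read_metric_data_request',
                  value=json.dumps(payload))
    producer.flush(5)

    # consumer_timeout_ms bounds this loop, so a missing response no longer blocks the
    # collector; max_poll_interval_ms=900000 in the wrapper tolerates long collection cycles.
    for message in consumer:
        if message.key == 'read_metric_data_response':
            content = json.loads(message.value)
            if content.get('correlation_id') == payload['correlation_id']:
                print(content)
                break

    consumer.close()
    producer.close(5)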