X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fcollector.py;h=60af579217d6bd9481e711423d7f55f4bcec636d;hb=27784a805d77f11d049e9a16704e6977e6967e85;hp=bf485fffe2d31c9c0057eb70ad40fa0d42bf78b9;hpb=cca77765cc8d43a0a5524e3754b7587b149300b4;p=osm%2FMON.git diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index bf485ff..60af579 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -27,11 +27,12 @@ import re import uuid from string import ascii_lowercase -from kafka import KafkaProducer, KafkaConsumer from n2vc.vnf import N2VC from prometheus_client.core import GaugeMetricFamily from osm_mon.common.common_db_client import CommonDbClient +from osm_mon.core.message_bus.consumer import Consumer +from osm_mon.core.message_bus.producer import Producer from osm_mon.core.settings import Config log = logging.getLogger(__name__) @@ -42,16 +43,7 @@ class MonCollector: cfg = Config.instance() self.kafka_server = cfg.BROKER_URI self.common_db_client = CommonDbClient() - self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, secret=cfg.OSMMON_VCA_SECRET) - self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - self.consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - consumer_timeout_ms=10000, - group_id='mon-collector-' + str(uuid.uuid4())) - self.consumer.subscribe(['metric_response']) + self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET) async def collect_metrics(self): """ @@ -63,65 +55,69 @@ class MonCollector: """ # TODO(diazb): Remove dependencies on prometheus_client log.debug("collect_metrics") + producer = Producer() + consumer = Consumer('mon-collector-' + str(uuid.uuid4()), + consumer_timeout_ms=10000, + enable_auto_commit=False) + consumer.subscribe(['metric_response']) metrics = {} - try: - vnfrs = self.common_db_client.get_vnfrs() - vca_model_name = 'default' - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id']) - for vdur in vnfr['vdur']: - # This avoids errors when vdur records have not been completely filled - if 'name' not in vdur: - continue - vdu = next( - filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) - ) - vnf_member_index = vnfr['member-vnf-index-ref'] - vdu_name = vdur['name'] - if 'monitoring-param' in vdu: - for param in vdu['monitoring-param']: - metric_name = param['nfvi-metric'] - payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name, - vnf_member_index) - self.producer.send(topic='metric_request', key='read_metric_data_request', - value=json.dumps(payload)) - self.producer.flush() - for message in self.consumer: - if message.key == 'read_metric_data_response': - content = json.loads(message.value) - if content['correlation_id'] == payload['correlation_id']: - if len(content['metrics_data']['metrics_series']): - metric_reading = content['metrics_data']['metrics_series'][-1] - if metric_name not in metrics.keys(): - metrics[metric_name] = GaugeMetricFamily( - metric_name, - 'OSM metric', - labels=['ns_id', 'vnf_member_index', 'vdu_name'] - ) - metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name], - metric_reading) - break - if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']: - vnf_name_vca = await self._generate_vca_vdu_name(vdu_name) - vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca) - log.debug('VNF Metrics: %s', vnf_metrics) - for vnf_metric_list in vnf_metrics.values(): - for vnf_metric in vnf_metric_list: - log.debug("VNF Metric: %s", vnf_metric) - if vnf_metric['key'] not in metrics.keys(): - metrics[vnf_metric['key']] = GaugeMetricFamily( - vnf_metric['key'], - 'OSM metric', - labels=['ns_id', 'vnf_member_index', 'vdu_name'] - ) - metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name], - float(vnf_metric['value'])) - log.debug("metric.values = %s", metrics.values()) - return metrics.values() - except Exception as e: - log.exception("Error collecting metrics") - raise e + vnfrs = self.common_db_client.get_vnfrs() + vca_model_name = 'default' + for vnfr in vnfrs: + nsr_id = vnfr['nsr-id-ref'] + vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id']) + for vdur in vnfr['vdur']: + # This avoids errors when vdur records have not been completely filled + if 'name' not in vdur: + continue + vdu = next( + filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) + ) + vnf_member_index = vnfr['member-vnf-index-ref'] + vdu_name = vdur['name'] + if 'monitoring-param' in vdu: + for param in vdu['monitoring-param']: + metric_name = param['nfvi-metric'] + payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name, + vnf_member_index) + producer.send(topic='metric_request', key='read_metric_data_request', + value=json.dumps(payload)) + producer.flush(5) + for message in consumer: + if message.key == 'read_metric_data_response': + content = json.loads(message.value) + if content['correlation_id'] == payload['correlation_id']: + log.debug("Found read_metric_data_response with same correlation_id") + if len(content['metrics_data']['metrics_series']): + metric_reading = content['metrics_data']['metrics_series'][-1] + if metric_name not in metrics.keys(): + metrics[metric_name] = GaugeMetricFamily( + metric_name, + 'OSM metric', + labels=['ns_id', 'vnf_member_index', 'vdu_name'] + ) + metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name], + metric_reading) + break + if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']: + vnf_name_vca = await self._generate_vca_vdu_name(vdu_name) + vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca) + log.debug('VNF Metrics: %s', vnf_metrics) + for vnf_metric_list in vnf_metrics.values(): + for vnf_metric in vnf_metric_list: + log.debug("VNF Metric: %s", vnf_metric) + if vnf_metric['key'] not in metrics.keys(): + metrics[vnf_metric['key']] = GaugeMetricFamily( + vnf_metric['key'], + 'OSM metric', + labels=['ns_id', 'vnf_member_index', 'vdu_name'] + ) + metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name], + float(vnf_metric['value'])) + consumer.close() + producer.close(5) + log.debug("metric.values = %s", metrics.values()) + return metrics.values() @staticmethod async def _generate_vca_vdu_name(vdu_name) -> str: