From: Benjamin Diaz Date: Tue, 23 Oct 2018 22:44:26 +0000 (-0300) Subject: Adds support for Juju metrics in collector X-Git-Tag: v5.0.0~19 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FMON.git;a=commitdiff_plain;h=91b1018e1c84758bbc47394f50d04fe3ee81d812 Adds support for Juju metrics in collector MON now collects VNF metrics defined in the VNFD in the metrics section. This are obtained through N2VC. Prometheus exporter now exposes this metrics. Signed-off-by: Benjamin Diaz --- diff --git a/debian/python3-osm-mon.postinst b/debian/python3-osm-mon.postinst index 89cd2b3..6f9b9c1 100644 --- a/debian/python3-osm-mon.postinst +++ b/debian/python3-osm-mon.postinst @@ -1,7 +1,7 @@ #!/bin/bash echo "Installing python dependencies via pip..." -pip3 install kafka==1.3.* +pip3 install kafka-python==1.4.* pip3 install requests==2.18.* pip3 install cherrypy==14.0.* pip3 install jsmin==2.2.* diff --git a/docker/Dockerfile b/docker/Dockerfile index 5e32f9e..be93541 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -44,6 +44,8 @@ ENV OS_DEFAULT_GRANULARITY 300 ENV REQUEST_TIMEOUT 10 ENV OSMMON_LOG_LEVEL INFO ENV OSMMON_KAFKA_LOG_LEVEL INFO +ENV OSMMON_VCA_HOST localhost +ENV OSMMON_VCA_SECRET secret EXPOSE 8662 8000 diff --git a/osm_mon/cmd/mon_prometheus_exporter.py b/osm_mon/cmd/mon_prometheus_exporter.py index f018b5d..f89a28d 100644 --- a/osm_mon/cmd/mon_prometheus_exporter.py +++ b/osm_mon/cmd/mon_prometheus_exporter.py @@ -30,17 +30,21 @@ from osm_mon.collector.prometheus_exporter import MonPrometheusExporter def main(): cfg = Config.instance() - log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - logging.basicConfig(stream=sys.stdout, - format=log_formatter_str, - datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + + root = logging.getLogger() + root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') + ch.setFormatter(formatter) + root.addHandler(ch) + kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) - kafka_formatter = logging.Formatter(log_formatter_str) kafka_handler = logging.StreamHandler(sys.stdout) - kafka_handler.setFormatter(kafka_formatter) + kafka_handler.setFormatter(formatter) kafka_logger.addHandler(kafka_handler) + log = logging.getLogger(__name__) log.info("Starting MON Prometheus Exporter...") log.info("Config: %s", vars(cfg)) diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index 9bf3953..bf485ff 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -23,13 +23,15 @@ import json import logging import random +import re import uuid -from collections import Iterable +from string import ascii_lowercase from kafka import KafkaProducer, KafkaConsumer -from osm_common import dbmongo +from n2vc.vnf import N2VC from prometheus_client.core import GaugeMetricFamily +from osm_mon.common.common_db_client import CommonDbClient from osm_mon.core.settings import Config log = logging.getLogger(__name__) @@ -39,10 +41,8 @@ class MonCollector: def __init__(self): cfg = Config.instance() self.kafka_server = cfg.BROKER_URI - self.common_db_host = cfg.MONGO_URI.split(':')[0] - self.common_db_port = cfg.MONGO_URI.split(':')[1] - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'host': self.common_db_host, 'port': int(self.common_db_port), 'name': 'osm'}) + self.common_db_client = CommonDbClient() + self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, secret=cfg.OSMMON_VCA_SECRET) self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, key_serializer=str.encode, value_serializer=str.encode) @@ -53,66 +53,106 @@ class MonCollector: group_id='mon-collector-' + str(uuid.uuid4())) self.consumer.subscribe(['metric_response']) - def collect_metrics(self) -> Iterable: + async def collect_metrics(self): + """ + Collects vdu metrics. These can be vim and/or n2vc metrics. + It checks for monitoring-params or metrics inside vdu section of vnfd, then collects the metric accordingly. + If vim related, it sends a metric read request through Kafka, to be handled by mon-proxy. + If n2vc related, it uses the n2vc client to obtain the readings. + :return: lists of metrics + """ # TODO(diazb): Remove dependencies on prometheus_client log.debug("collect_metrics") metrics = {} - vnfrs = self.common_db.get_list('vnfrs') - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnfd = self.common_db.get_one('vnfds', {"_id": vnfr['vnfd-id']}) - payloads = self._generate_metric_data_payloads(vnfr, vnfd) - for payload in payloads: - cor_id = payload['correlation_id'] - metric_name = payload['metric_name'] - vnf_member_index = payload['vnf_member_index'] - vdu_name = payload['vdu_name'] - self.producer.send(topic='metric_request', key='read_metric_data_request', - value=json.dumps(payload)) - self.producer.flush() - for message in self.consumer: - if message.key == 'read_metric_data_response': - content = json.loads(message.value) - if content['correlation_id'] == cor_id: - if len(content['metrics_data']['metrics_series']): - metric_reading = content['metrics_data']['metrics_series'][-1] - if metric_name not in metrics.keys(): - metrics[metric_name] = GaugeMetricFamily( - metric_name, + try: + vnfrs = self.common_db_client.get_vnfrs() + vca_model_name = 'default' + for vnfr in vnfrs: + nsr_id = vnfr['nsr-id-ref'] + vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id']) + for vdur in vnfr['vdur']: + # This avoids errors when vdur records have not been completely filled + if 'name' not in vdur: + continue + vdu = next( + filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) + ) + vnf_member_index = vnfr['member-vnf-index-ref'] + vdu_name = vdur['name'] + if 'monitoring-param' in vdu: + for param in vdu['monitoring-param']: + metric_name = param['nfvi-metric'] + payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name, + vnf_member_index) + self.producer.send(topic='metric_request', key='read_metric_data_request', + value=json.dumps(payload)) + self.producer.flush() + for message in self.consumer: + if message.key == 'read_metric_data_response': + content = json.loads(message.value) + if content['correlation_id'] == payload['correlation_id']: + if len(content['metrics_data']['metrics_series']): + metric_reading = content['metrics_data']['metrics_series'][-1] + if metric_name not in metrics.keys(): + metrics[metric_name] = GaugeMetricFamily( + metric_name, + 'OSM metric', + labels=['ns_id', 'vnf_member_index', 'vdu_name'] + ) + metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name], + metric_reading) + break + if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']: + vnf_name_vca = await self._generate_vca_vdu_name(vdu_name) + vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca) + log.debug('VNF Metrics: %s', vnf_metrics) + for vnf_metric_list in vnf_metrics.values(): + for vnf_metric in vnf_metric_list: + log.debug("VNF Metric: %s", vnf_metric) + if vnf_metric['key'] not in metrics.keys(): + metrics[vnf_metric['key']] = GaugeMetricFamily( + vnf_metric['key'], 'OSM metric', labels=['ns_id', 'vnf_member_index', 'vdu_name'] ) - metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name], - metric_reading) - break - return metrics.values() + metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name], + float(vnf_metric['value'])) + log.debug("metric.values = %s", metrics.values()) + return metrics.values() + except Exception as e: + log.exception("Error collecting metrics") + raise e @staticmethod - def _generate_metric_data_payloads(vnfr: dict, vnfd: dict) -> list: - log.debug('_generate_metric_data_payloads') - payloads = [] - nsr_id = vnfr['nsr-id-ref'] - for vdur in vnfr['vdur']: - # This avoids errors when vdur records have not been completely filled - if 'name' not in vdur: - continue - vdu = next( - filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) - ) - if 'monitoring-param' in vdu: - for param in vdu['monitoring-param']: - metric_name = param['nfvi-metric'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vdu_name = vdur['name'] - cor_id = random.randint(1, 10e7) - payload = { - 'correlation_id': cor_id, - 'metric_name': metric_name, - 'ns_id': nsr_id, - 'vnf_member_index': vnf_member_index, - 'vdu_name': vdu_name, - 'collection_period': 1, - 'collection_unit': 'DAY', - } - payloads.append(payload) - return payloads + async def _generate_vca_vdu_name(vdu_name) -> str: + """ + Replaces all digits in vdu name for corresponding ascii characters. This is the format required by N2VC. + :param vdu_name: Vdu name according to the vdur + :return: Name with digits replaced with characters + """ + vnf_name_vca = ''.join( + ascii_lowercase[int(char)] if char.isdigit() else char for char in vdu_name) + vnf_name_vca = re.sub(r'-[a-z]+$', '', vnf_name_vca) + return vnf_name_vca + + @staticmethod + async def _generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index) -> dict: + """ + Builds JSON payload for asking for a metric measurement in MON. It follows the model defined in core.models. + :param metric_name: OSM metric name (e.g.: cpu_utilization) + :param nsr_id: NSR ID + :param vdu_name: Vdu name according to the vdur + :param vnf_member_index: Index of the VNF in the NS according to the vnfr + :return: JSON payload as dict + """ + cor_id = random.randint(1, 10e7) + payload = { + 'correlation_id': cor_id, + 'metric_name': metric_name, + 'ns_id': nsr_id, + 'vnf_member_index': vnf_member_index, + 'vdu_name': vdu_name, + 'collection_period': 1, + 'collection_unit': 'DAY', + } + return payload diff --git a/osm_mon/collector/prometheus_exporter.py b/osm_mon/collector/prometheus_exporter.py index d61a286..d890337 100644 --- a/osm_mon/collector/prometheus_exporter.py +++ b/osm_mon/collector/prometheus_exporter.py @@ -20,12 +20,12 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import asyncio import logging import threading import time -from http.server import HTTPServer -from prometheus_client import MetricsHandler +from prometheus_client import start_http_server from prometheus_client.core import REGISTRY from osm_mon.collector.collector import MonCollector @@ -37,35 +37,35 @@ log = logging.getLogger(__name__) class MonPrometheusExporter: def __init__(self): - self.mon_collector = MonCollector() self.custom_collector = CustomCollector() def _run_exporter(self): log.debug('_run_exporter') REGISTRY.register(self.custom_collector) - server_address = ('', 8000) - httpd = HTTPServer(server_address, MetricsHandler) log.info("Starting MON Prometheus exporter at port %s", 8000) - httpd.serve_forever() + start_http_server(8000) def run(self): log.debug('_run') - self._run_exporter() - self._run_collector() + collector_thread = threading.Thread(target=self._run_collector) + collector_thread.setDaemon(True) + collector_thread.start() + exporter_thread = threading.Thread(target=self._run_exporter) + exporter_thread.setDaemon(True) + exporter_thread.start() + collector_thread.join() + exporter_thread.join() def _run_collector(self): log.debug('_run_collector') - t = threading.Thread(target=self._collect_metrics_forever) - t.setDaemon(True) - t.start() - - def _collect_metrics_forever(self): - log.debug('_collect_metrics_forever') + asyncio.set_event_loop(asyncio.new_event_loop()) + mon_collector = MonCollector() cfg = Config.instance() while True: - time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) - metrics = self.mon_collector.collect_metrics() + log.debug('_run_collector_loop') + metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics()) self.custom_collector.metrics = metrics + time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) class CustomCollector(object): @@ -80,8 +80,7 @@ class CustomCollector(object): def collect(self): log.debug("collect") - metrics = self.mon_collector.collect_metrics() - return metrics + return self.metrics if __name__ == '__main__': diff --git a/osm_mon/common/common_db_client.py b/osm_mon/common/common_db_client.py index 920fe36..c6237ee 100644 --- a/osm_mon/common/common_db_client.py +++ b/osm_mon/common/common_db_client.py @@ -43,6 +43,9 @@ class CommonDbClient: return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in self.get_nsr(nsr_id)['nsd']['constituent-vnfd']] + def get_vnfrs(self): + return self.common_db.get_list('vnfrs') + def get_vnfd(self, vnfd_id: str): vnfr = self.common_db.get_one("vnfds", {"_id": vnfd_id}) diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b2677d8..85e679f 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -91,8 +91,21 @@ class CommonConsumer: topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) - common_consumer.poll() - common_consumer.seek_to_end() + retries = 1 + max_retries = 5 + while True: + try: + common_consumer.poll() + common_consumer.seek_to_end() + break + except Exception: + log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.") + log.error("Retry number %d of %d", retries, max_retries) + if retries >= max_retries: + log.error("Achieved max number of retries. Logging exception and exiting...") + log.exception("Exception: ") + return + retries = retries + 1 log.info("Listening for messages...") for message in common_consumer: diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py index 9700af5..8f5e8f5 100644 --- a/osm_mon/core/settings.py +++ b/osm_mon/core/settings.py @@ -68,6 +68,8 @@ class Config(object): CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type), CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type), CfgParam('OSMMON_COLLECTOR_INTERVAL', 10, int), + CfgParam('OSMMON_VCA_HOST', "localhost", six.text_type), + CfgParam('OSMMON_VCA_SECRET', "secret", six.text_type), ] _config_dict = {cfg.key: cfg for cfg in _configuration} diff --git a/osm_mon/test/collector/test_collector.py b/osm_mon/test/collector/test_collector.py index b4ec741..84905b7 100644 --- a/osm_mon/test/collector/test_collector.py +++ b/osm_mon/test/collector/test_collector.py @@ -20,187 +20,43 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import asyncio +import random import unittest -import mock -from kafka import KafkaProducer -from kafka.errors import KafkaError -from osm_common import dbmongo +from mock import mock from osm_mon.collector.collector import MonCollector -from osm_mon.core.database import VimCredentials, DatabaseManager -from osm_mon.core.message_bus.common_consumer import CommonConsumer -@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock()) class MonCollectorTest(unittest.TestCase): - def test_generate_metric_data_payloads(self): - vnfr = { - "_id": "0d9d06ad-3fc2-418c-9934-465e815fafe2", - "ip-address": "192.168.160.2", - "created-time": 1535392482.0044956, - "vim-account-id": "be48ae31-1d46-4892-a4b4-d69abd55714b", - "vdur": [ - { - "interfaces": [ - { - "mac-address": "fa:16:3e:71:fd:b8", - "name": "eth0", - "ip-address": "192.168.160.2" - } - ], - "status": "ACTIVE", - "vim-id": "63a65636-9fc8-4022-b070-980823e6266a", - "name": "cirros_ns-1-cirros_vnfd-VM-1", - "status-detailed": None, - "ip-address": "192.168.160.2", - "vdu-id-ref": "cirros_vnfd-VM" - } - ], - "id": "0d9d06ad-3fc2-418c-9934-465e815fafe2", - "vnfd-ref": "cirros_vdu_scaling_vnf", - "vnfd-id": "63f44c41-45ee-456b-b10d-5f08fb1796e0", - "_admin": { - "created": 1535392482.0067868, - "projects_read": [ - "admin" - ], - "modified": 1535392482.0067868, - "projects_write": [ - "admin" - ] - }, - "nsr-id-ref": "87776f33-b67c-417a-8119-cb08e4098951", - "member-vnf-index-ref": "1", - "connection-point": [ - { - "name": "eth0", - "id": None, - "connection-point-id": None - } - ] + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + + def test_generate_vca_vdu_name(self): + vdur_name = 'test-juju-metrics01-1-ubuntuvdu1-1' + expected = 'test-juju-metricsab-b-ubuntuvdub' + result = self.loop.run_until_complete(MonCollector._generate_vca_vdu_name(vdur_name)) + self.assertEqual(result, expected) + + @mock.patch.object(random, 'randint') + def test_generate_read_metric_payload(self, randint): + randint.return_value = 1 + metric_name = 'cpu_utilization' + nsr_id = 'test_id' + vdu_name = 'test_vdu' + vnf_member_index = 1 + expected_payload = { + 'correlation_id': 1, + 'metric_name': metric_name, + 'ns_id': nsr_id, + 'vnf_member_index': vnf_member_index, + 'vdu_name': vdu_name, + 'collection_period': 1, + 'collection_unit': 'DAY', } - vnfd = { - "_id": "63f44c41-45ee-456b-b10d-5f08fb1796e0", - "name": "cirros_vdu_scaling_vnf", - "vendor": "OSM", - "vdu": [ - { - "name": "cirros_vnfd-VM", - "monitoring-param": [ - { - "id": "cirros_vnfd-VM_memory_util", - "nfvi-metric": "average_memory_utilization" - } - ], - "vm-flavor": { - "vcpu-count": 1, - "memory-mb": 256, - "storage-gb": 2 - }, - "description": "cirros_vnfd-VM", - "count": 1, - "id": "cirros_vnfd-VM", - "interface": [ - { - "name": "eth0", - "external-connection-point-ref": "eth0", - "type": "EXTERNAL", - "virtual-interface": { - "bandwidth": "0", - "type": "VIRTIO", - "vpci": "0000:00:0a.0" - } - } - ], - "image": "cirros034" - } - ], - "monitoring-param": [ - { - "id": "cirros_vnf_memory_util", - "name": "cirros_vnf_memory_util", - "aggregation-type": "AVERAGE", - "vdu-monitoring-param-ref": "cirros_vnfd-VM_memory_util", - "vdu-ref": "cirros_vnfd-VM" - } - ], - "description": "Simple VNF example with a cirros and a scaling group descriptor", - "id": "cirros_vdu_scaling_vnf", - "logo": "cirros-64.png", - "version": "1.0", - "connection-point": [ - { - "name": "eth0", - "type": "VPORT" - } - ], - "mgmt-interface": { - "cp": "eth0" - }, - "scaling-group-descriptor": [ - { - "name": "scale_cirros_vnfd-VM", - "min-instance-count": 1, - "vdu": [ - { - "count": 1, - "vdu-id-ref": "cirros_vnfd-VM" - } - ], - "max-instance-count": 10, - "scaling-policy": [ - { - "name": "auto_memory_util_above_threshold", - "scaling-type": "automatic", - "cooldown-time": 60, - "threshold-time": 10, - "scaling-criteria": [ - { - "name": "group1_memory_util_above_threshold", - "vnf-monitoring-param-ref": "cirros_vnf_memory_util", - "scale-out-threshold": 80, - "scale-out-relational-operation": "GT", - "scale-in-relational-operation": "LT", - "scale-in-threshold": 20 - } - ] - } - ] - } - ], - "short-name": "cirros_vdu_scaling_vnf", - "_admin": { - "created": 1535392242.6281035, - "modified": 1535392242.6281035, - "storage": { - "zipfile": "package.tar.gz", - "pkg-dir": "cirros_vnf", - "path": "/app/storage/", - "folder": "63f44c41-45ee-456b-b10d-5f08fb1796e0", - "fs": "local", - "descriptor": "cirros_vnf/cirros_vdu_scaling_vnfd.yaml" - }, - "usageSate": "NOT_IN_USE", - "onboardingState": "ONBOARDED", - "userDefinedData": { - - }, - "projects_read": [ - "admin" - ], - "operationalState": "ENABLED", - "projects_write": [ - "admin" - ] - } - } - payloads = MonCollector._generate_metric_data_payloads(vnfr, vnfd) - expected_payload = {'ns_id': '87776f33-b67c-417a-8119-cb08e4098951', - 'vnf_member_index': '1', - 'metric_name': 'average_memory_utilization', - 'collection_period': 1, - 'collection_unit': 'DAY', - 'vdu_name': 'cirros_ns-1-cirros_vnfd-VM-1'} - self.assertEqual(len(payloads), 1) - self.assertEqual(set(expected_payload.items()).issubset(set(payloads[0].items())), True) + result = self.loop.run_until_complete( + MonCollector._generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index)) + self.assertEqual(result, expected_payload) diff --git a/requirements.txt b/requirements.txt index a1582fa..f9b4407 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com -kafka==1.3.* +kafka-python==1.4.* requests==2.18.* cherrypy==14.0.* jsmin==2.2.* @@ -34,4 +34,5 @@ bottle==0.12.* peewee==3.1.* pyyaml==3.* prometheus_client==0.4.* -git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common \ No newline at end of file +git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common +git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc \ No newline at end of file diff --git a/setup.py b/setup.py index db7a715..1015de4 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ setup( scripts=['osm_mon/plugins/vRealiseOps/vROPs_Webservice/vrops_webservice', 'osm_mon/core/message_bus/common_consumer.py'], install_requires=[ - "kafka==1.3.*", + "kafka-python==1.4.*", "requests==2.18.*", "cherrypy==14.0.*", "jsmin==2.2.*", @@ -68,7 +68,8 @@ setup( "bottle==0.12.*", "peewee==3.1.*", "pyyaml==3.*", - "osm-common" + "osm-common", + "n2vc" ], include_package_data=True, entry_points={