From 0e34244e420bd68e6acb0cf6bb4383fedb662070 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Fri, 9 Nov 2018 17:52:08 -0300 Subject: [PATCH] Changes way metrics are collected, removing the use of mon-proxy Collector now collects vim metrics directly, without to send kafka msgs to mon-proxy. Also a plugin/backend model has been implemented to ease possible inclusions of other vims and tsbds in the future. Signed-off-by: Benjamin Diaz Change-Id: I554a4f5e410a31ec70aa301c8aa819b1f03a3857 --- debian/python3-osm-mon.postinst | 1 + docker/scripts/runInstall.sh | 2 +- ...rometheus_exporter.py => mon_collector.py} | 9 +- osm_mon/collector/__init__.py | 2 +- osm_mon/collector/backends/__init__.py | 21 ++ osm_mon/collector/backends/base.py | 26 +++ .../prometheus.py} | 67 +++---- osm_mon/collector/collector.py | 182 +++++++----------- osm_mon/collector/collectors/__init__.py | 21 ++ osm_mon/collector/collectors/base.py | 27 +++ osm_mon/collector/collectors/base_vim.py | 30 +++ osm_mon/collector/collectors/juju.py | 71 +++++++ osm_mon/collector/collectors/openstack.py | 113 +++++++++++ osm_mon/collector/metric.py | 30 +++ osm_mon/common/__init__.py | 21 ++ osm_mon/common/common_db_client.py | 12 +- osm_mon/core/message_bus/consumer.py | 2 +- osm_mon/core/settings.py | 2 +- .../OpenStack/Gnocchi/metric_handler.py | 5 +- osm_mon/test/collector/test_collector.py | 104 +++++++--- .../plugins/OpenStack/unit/test_alarming.py | 2 +- requirements.txt | 1 + setup.py | 4 +- 23 files changed, 559 insertions(+), 196 deletions(-) rename osm_mon/cmd/{mon_prometheus_exporter.py => mon_collector.py} (89%) create mode 100644 osm_mon/collector/backends/__init__.py create mode 100644 osm_mon/collector/backends/base.py rename osm_mon/collector/{prometheus_exporter.py => backends/prometheus.py} (50%) create mode 100644 osm_mon/collector/collectors/__init__.py create mode 100644 osm_mon/collector/collectors/base.py create mode 100644 osm_mon/collector/collectors/base_vim.py create mode 100644 osm_mon/collector/collectors/juju.py create mode 100644 osm_mon/collector/collectors/openstack.py create mode 100644 osm_mon/collector/metric.py diff --git a/debian/python3-osm-mon.postinst b/debian/python3-osm-mon.postinst index 6f9b9c1..f4fabd1 100644 --- a/debian/python3-osm-mon.postinst +++ b/debian/python3-osm-mon.postinst @@ -17,4 +17,5 @@ pip3 install bottle==0.12.* pip3 install peewee==3.1.* pip3 install pyyaml==3.* pip3 install prometheus_client==0.4.* +pip3 install gnocchiclient==7.0.* echo "Installation of python dependencies finished" \ No newline at end of file diff --git a/docker/scripts/runInstall.sh b/docker/scripts/runInstall.sh index 29ab3c6..2e12772 100755 --- a/docker/scripts/runInstall.sh +++ b/docker/scripts/runInstall.sh @@ -23,5 +23,5 @@ /bin/bash /mon/osm_mon/plugins/vRealiseOps/vROPs_Webservice/install.sh python3 /mon/osm_mon/plugins/OpenStack/Aodh/notifier.py & python3 /mon/osm_mon/core/message_bus/common_consumer.py & -osm-mon-prometheus-exporter +osm-mon-collector diff --git a/osm_mon/cmd/mon_prometheus_exporter.py b/osm_mon/cmd/mon_collector.py similarity index 89% rename from osm_mon/cmd/mon_prometheus_exporter.py rename to osm_mon/cmd/mon_collector.py index 522bd2f..3d0e836 100644 --- a/osm_mon/cmd/mon_prometheus_exporter.py +++ b/osm_mon/cmd/mon_collector.py @@ -24,8 +24,8 @@ import logging import sys +from osm_mon.collector.collector import Collector from osm_mon.core.settings import Config -from osm_mon.collector.prometheus_exporter import MonPrometheusExporter def main(): @@ -43,10 +43,11 @@ def main(): kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) log = logging.getLogger(__name__) - log.info("Starting MON Prometheus Exporter...") + log.info("Starting MON Collector...") log.info("Config: %s", vars(cfg)) - exporter = MonPrometheusExporter() - exporter.run() + collector = Collector() + collector.init_plugins() + collector.collect_forever() if __name__ == '__main__': diff --git a/osm_mon/collector/__init__.py b/osm_mon/collector/__init__.py index 8fc00af..d81308a 100644 --- a/osm_mon/collector/__init__.py +++ b/osm_mon/collector/__init__.py @@ -20,4 +20,4 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com -## \ No newline at end of file +## diff --git a/osm_mon/collector/backends/__init__.py b/osm_mon/collector/backends/__init__.py new file mode 100644 index 0000000..971f4e9 --- /dev/null +++ b/osm_mon/collector/backends/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## diff --git a/osm_mon/collector/backends/base.py b/osm_mon/collector/backends/base.py new file mode 100644 index 0000000..8cba5e1 --- /dev/null +++ b/osm_mon/collector/backends/base.py @@ -0,0 +1,26 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + + +class BaseBackend: + def handle(self, metrics: list): + pass diff --git a/osm_mon/collector/prometheus_exporter.py b/osm_mon/collector/backends/prometheus.py similarity index 50% rename from osm_mon/collector/prometheus_exporter.py rename to osm_mon/collector/backends/prometheus.py index 2bb2e27..70632a1 100644 --- a/osm_mon/collector/prometheus_exporter.py +++ b/osm_mon/collector/backends/prometheus.py @@ -20,61 +20,48 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio import logging -import threading -import time +from typing import List from prometheus_client import start_http_server -from prometheus_client.core import REGISTRY +from prometheus_client.core import REGISTRY, GaugeMetricFamily -from osm_mon.collector.collector import MonCollector -from osm_mon.core.settings import Config +from osm_mon.collector.backends.base import BaseBackend +from osm_mon.collector.metric import Metric log = logging.getLogger(__name__) +OSM_METRIC_PREFIX = 'osm_' -class MonPrometheusExporter: + +class PrometheusBackend(BaseBackend): def __init__(self): self.custom_collector = CustomCollector() - - def _run_exporter(self): - log.debug('_run_exporter') + self._start_exporter(8000) + + def handle(self, metrics: List[Metric]): + prometheus_metrics = [] + for metric in metrics: + prometheus_metric = GaugeMetricFamily( + OSM_METRIC_PREFIX + metric.name, + 'OSM metric', + labels=['ns_id', 'vnf_member_index', 'vdu_name'] + ) + prometheus_metric.add_metric([metric.nsr_id, metric.vnf_member_index, metric.vdur_name], metric.value) + prometheus_metrics.append(prometheus_metric) + self.custom_collector.metrics = prometheus_metrics + + def _start_exporter(self, port): + log.debug('_start_exporter') REGISTRY.register(self.custom_collector) - log.info("Starting MON Prometheus exporter at port %s", 8000) - start_http_server(8000) - - def run(self): - log.debug('_run') - collector_thread = threading.Thread(target=self._run_collector) - collector_thread.setDaemon(True) - collector_thread.start() - exporter_thread = threading.Thread(target=self._run_exporter) - exporter_thread.setDaemon(True) - exporter_thread.start() - collector_thread.join() - exporter_thread.join() - - def _run_collector(self): - log.debug('_run_collector') - asyncio.set_event_loop(asyncio.new_event_loop()) - mon_collector = MonCollector() - cfg = Config.instance() - while True: - try: - log.debug('_run_collector_loop') - metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics()) - self.custom_collector.metrics = metrics - time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) - except Exception: - log.exception("Error collecting metrics") + log.info("Starting MON Prometheus exporter at port %s", port) + start_http_server(port) class CustomCollector(object): def __init__(self): - self.mon_collector = MonCollector() self.metrics = [] def describe(self): @@ -84,7 +71,3 @@ class CustomCollector(object): def collect(self): log.debug("collect") return self.metrics - - -if __name__ == '__main__': - MonPrometheusExporter().run() diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index 2c77f28..ac9408f 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -20,125 +20,89 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import json import logging -import random -import uuid - -from n2vc.vnf import N2VC -from prometheus_client.core import GaugeMetricFamily +import multiprocessing +import time +from osm_mon.collector.backends.prometheus import PrometheusBackend +from osm_mon.collector.collectors.juju import VCACollector +from osm_mon.collector.collectors.openstack import OpenstackCollector from osm_mon.common.common_db_client import CommonDbClient -from osm_mon.core.message_bus.consumer import Consumer -from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.database import DatabaseManager from osm_mon.core.settings import Config log = logging.getLogger(__name__) +VIM_COLLECTORS = { + "openstack": OpenstackCollector +} + -class MonCollector: +class Collector: def __init__(self): - cfg = Config.instance() - self.kafka_server = cfg.BROKER_URI - self.common_db_client = CommonDbClient() - self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET) + self.common_db = CommonDbClient() self.producer_timeout = 5 + self.consumer_timeout = 5 + self.plugins = [] + self.database_manager = DatabaseManager() + self.database_manager.create_tables() + + def init_plugins(self): + prometheus_plugin = PrometheusBackend() + self.plugins.append(prometheus_plugin) + + def collect_forever(self): + log.debug('collect_forever') + cfg = Config.instance() + while True: + try: + self.collect_metrics() + time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) + except Exception: + log.exception("Error collecting metrics") + + def _get_vim_account_id(self, nsr_id: str, vnf_member_index: int) -> str: + vnfr = self.common_db.get_vnfr(nsr_id, vnf_member_index) + return vnfr['vim-account-id'] + + def _get_vim_type(self, vim_account_id): + """Get the vim type that is required by the message.""" + credentials = self.database_manager.get_credentials(vim_account_id) + return credentials.type + + def _init_vim_collector_and_collect(self, vnfr: dict, vim_account_id: str, queue: multiprocessing.Queue): + # TODO(diazb) Add support for vrops and aws + vim_type = self._get_vim_type(vim_account_id) + if vim_type in VIM_COLLECTORS: + collector = VIM_COLLECTORS[vim_type](vim_account_id) + collector.collect(vnfr, queue) + else: + log.debug("vimtype %s is not supported.", vim_type) + + def _init_vca_collector_and_collect(self, vnfr: dict, queue: multiprocessing.Queue): + vca_collector = VCACollector() + vca_collector.collect(vnfr, queue) - async def collect_metrics(self): - """ - Collects vdu metrics. These can be vim and/or n2vc metrics. - It checks for monitoring-params or metrics inside vdu section of vnfd, then collects the metric accordingly. - If vim related, it sends a metric read request through Kafka, to be handled by mon-proxy. - If n2vc related, it uses the n2vc client to obtain the readings. - :return: lists of metrics - """ - # TODO(diazb): Remove dependencies on prometheus_client - log.debug("collect_metrics") - producer = Producer() - consumer = Consumer('mon-collector-' + str(uuid.uuid4()), - consumer_timeout_ms=10000, - enable_auto_commit=False) - consumer.subscribe(['metric_response']) - metrics = {} - vnfrs = self.common_db_client.get_vnfrs() - vca_model_name = 'default' + def collect_metrics(self): + queue = multiprocessing.Queue() + vnfrs = self.common_db.get_vnfrs() + processes = [] for vnfr in vnfrs: nsr_id = vnfr['nsr-id-ref'] - nsr = self.common_db_client.get_nsr(nsr_id) - vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id']) - for vdur in vnfr['vdur']: - # This avoids errors when vdur records have not been completely filled - if 'name' not in vdur: - continue - vdu = next( - filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) - ) - vnf_member_index = vnfr['member-vnf-index-ref'] - vdu_name = vdur['name'] - if 'monitoring-param' in vdu: - for param in vdu['monitoring-param']: - metric_name = param['nfvi-metric'] - payload = self._generate_read_metric_payload(metric_name, - nsr_id, - vdu_name, - vnf_member_index) - producer.send(topic='metric_request', key='read_metric_data_request', - value=json.dumps(payload)) - producer.flush(self.producer_timeout) - for message in consumer: - if message.key == 'read_metric_data_response': - content = json.loads(message.value) - if content['correlation_id'] == payload['correlation_id']: - log.debug("Found read_metric_data_response with same correlation_id") - if len(content['metrics_data']['metrics_series']): - metric_reading = content['metrics_data']['metrics_series'][-1] - if metric_name not in metrics.keys(): - metrics[metric_name] = GaugeMetricFamily( - metric_name, - 'OSM metric', - labels=['ns_id', 'vnf_member_index', 'vdu_name'] - ) - metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name], - metric_reading) - break - if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']: - vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref']) - vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca) - log.debug('VNF Metrics: %s', vnf_metrics) - for vnf_metric_list in vnf_metrics.values(): - for vnf_metric in vnf_metric_list: - log.debug("VNF Metric: %s", vnf_metric) - if vnf_metric['key'] not in metrics.keys(): - metrics[vnf_metric['key']] = GaugeMetricFamily( - vnf_metric['key'], - 'OSM metric', - labels=['ns_id', 'vnf_member_index', 'vdu_name'] - ) - metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name], - float(vnf_metric['value'])) - consumer.close() - producer.close(self.producer_timeout) - log.debug("metric.values = %s", metrics.values()) - return metrics.values() - - @staticmethod - def _generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index) -> dict: - """ - Builds JSON payload for asking for a metric measurement in MON. It follows the model defined in core.models. - :param metric_name: OSM metric name (e.g.: cpu_utilization) - :param nsr_id: NSR ID - :param vdu_name: Vdu name according to the vdur - :param vnf_member_index: Index of the VNF in the NS according to the vnfr - :return: JSON payload as dict - """ - cor_id = random.randint(1, 10e7) - payload = { - 'correlation_id': cor_id, - 'metric_name': metric_name, - 'ns_id': nsr_id, - 'vnf_member_index': vnf_member_index, - 'vdu_name': vdu_name, - 'collection_period': 1, - 'collection_unit': 'DAY', - } - return payload + vnf_member_index = vnfr['member-vnf-index-ref'] + vim_account_id = self._get_vim_account_id(nsr_id, vnf_member_index) + p = multiprocessing.Process(target=self._init_vim_collector_and_collect, + args=(vnfr, vim_account_id, queue)) + processes.append(p) + p.start() + p = multiprocessing.Process(target=self._init_vca_collector_and_collect, + args=(vnfr, queue)) + processes.append(p) + p.start() + for process in processes: + process.join() + metrics = [] + while not queue.empty(): + metrics.append(queue.get()) + for plugin in self.plugins: + plugin.handle(metrics) diff --git a/osm_mon/collector/collectors/__init__.py b/osm_mon/collector/collectors/__init__.py new file mode 100644 index 0000000..971f4e9 --- /dev/null +++ b/osm_mon/collector/collectors/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## diff --git a/osm_mon/collector/collectors/base.py b/osm_mon/collector/collectors/base.py new file mode 100644 index 0000000..ed620bf --- /dev/null +++ b/osm_mon/collector/collectors/base.py @@ -0,0 +1,27 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +from multiprocessing import Queue + + +class BaseCollector: + def collect(self, vnfr: dict, queue: Queue): + pass diff --git a/osm_mon/collector/collectors/base_vim.py b/osm_mon/collector/collectors/base_vim.py new file mode 100644 index 0000000..bf7da38 --- /dev/null +++ b/osm_mon/collector/collectors/base_vim.py @@ -0,0 +1,30 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +from multiprocessing import Queue + + +class BaseVimCollector: + def __init__(self, vim_account_id: str): + pass + + def collect(self, vnfr: dict, queue: Queue): + pass diff --git a/osm_mon/collector/collectors/juju.py b/osm_mon/collector/collectors/juju.py new file mode 100644 index 0000000..162a527 --- /dev/null +++ b/osm_mon/collector/collectors/juju.py @@ -0,0 +1,71 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import asyncio +import logging +from multiprocessing import Queue + +from n2vc.vnf import N2VC + +from osm_mon.collector.collectors.base import BaseCollector +from osm_mon.collector.metric import Metric +from osm_mon.common.common_db_client import CommonDbClient +from osm_mon.core.settings import Config + +log = logging.getLogger(__name__) + + +class VCACollector(BaseCollector): + def __init__(self): + cfg = Config.instance() + self.common_db = CommonDbClient() + self.loop = asyncio.get_event_loop() + self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET) + + def collect(self, vnfr: dict, queue: Queue): + vca_model_name = 'default' + nsr_id = vnfr['nsr-id-ref'] + vnf_member_index = vnfr['member-vnf-index-ref'] + vnfd = self.common_db.get_vnfd(vnfr['vnfd-id']) + for vdur in vnfr['vdur']: + nsr = self.common_db.get_nsr(nsr_id) + vdu = next( + filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) + ) + if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']: + vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref']) + metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)) + log.debug('Metrics: %s', metrics) + for metric_list in metrics.values(): + for metric in metric_list: + log.debug("Metric: %s", metric) + metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value'])) + queue.put(metric) + if 'vnf-configuration' in vnfr and 'metrics' in vnfr['vnf-configuration']: + vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref']) + metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)) + log.debug('Metrics: %s', metrics) + for metric_list in metrics.values(): + for metric in metric_list: + log.debug("Metric: %s", metric) + metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value'])) + queue.put(metric) + # TODO (diazb): Implement vnf-configuration config diff --git a/osm_mon/collector/collectors/openstack.py b/osm_mon/collector/collectors/openstack.py new file mode 100644 index 0000000..811014d --- /dev/null +++ b/osm_mon/collector/collectors/openstack.py @@ -0,0 +1,113 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import datetime +import json +import logging +import multiprocessing +import gnocchiclient.exceptions + +from gnocchiclient.v1 import client as gnocchi_client +from keystoneauth1 import session +from keystoneauth1.identity import v3 + +from osm_mon.collector.collectors.base_vim import BaseVimCollector +from osm_mon.collector.metric import Metric +from osm_mon.common.common_db_client import CommonDbClient +from osm_mon.core.auth import AuthManager +from osm_mon.core.settings import Config + +log = logging.getLogger(__name__) + +METRIC_MAPPINGS = { + "average_memory_utilization": "memory.usage", + "disk_read_ops": "disk.read.requests", + "disk_write_ops": "disk.write.requests", + "disk_read_bytes": "disk.read.bytes", + "disk_write_bytes": "disk.write.bytes", + "packets_dropped": "interface.if_dropped", + "packets_received": "interface.if_packets", + "packets_sent": "interface.if_packets", + "cpu_utilization": "cpu_util", +} + + +class OpenstackCollector(BaseVimCollector): + def __init__(self, vim_account_id: str): + super().__init__(vim_account_id) + self.common_db = CommonDbClient() + self.auth_manager = AuthManager() + self.granularity = self._get_granularity(vim_account_id) + self.gnocchi_client = self._build_gnocchi_client(vim_account_id) + + def _get_resource_uuid(self, nsr_id, vnf_member_index, vdur_name) -> str: + vdur = self.common_db.get_vdur(nsr_id, vnf_member_index, vdur_name) + return vdur['vim-id'] + + def _build_gnocchi_client(self, vim_account_id: str) -> gnocchi_client.Client: + creds = self.auth_manager.get_credentials(vim_account_id) + verify_ssl = self.auth_manager.is_verify_ssl(vim_account_id) + auth = v3.Password(auth_url=creds.url, + username=creds.user, + password=creds.password, + project_name=creds.tenant_name, + project_domain_id='default', + user_domain_id='default') + sess = session.Session(auth=auth, verify=verify_ssl) + return gnocchi_client.Client(session=sess) + + def _get_granularity(self, vim_account_id: str): + creds = self.auth_manager.get_credentials(vim_account_id) + vim_config = json.loads(creds.config) + if 'granularity' in vim_config: + return int(vim_config['granularity']) + else: + cfg = Config.instance() + return cfg.OS_DEFAULT_GRANULARITY + + def collect(self, vnfr: dict, queue: multiprocessing.Queue): + nsr_id = vnfr['nsr-id-ref'] + vnf_member_index = vnfr['member-vnf-index-ref'] + vnfd = self.common_db.get_vnfd(vnfr['vnfd-id']) + for vdur in vnfr['vdur']: + # This avoids errors when vdur records have not been completely filled + if 'name' not in vdur: + return None + vdu = next( + filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) + ) + if 'monitoring-param' in vdu: + for param in vdu['monitoring-param']: + metric_name = param['nfvi-metric'] + gnocchi_metric_name = METRIC_MAPPINGS[metric_name] + start_date = datetime.datetime.now() - datetime.timedelta(seconds=self.granularity) + resource_id = self._get_resource_uuid(nsr_id, vnf_member_index, vdur['name']) + try: + metrics = self.gnocchi_client.metric.get_measures(gnocchi_metric_name, + start=start_date, + resource_id=resource_id, + granularity=self.granularity) + if len(metrics): + metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric_name, metrics[-1][2]) + queue.put(metric) + except gnocchiclient.exceptions.NotFound as e: + log.debug("No metric found: %s", e) + pass diff --git a/osm_mon/collector/metric.py b/osm_mon/collector/metric.py new file mode 100644 index 0000000..e8da11b --- /dev/null +++ b/osm_mon/collector/metric.py @@ -0,0 +1,30 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + + +class Metric: + def __init__(self, nsr_id, vnf_member_index, vdur_name, name, value): + self.nsr_id = nsr_id + self.vnf_member_index = vnf_member_index + self.vdur_name = vdur_name + self.name = name + self.value = value diff --git a/osm_mon/common/__init__.py b/osm_mon/common/__init__.py index e69de29..971f4e9 100644 --- a/osm_mon/common/__init__.py +++ b/osm_mon/common/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## diff --git a/osm_mon/common/common_db_client.py b/osm_mon/common/common_db_client.py index c6237ee..71d1306 100644 --- a/osm_mon/common/common_db_client.py +++ b/osm_mon/common/common_db_client.py @@ -39,12 +39,12 @@ class CommonDbClient: {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) return vnfr - def get_vnfrs(self, nsr_id: str): - return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in - self.get_nsr(nsr_id)['nsd']['constituent-vnfd']] - - def get_vnfrs(self): - return self.common_db.get_list('vnfrs') + def get_vnfrs(self, nsr_id: str = None): + if nsr_id: + return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in + self.get_nsr(nsr_id)['nsd']['constituent-vnfd']] + else: + return self.common_db.get_list('vnfrs') def get_vnfd(self, vnfd_id: str): vnfr = self.common_db.get_one("vnfds", diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py index 1ccf936..c0a9dd0 100644 --- a/osm_mon/core/message_bus/consumer.py +++ b/osm_mon/core/message_bus/consumer.py @@ -10,6 +10,6 @@ class Consumer(KafkaConsumer): super().__init__(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - max_poll_interval_ms=900000, + max_poll_interval_ms=180000, group_id=group_id, **kwargs) diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py index 978c957..a7599cc 100644 --- a/osm_mon/core/settings.py +++ b/osm_mon/core/settings.py @@ -63,7 +63,7 @@ class Config(object): CfgParam('MONGO_URI', "mongo:27017", six.text_type), CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type), CfgParam('OS_NOTIFIER_URI', "http://localhost:8662", six.text_type), - CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type), + CfgParam('OS_DEFAULT_GRANULARITY', 300, int), CfgParam('REQUEST_TIMEOUT', 10, int), CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type), CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type), diff --git a/osm_mon/plugins/OpenStack/Gnocchi/metric_handler.py b/osm_mon/plugins/OpenStack/Gnocchi/metric_handler.py index 91dc402..9736da8 100644 --- a/osm_mon/plugins/OpenStack/Gnocchi/metric_handler.py +++ b/osm_mon/plugins/OpenStack/Gnocchi/metric_handler.py @@ -384,10 +384,7 @@ class OpenstackMetricHandler(object): # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X") end_time = int(round(time.time() * 1000)) - if collection_unit == 'YEAR': - diff = PERIOD_MS[collection_unit] - else: - diff = collection_period * PERIOD_MS[collection_unit] + diff = collection_period * PERIOD_MS[collection_unit] s_time = (end_time - diff) / 1000.0 start_time = datetime.datetime.fromtimestamp(s_time).strftime( '%Y-%m-%dT%H:%M:%S.%f') diff --git a/osm_mon/test/collector/test_collector.py b/osm_mon/test/collector/test_collector.py index 2a5491f..6c3df42 100644 --- a/osm_mon/test/collector/test_collector.py +++ b/osm_mon/test/collector/test_collector.py @@ -12,6 +12,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 +import multiprocessing # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -20,35 +21,88 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio -import random import unittest from unittest import mock -from osm_mon.collector.collector import MonCollector +from osm_mon.collector.collector import Collector +from osm_mon.collector.collectors.juju import VCACollector +from osm_mon.collector.collectors.openstack import OpenstackCollector +from osm_mon.core.database import VimCredentials -class MonCollectorTest(unittest.TestCase): +class CollectorTest(unittest.TestCase): def setUp(self): super().setUp() - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) - - @mock.patch.object(random, 'randint') - def test_generate_read_metric_payload(self, randint): - randint.return_value = 1 - metric_name = 'cpu_utilization' - nsr_id = 'test_id' - vdu_name = 'test_vdu' - vnf_member_index = 1 - expected_payload = { - 'correlation_id': 1, - 'metric_name': metric_name, - 'ns_id': nsr_id, - 'vnf_member_index': vnf_member_index, - 'vdu_name': vdu_name, - 'collection_period': 1, - 'collection_unit': 'DAY', - } - result = MonCollector._generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index) - self.assertEqual(result, expected_payload) + + @mock.patch("osm_mon.collector.collector.CommonDbClient", autospec=True) + def test_get_vim_id(self, common_db): + common_db.return_value.get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01', + '_admin': { + 'projects_read': ['admin'], 'created': 1526044312.102287, + 'modified': 1526044312.102287, 'projects_write': ['admin'] + }, + 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb', + 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e', + 'vdur': [ + { + 'internal-connection-point': [], + 'vdu-id-ref': 'ubuntuvnf_vnfd-VM', + 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d', + 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7', + 'name': 'ubuntuvnf_vnfd-VM' + } + ], + 'vnfd-ref': 'ubuntuvnf_vnfd', + 'member-vnf-index-ref': '1', + 'created-time': 1526044312.0999322, + 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01', + 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'} + collector = Collector() + vim_account_id = collector._get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1) + self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb') + + @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock()) + @mock.patch("osm_mon.collector.collector.DatabaseManager", autospec=True) + def test_get_vim_type(self, database_manager): + mock_creds = VimCredentials() + mock_creds.id = 'test_id' + mock_creds.user = 'user' + mock_creds.url = 'url' + mock_creds.password = 'password' + mock_creds.tenant_name = 'tenant_name' + mock_creds.type = 'openstack' + + database_manager.return_value.get_credentials.return_value = mock_creds + collector = Collector() + vim_type = collector._get_vim_type('test_id') + self.assertEqual(vim_type, 'openstack') + + @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock()) + @mock.patch.object(OpenstackCollector, "__init__", lambda *args, **kwargs: None) + @mock.patch.object(OpenstackCollector, "collect") + @mock.patch.object(Collector, "_get_vim_type") + def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect): + _get_vim_type.return_value = 'openstack' + collector = Collector() + queue = multiprocessing.Queue() + collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue) + collect.assert_called_once_with({}, queue) + + @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock()) + @mock.patch.object(OpenstackCollector, "collect") + @mock.patch.object(Collector, "_get_vim_type") + def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect): + _get_vim_type.return_value = 'unknown' + collector = Collector() + queue = multiprocessing.Queue() + collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue) + openstack_collect.assert_not_called() + + @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock()) + @mock.patch("osm_mon.collector.collector.VCACollector", autospec=True) + def test_init_vca_collector_and_collect(self, vca_collector): + collector = Collector() + queue = multiprocessing.Queue() + collector._init_vca_collector_and_collect({}, queue) + vca_collector.assert_called_once_with() + vca_collector.return_value.collect.assert_called_once_with({}, queue) diff --git a/osm_mon/test/plugins/OpenStack/unit/test_alarming.py b/osm_mon/test/plugins/OpenStack/unit/test_alarming.py index 67486e7..aeacd7b 100644 --- a/osm_mon/test/plugins/OpenStack/unit/test_alarming.py +++ b/osm_mon/test/plugins/OpenStack/unit/test_alarming.py @@ -92,7 +92,7 @@ class TestAlarming(unittest.TestCase): self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True) payload = {"name": "disk_write_ops", "gnocchi_resources_threshold_rule": {"resource_type": "generic", "comparison_operator": "gt", - "granularity": "300", "metric": "disk.write.requests", + "granularity": 300, "metric": "disk.write.requests", "aggregation_method": "mean", "threshold": 60, "resource_id": "my_r_id"}, "alarm_actions": ["http://localhost:8662"], "repeat_actions": True, "state": "ok", "type": "gnocchi_resources_threshold", diff --git a/requirements.txt b/requirements.txt index f9b4407..e0dba98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,5 +34,6 @@ bottle==0.12.* peewee==3.1.* pyyaml==3.* prometheus_client==0.4.* +gnocchiclient==7.0.* git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc \ No newline at end of file diff --git a/setup.py b/setup.py index 1015de4..3646610 100644 --- a/setup.py +++ b/setup.py @@ -68,13 +68,15 @@ setup( "bottle==0.12.*", "peewee==3.1.*", "pyyaml==3.*", + "prometheus_client==0.4.*", + "gnocchiclient==7.0.*", "osm-common", "n2vc" ], include_package_data=True, entry_points={ "console_scripts": [ - "osm-mon-prometheus-exporter = osm_mon.cmd.mon_prometheus_exporter:main", + "osm-mon-collector = osm_mon.cmd.mon_collector:main", ] }, dependency_links=[ -- 2.17.1