Changes way metrics are collected, removing the use of mon-proxy 63/6863/4
authorBenjamin Diaz <bdiaz@whitestack.com>
Fri, 9 Nov 2018 20:52:08 +0000 (17:52 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Tue, 13 Nov 2018 21:06:07 +0000 (18:06 -0300)
Collector now collects vim metrics directly, without to send kafka msgs
to mon-proxy. Also a plugin/backend model has been implemented to ease
possible inclusions of other vims and tsbds in the future.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I554a4f5e410a31ec70aa301c8aa819b1f03a3857

25 files changed:
debian/python3-osm-mon.postinst
docker/scripts/runInstall.sh
osm_mon/cmd/mon_collector.py [new file with mode: 0644]
osm_mon/cmd/mon_prometheus_exporter.py [deleted file]
osm_mon/collector/__init__.py
osm_mon/collector/backends/__init__.py [new file with mode: 0644]
osm_mon/collector/backends/base.py [new file with mode: 0644]
osm_mon/collector/backends/prometheus.py [new file with mode: 0644]
osm_mon/collector/collector.py
osm_mon/collector/collectors/__init__.py [new file with mode: 0644]
osm_mon/collector/collectors/base.py [new file with mode: 0644]
osm_mon/collector/collectors/base_vim.py [new file with mode: 0644]
osm_mon/collector/collectors/juju.py [new file with mode: 0644]
osm_mon/collector/collectors/openstack.py [new file with mode: 0644]
osm_mon/collector/metric.py [new file with mode: 0644]
osm_mon/collector/prometheus_exporter.py [deleted file]
osm_mon/common/__init__.py
osm_mon/common/common_db_client.py
osm_mon/core/message_bus/consumer.py
osm_mon/core/settings.py
osm_mon/plugins/OpenStack/Gnocchi/metric_handler.py
osm_mon/test/collector/test_collector.py
osm_mon/test/plugins/OpenStack/unit/test_alarming.py
requirements.txt
setup.py

index 6f9b9c1..f4fabd1 100644 (file)
@@ -17,4 +17,5 @@ pip3 install bottle==0.12.*
 pip3 install peewee==3.1.*
 pip3 install pyyaml==3.*
 pip3 install prometheus_client==0.4.*
+pip3 install gnocchiclient==7.0.*
 echo "Installation of python dependencies finished"
\ No newline at end of file
index 29ab3c6..2e12772 100755 (executable)
@@ -23,5 +23,5 @@
 /bin/bash /mon/osm_mon/plugins/vRealiseOps/vROPs_Webservice/install.sh
 python3 /mon/osm_mon/plugins/OpenStack/Aodh/notifier.py &
 python3 /mon/osm_mon/core/message_bus/common_consumer.py &
-osm-mon-prometheus-exporter
+osm-mon-collector
 
diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py
new file mode 100644 (file)
index 0000000..3d0e836
--- /dev/null
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+import sys
+
+from osm_mon.collector.collector import Collector
+from osm_mon.core.settings import Config
+
+
+def main():
+    cfg = Config.instance()
+
+    root = logging.getLogger()
+    root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
+    ch.setFormatter(formatter)
+    root.addHandler(ch)
+
+    kafka_logger = logging.getLogger('kafka')
+    kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+
+    log = logging.getLogger(__name__)
+    log.info("Starting MON Collector...")
+    log.info("Config: %s", vars(cfg))
+    collector = Collector()
+    collector.init_plugins()
+    collector.collect_forever()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/osm_mon/cmd/mon_prometheus_exporter.py b/osm_mon/cmd/mon_prometheus_exporter.py
deleted file mode 100644 (file)
index 522bd2f..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import logging
-import sys
-
-from osm_mon.core.settings import Config
-from osm_mon.collector.prometheus_exporter import MonPrometheusExporter
-
-
-def main():
-    cfg = Config.instance()
-
-    root = logging.getLogger()
-    root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
-    ch = logging.StreamHandler(sys.stdout)
-    ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
-    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
-    ch.setFormatter(formatter)
-    root.addHandler(ch)
-
-    kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
-    log = logging.getLogger(__name__)
-    log.info("Starting MON Prometheus Exporter...")
-    log.info("Config: %s", vars(cfg))
-    exporter = MonPrometheusExporter()
-    exporter.run()
-
-
-if __name__ == '__main__':
-    main()
index 8fc00af..d81308a 100644 (file)
@@ -20,4 +20,4 @@
 
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
\ No newline at end of file
+##
diff --git a/osm_mon/collector/backends/__init__.py b/osm_mon/collector/backends/__init__.py
new file mode 100644 (file)
index 0000000..971f4e9
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
diff --git a/osm_mon/collector/backends/base.py b/osm_mon/collector/backends/base.py
new file mode 100644 (file)
index 0000000..8cba5e1
--- /dev/null
@@ -0,0 +1,26 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+
+class BaseBackend:
+    def handle(self, metrics: list):
+        pass
diff --git a/osm_mon/collector/backends/prometheus.py b/osm_mon/collector/backends/prometheus.py
new file mode 100644 (file)
index 0000000..70632a1
--- /dev/null
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+from typing import List
+
+from prometheus_client import start_http_server
+from prometheus_client.core import REGISTRY, GaugeMetricFamily
+
+from osm_mon.collector.backends.base import BaseBackend
+from osm_mon.collector.metric import Metric
+
+log = logging.getLogger(__name__)
+
+OSM_METRIC_PREFIX = 'osm_'
+
+
+class PrometheusBackend(BaseBackend):
+
+    def __init__(self):
+        self.custom_collector = CustomCollector()
+        self._start_exporter(8000)
+
+    def handle(self, metrics: List[Metric]):
+        prometheus_metrics = []
+        for metric in metrics:
+            prometheus_metric = GaugeMetricFamily(
+                OSM_METRIC_PREFIX + metric.name,
+                'OSM metric',
+                labels=['ns_id', 'vnf_member_index', 'vdu_name']
+            )
+            prometheus_metric.add_metric([metric.nsr_id, metric.vnf_member_index, metric.vdur_name], metric.value)
+            prometheus_metrics.append(prometheus_metric)
+        self.custom_collector.metrics = prometheus_metrics
+
+    def _start_exporter(self, port):
+        log.debug('_start_exporter')
+        REGISTRY.register(self.custom_collector)
+        log.info("Starting MON Prometheus exporter at port %s", port)
+        start_http_server(port)
+
+
+class CustomCollector(object):
+
+    def __init__(self):
+        self.metrics = []
+
+    def describe(self):
+        log.debug('describe')
+        return []
+
+    def collect(self):
+        log.debug("collect")
+        return self.metrics
index 2c77f28..ac9408f 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import json
 import logging
-import random
-import uuid
-
-from n2vc.vnf import N2VC
-from prometheus_client.core import GaugeMetricFamily
+import multiprocessing
+import time
 
+from osm_mon.collector.backends.prometheus import PrometheusBackend
+from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.collector.collectors.openstack import OpenstackCollector
 from osm_mon.common.common_db_client import CommonDbClient
-from osm_mon.core.message_bus.consumer import Consumer
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.database import DatabaseManager
 from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
+VIM_COLLECTORS = {
+    "openstack": OpenstackCollector
+}
+
 
-class MonCollector:
+class Collector:
     def __init__(self):
-        cfg = Config.instance()
-        self.kafka_server = cfg.BROKER_URI
-        self.common_db_client = CommonDbClient()
-        self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+        self.common_db = CommonDbClient()
         self.producer_timeout = 5
+        self.consumer_timeout = 5
+        self.plugins = []
+        self.database_manager = DatabaseManager()
+        self.database_manager.create_tables()
+
+    def init_plugins(self):
+        prometheus_plugin = PrometheusBackend()
+        self.plugins.append(prometheus_plugin)
+
+    def collect_forever(self):
+        log.debug('collect_forever')
+        cfg = Config.instance()
+        while True:
+            try:
+                self.collect_metrics()
+                time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+            except Exception:
+                log.exception("Error collecting metrics")
+
+    def _get_vim_account_id(self, nsr_id: str, vnf_member_index: int) -> str:
+        vnfr = self.common_db.get_vnfr(nsr_id, vnf_member_index)
+        return vnfr['vim-account-id']
+
+    def _get_vim_type(self, vim_account_id):
+        """Get the vim type that is required by the message."""
+        credentials = self.database_manager.get_credentials(vim_account_id)
+        return credentials.type
+
+    def _init_vim_collector_and_collect(self, vnfr: dict, vim_account_id: str, queue: multiprocessing.Queue):
+        # TODO(diazb) Add support for vrops and aws
+        vim_type = self._get_vim_type(vim_account_id)
+        if vim_type in VIM_COLLECTORS:
+            collector = VIM_COLLECTORS[vim_type](vim_account_id)
+            collector.collect(vnfr, queue)
+        else:
+            log.debug("vimtype %s is not supported.", vim_type)
+
+    def _init_vca_collector_and_collect(self, vnfr: dict, queue: multiprocessing.Queue):
+        vca_collector = VCACollector()
+        vca_collector.collect(vnfr, queue)
 
-    async def collect_metrics(self):
-        """
-        Collects vdu metrics. These can be vim and/or n2vc metrics.
-        It checks for monitoring-params or metrics inside vdu section of vnfd, then collects the metric accordingly.
-        If vim related, it sends a metric read request through Kafka, to be handled by mon-proxy.
-        If n2vc related, it uses the n2vc client to obtain the readings.
-        :return: lists of metrics
-        """
-        # TODO(diazb): Remove dependencies on prometheus_client
-        log.debug("collect_metrics")
-        producer = Producer()
-        consumer = Consumer('mon-collector-' + str(uuid.uuid4()),
-                            consumer_timeout_ms=10000,
-                            enable_auto_commit=False)
-        consumer.subscribe(['metric_response'])
-        metrics = {}
-        vnfrs = self.common_db_client.get_vnfrs()
-        vca_model_name = 'default'
+    def collect_metrics(self):
+        queue = multiprocessing.Queue()
+        vnfrs = self.common_db.get_vnfrs()
+        processes = []
         for vnfr in vnfrs:
             nsr_id = vnfr['nsr-id-ref']
-            nsr = self.common_db_client.get_nsr(nsr_id)
-            vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id'])
-            for vdur in vnfr['vdur']:
-                # This avoids errors when vdur records have not been completely filled
-                if 'name' not in vdur:
-                    continue
-                vdu = next(
-                    filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
-                )
-                vnf_member_index = vnfr['member-vnf-index-ref']
-                vdu_name = vdur['name']
-                if 'monitoring-param' in vdu:
-                    for param in vdu['monitoring-param']:
-                        metric_name = param['nfvi-metric']
-                        payload = self._generate_read_metric_payload(metric_name,
-                                                                     nsr_id,
-                                                                     vdu_name,
-                                                                     vnf_member_index)
-                        producer.send(topic='metric_request', key='read_metric_data_request',
-                                      value=json.dumps(payload))
-                        producer.flush(self.producer_timeout)
-                        for message in consumer:
-                            if message.key == 'read_metric_data_response':
-                                content = json.loads(message.value)
-                                if content['correlation_id'] == payload['correlation_id']:
-                                    log.debug("Found read_metric_data_response with same correlation_id")
-                                    if len(content['metrics_data']['metrics_series']):
-                                        metric_reading = content['metrics_data']['metrics_series'][-1]
-                                        if metric_name not in metrics.keys():
-                                            metrics[metric_name] = GaugeMetricFamily(
-                                                metric_name,
-                                                'OSM metric',
-                                                labels=['ns_id', 'vnf_member_index', 'vdu_name']
-                                            )
-                                        metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name],
-                                                                        metric_reading)
-                                    break
-                if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
-                    vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
-                    vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)
-                    log.debug('VNF Metrics: %s', vnf_metrics)
-                    for vnf_metric_list in vnf_metrics.values():
-                        for vnf_metric in vnf_metric_list:
-                            log.debug("VNF Metric: %s", vnf_metric)
-                            if vnf_metric['key'] not in metrics.keys():
-                                metrics[vnf_metric['key']] = GaugeMetricFamily(
-                                    vnf_metric['key'],
-                                    'OSM metric',
-                                    labels=['ns_id', 'vnf_member_index', 'vdu_name']
-                                )
-                            metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name],
-                                                                  float(vnf_metric['value']))
-        consumer.close()
-        producer.close(self.producer_timeout)
-        log.debug("metric.values = %s", metrics.values())
-        return metrics.values()
-
-    @staticmethod
-    def _generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index) -> dict:
-        """
-        Builds JSON payload for asking for a metric measurement in MON. It follows the model defined in core.models.
-        :param metric_name: OSM metric name (e.g.: cpu_utilization)
-        :param nsr_id: NSR ID
-        :param vdu_name: Vdu name according to the vdur
-        :param vnf_member_index: Index of the VNF in the NS according to the vnfr
-        :return: JSON payload as dict
-        """
-        cor_id = random.randint(1, 10e7)
-        payload = {
-            'correlation_id': cor_id,
-            'metric_name': metric_name,
-            'ns_id': nsr_id,
-            'vnf_member_index': vnf_member_index,
-            'vdu_name': vdu_name,
-            'collection_period': 1,
-            'collection_unit': 'DAY',
-        }
-        return payload
+            vnf_member_index = vnfr['member-vnf-index-ref']
+            vim_account_id = self._get_vim_account_id(nsr_id, vnf_member_index)
+            p = multiprocessing.Process(target=self._init_vim_collector_and_collect,
+                                        args=(vnfr, vim_account_id, queue))
+            processes.append(p)
+            p.start()
+            p = multiprocessing.Process(target=self._init_vca_collector_and_collect,
+                                        args=(vnfr, queue))
+            processes.append(p)
+            p.start()
+        for process in processes:
+            process.join()
+        metrics = []
+        while not queue.empty():
+            metrics.append(queue.get())
+        for plugin in self.plugins:
+            plugin.handle(metrics)
diff --git a/osm_mon/collector/collectors/__init__.py b/osm_mon/collector/collectors/__init__.py
new file mode 100644 (file)
index 0000000..971f4e9
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
diff --git a/osm_mon/collector/collectors/base.py b/osm_mon/collector/collectors/base.py
new file mode 100644 (file)
index 0000000..ed620bf
--- /dev/null
@@ -0,0 +1,27 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from multiprocessing import Queue
+
+
+class BaseCollector:
+    def collect(self, vnfr: dict, queue: Queue):
+        pass
diff --git a/osm_mon/collector/collectors/base_vim.py b/osm_mon/collector/collectors/base_vim.py
new file mode 100644 (file)
index 0000000..bf7da38
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from multiprocessing import Queue
+
+
+class BaseVimCollector:
+    def __init__(self, vim_account_id: str):
+        pass
+
+    def collect(self, vnfr: dict, queue: Queue):
+        pass
diff --git a/osm_mon/collector/collectors/juju.py b/osm_mon/collector/collectors/juju.py
new file mode 100644 (file)
index 0000000..162a527
--- /dev/null
@@ -0,0 +1,71 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+import logging
+from multiprocessing import Queue
+
+from n2vc.vnf import N2VC
+
+from osm_mon.collector.collectors.base import BaseCollector
+from osm_mon.collector.metric import Metric
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.settings import Config
+
+log = logging.getLogger(__name__)
+
+
+class VCACollector(BaseCollector):
+    def __init__(self):
+        cfg = Config.instance()
+        self.common_db = CommonDbClient()
+        self.loop = asyncio.get_event_loop()
+        self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+
+    def collect(self, vnfr: dict, queue: Queue):
+        vca_model_name = 'default'
+        nsr_id = vnfr['nsr-id-ref']
+        vnf_member_index = vnfr['member-vnf-index-ref']
+        vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+        for vdur in vnfr['vdur']:
+            nsr = self.common_db.get_nsr(nsr_id)
+            vdu = next(
+                filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+            )
+            if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
+                vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
+                metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca))
+                log.debug('Metrics: %s', metrics)
+                for metric_list in metrics.values():
+                    for metric in metric_list:
+                        log.debug("Metric: %s", metric)
+                        metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value']))
+                        queue.put(metric)
+            if 'vnf-configuration' in vnfr and 'metrics' in vnfr['vnf-configuration']:
+                vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
+                metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca))
+                log.debug('Metrics: %s', metrics)
+                for metric_list in metrics.values():
+                    for metric in metric_list:
+                        log.debug("Metric: %s", metric)
+                        metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value']))
+                        queue.put(metric)
+            # TODO (diazb): Implement vnf-configuration config
diff --git a/osm_mon/collector/collectors/openstack.py b/osm_mon/collector/collectors/openstack.py
new file mode 100644 (file)
index 0000000..811014d
--- /dev/null
@@ -0,0 +1,113 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import datetime
+import json
+import logging
+import multiprocessing
+import gnocchiclient.exceptions
+
+from gnocchiclient.v1 import client as gnocchi_client
+from keystoneauth1 import session
+from keystoneauth1.identity import v3
+
+from osm_mon.collector.collectors.base_vim import BaseVimCollector
+from osm_mon.collector.metric import Metric
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.settings import Config
+
+log = logging.getLogger(__name__)
+
+METRIC_MAPPINGS = {
+    "average_memory_utilization": "memory.usage",
+    "disk_read_ops": "disk.read.requests",
+    "disk_write_ops": "disk.write.requests",
+    "disk_read_bytes": "disk.read.bytes",
+    "disk_write_bytes": "disk.write.bytes",
+    "packets_dropped": "interface.if_dropped",
+    "packets_received": "interface.if_packets",
+    "packets_sent": "interface.if_packets",
+    "cpu_utilization": "cpu_util",
+}
+
+
+class OpenstackCollector(BaseVimCollector):
+    def __init__(self, vim_account_id: str):
+        super().__init__(vim_account_id)
+        self.common_db = CommonDbClient()
+        self.auth_manager = AuthManager()
+        self.granularity = self._get_granularity(vim_account_id)
+        self.gnocchi_client = self._build_gnocchi_client(vim_account_id)
+
+    def _get_resource_uuid(self, nsr_id, vnf_member_index, vdur_name) -> str:
+        vdur = self.common_db.get_vdur(nsr_id, vnf_member_index, vdur_name)
+        return vdur['vim-id']
+
+    def _build_gnocchi_client(self, vim_account_id: str) -> gnocchi_client.Client:
+        creds = self.auth_manager.get_credentials(vim_account_id)
+        verify_ssl = self.auth_manager.is_verify_ssl(vim_account_id)
+        auth = v3.Password(auth_url=creds.url,
+                           username=creds.user,
+                           password=creds.password,
+                           project_name=creds.tenant_name,
+                           project_domain_id='default',
+                           user_domain_id='default')
+        sess = session.Session(auth=auth, verify=verify_ssl)
+        return gnocchi_client.Client(session=sess)
+
+    def _get_granularity(self, vim_account_id: str):
+        creds = self.auth_manager.get_credentials(vim_account_id)
+        vim_config = json.loads(creds.config)
+        if 'granularity' in vim_config:
+            return int(vim_config['granularity'])
+        else:
+            cfg = Config.instance()
+            return cfg.OS_DEFAULT_GRANULARITY
+
+    def collect(self, vnfr: dict, queue: multiprocessing.Queue):
+        nsr_id = vnfr['nsr-id-ref']
+        vnf_member_index = vnfr['member-vnf-index-ref']
+        vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+        for vdur in vnfr['vdur']:
+            # This avoids errors when vdur records have not been completely filled
+            if 'name' not in vdur:
+                return None
+            vdu = next(
+                filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+            )
+            if 'monitoring-param' in vdu:
+                for param in vdu['monitoring-param']:
+                    metric_name = param['nfvi-metric']
+                    gnocchi_metric_name = METRIC_MAPPINGS[metric_name]
+                    start_date = datetime.datetime.now() - datetime.timedelta(seconds=self.granularity)
+                    resource_id = self._get_resource_uuid(nsr_id, vnf_member_index, vdur['name'])
+                    try:
+                        metrics = self.gnocchi_client.metric.get_measures(gnocchi_metric_name,
+                                                                          start=start_date,
+                                                                          resource_id=resource_id,
+                                                                          granularity=self.granularity)
+                        if len(metrics):
+                            metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric_name, metrics[-1][2])
+                            queue.put(metric)
+                    except gnocchiclient.exceptions.NotFound as e:
+                        log.debug("No metric found: %s", e)
+                        pass
diff --git a/osm_mon/collector/metric.py b/osm_mon/collector/metric.py
new file mode 100644 (file)
index 0000000..e8da11b
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+
+class Metric:
+    def __init__(self, nsr_id, vnf_member_index, vdur_name, name, value):
+        self.nsr_id = nsr_id
+        self.vnf_member_index = vnf_member_index
+        self.vdur_name = vdur_name
+        self.name = name
+        self.value = value
diff --git a/osm_mon/collector/prometheus_exporter.py b/osm_mon/collector/prometheus_exporter.py
deleted file mode 100644 (file)
index 2bb2e27..0000000
+++ /dev/null
@@ -1,90 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import asyncio
-import logging
-import threading
-import time
-
-from prometheus_client import start_http_server
-from prometheus_client.core import REGISTRY
-
-from osm_mon.collector.collector import MonCollector
-from osm_mon.core.settings import Config
-
-log = logging.getLogger(__name__)
-
-
-class MonPrometheusExporter:
-
-    def __init__(self):
-        self.custom_collector = CustomCollector()
-
-    def _run_exporter(self):
-        log.debug('_run_exporter')
-        REGISTRY.register(self.custom_collector)
-        log.info("Starting MON Prometheus exporter at port %s", 8000)
-        start_http_server(8000)
-
-    def run(self):
-        log.debug('_run')
-        collector_thread = threading.Thread(target=self._run_collector)
-        collector_thread.setDaemon(True)
-        collector_thread.start()
-        exporter_thread = threading.Thread(target=self._run_exporter)
-        exporter_thread.setDaemon(True)
-        exporter_thread.start()
-        collector_thread.join()
-        exporter_thread.join()
-
-    def _run_collector(self):
-        log.debug('_run_collector')
-        asyncio.set_event_loop(asyncio.new_event_loop())
-        mon_collector = MonCollector()
-        cfg = Config.instance()
-        while True:
-            try:
-                log.debug('_run_collector_loop')
-                metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics())
-                self.custom_collector.metrics = metrics
-                time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
-            except Exception:
-                log.exception("Error collecting metrics")
-
-
-class CustomCollector(object):
-
-    def __init__(self):
-        self.mon_collector = MonCollector()
-        self.metrics = []
-
-    def describe(self):
-        log.debug('describe')
-        return []
-
-    def collect(self):
-        log.debug("collect")
-        return self.metrics
-
-
-if __name__ == '__main__':
-    MonPrometheusExporter().run()
index e69de29..971f4e9 100644 (file)
@@ -0,0 +1,21 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
index c6237ee..71d1306 100644 (file)
@@ -39,12 +39,12 @@ class CommonDbClient:
                                       {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
         return vnfr
 
-    def get_vnfrs(self, nsr_id: str):
-        return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
-                self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
-
-    def get_vnfrs(self):
-        return self.common_db.get_list('vnfrs')
+    def get_vnfrs(self, nsr_id: str = None):
+        if nsr_id:
+            return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
+                    self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+        else:
+            return self.common_db.get_list('vnfrs')
 
     def get_vnfd(self, vnfd_id: str):
         vnfr = self.common_db.get_one("vnfds",
index 1ccf936..c0a9dd0 100644 (file)
@@ -10,6 +10,6 @@ class Consumer(KafkaConsumer):
         super().__init__(bootstrap_servers=cfg.BROKER_URI,
                          key_deserializer=bytes.decode,
                          value_deserializer=bytes.decode,
-                         max_poll_interval_ms=900000,
+                         max_poll_interval_ms=180000,
                          group_id=group_id,
                          **kwargs)
index 978c957..a7599cc 100644 (file)
@@ -63,7 +63,7 @@ class Config(object):
         CfgParam('MONGO_URI', "mongo:27017", six.text_type),
         CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type),
         CfgParam('OS_NOTIFIER_URI', "http://localhost:8662", six.text_type),
-        CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type),
+        CfgParam('OS_DEFAULT_GRANULARITY', 300, int),
         CfgParam('REQUEST_TIMEOUT', 10, int),
         CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
         CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type),
index 91dc402..9736da8 100644 (file)
@@ -384,10 +384,7 @@ class OpenstackMetricHandler(object):
         # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
         stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
         end_time = int(round(time.time() * 1000))
-        if collection_unit == 'YEAR':
-            diff = PERIOD_MS[collection_unit]
-        else:
-            diff = collection_period * PERIOD_MS[collection_unit]
+        diff = collection_period * PERIOD_MS[collection_unit]
         s_time = (end_time - diff) / 1000.0
         start_time = datetime.datetime.fromtimestamp(s_time).strftime(
             '%Y-%m-%dT%H:%M:%S.%f')
index 2a5491f..6c3df42 100644 (file)
@@ -12,6 +12,7 @@
 
 #         http://www.apache.org/licenses/LICENSE-2.0
 
+import multiprocessing
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import asyncio
-import random
 import unittest
 from unittest import mock
 
-from osm_mon.collector.collector import MonCollector
+from osm_mon.collector.collector import Collector
+from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.collector.collectors.openstack import OpenstackCollector
+from osm_mon.core.database import VimCredentials
 
 
-class MonCollectorTest(unittest.TestCase):
+class CollectorTest(unittest.TestCase):
     def setUp(self):
         super().setUp()
-        self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(None)
-
-    @mock.patch.object(random, 'randint')
-    def test_generate_read_metric_payload(self, randint):
-        randint.return_value = 1
-        metric_name = 'cpu_utilization'
-        nsr_id = 'test_id'
-        vdu_name = 'test_vdu'
-        vnf_member_index = 1
-        expected_payload = {
-            'correlation_id': 1,
-            'metric_name': metric_name,
-            'ns_id': nsr_id,
-            'vnf_member_index': vnf_member_index,
-            'vdu_name': vdu_name,
-            'collection_period': 1,
-            'collection_unit': 'DAY',
-        }
-        result = MonCollector._generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index)
-        self.assertEqual(result, expected_payload)
+
+    @mock.patch("osm_mon.collector.collector.CommonDbClient", autospec=True)
+    def test_get_vim_id(self, common_db):
+        common_db.return_value.get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                                        '_admin': {
+                                                            'projects_read': ['admin'], 'created': 1526044312.102287,
+                                                            'modified': 1526044312.102287, 'projects_write': ['admin']
+                                                        },
+                                                        'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+                                                        'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+                                                        'vdur': [
+                                                            {
+                                                                'internal-connection-point': [],
+                                                                'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+                                                                'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+                                                                'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+                                                                'name': 'ubuntuvnf_vnfd-VM'
+                                                            }
+                                                        ],
+                                                        'vnfd-ref': 'ubuntuvnf_vnfd',
+                                                        'member-vnf-index-ref': '1',
+                                                        'created-time': 1526044312.0999322,
+                                                        'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                                        'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+        collector = Collector()
+        vim_account_id = collector._get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
+        self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
+
+    @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+    @mock.patch("osm_mon.collector.collector.DatabaseManager", autospec=True)
+    def test_get_vim_type(self, database_manager):
+        mock_creds = VimCredentials()
+        mock_creds.id = 'test_id'
+        mock_creds.user = 'user'
+        mock_creds.url = 'url'
+        mock_creds.password = 'password'
+        mock_creds.tenant_name = 'tenant_name'
+        mock_creds.type = 'openstack'
+
+        database_manager.return_value.get_credentials.return_value = mock_creds
+        collector = Collector()
+        vim_type = collector._get_vim_type('test_id')
+        self.assertEqual(vim_type, 'openstack')
+
+    @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+    @mock.patch.object(OpenstackCollector, "__init__", lambda *args, **kwargs: None)
+    @mock.patch.object(OpenstackCollector, "collect")
+    @mock.patch.object(Collector, "_get_vim_type")
+    def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect):
+        _get_vim_type.return_value = 'openstack'
+        collector = Collector()
+        queue = multiprocessing.Queue()
+        collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue)
+        collect.assert_called_once_with({}, queue)
+
+    @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+    @mock.patch.object(OpenstackCollector, "collect")
+    @mock.patch.object(Collector, "_get_vim_type")
+    def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect):
+        _get_vim_type.return_value = 'unknown'
+        collector = Collector()
+        queue = multiprocessing.Queue()
+        collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue)
+        openstack_collect.assert_not_called()
+
+    @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+    @mock.patch("osm_mon.collector.collector.VCACollector", autospec=True)
+    def test_init_vca_collector_and_collect(self, vca_collector):
+        collector = Collector()
+        queue = multiprocessing.Queue()
+        collector._init_vca_collector_and_collect({}, queue)
+        vca_collector.assert_called_once_with()
+        vca_collector.return_value.collect.assert_called_once_with({}, queue)
index 67486e7..aeacd7b 100644 (file)
@@ -92,7 +92,7 @@ class TestAlarming(unittest.TestCase):
         self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True)
         payload = {"name": "disk_write_ops",
                    "gnocchi_resources_threshold_rule": {"resource_type": "generic", "comparison_operator": "gt",
-                                                        "granularity": "300", "metric": "disk.write.requests",
+                                                        "granularity": 300, "metric": "disk.write.requests",
                                                         "aggregation_method": "mean", "threshold": 60,
                                                         "resource_id": "my_r_id"},
                    "alarm_actions": ["http://localhost:8662"], "repeat_actions": True, "state": "ok", "type": "gnocchi_resources_threshold",
index f9b4407..e0dba98 100644 (file)
@@ -34,5 +34,6 @@ bottle==0.12.*
 peewee==3.1.*
 pyyaml==3.*
 prometheus_client==0.4.*
+gnocchiclient==7.0.*
 git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
 git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc
\ No newline at end of file
index 1015de4..3646610 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -68,13 +68,15 @@ setup(
         "bottle==0.12.*",
         "peewee==3.1.*",
         "pyyaml==3.*",
+        "prometheus_client==0.4.*",
+        "gnocchiclient==7.0.*",
         "osm-common",
         "n2vc"
     ],
     include_package_data=True,
     entry_points={
         "console_scripts": [
-            "osm-mon-prometheus-exporter = osm_mon.cmd.mon_prometheus_exporter:main",
+            "osm-mon-collector = osm_mon.cmd.mon_collector:main",
         ]
     },
     dependency_links=[