pip3 install peewee==3.1.*
pip3 install pyyaml==3.*
pip3 install prometheus_client==0.4.*
+pip3 install gnocchiclient==7.0.*
echo "Installation of python dependencies finished"
\ No newline at end of file
/bin/bash /mon/osm_mon/plugins/vRealiseOps/vROPs_Webservice/install.sh
python3 /mon/osm_mon/plugins/OpenStack/Aodh/notifier.py &
python3 /mon/osm_mon/core/message_bus/common_consumer.py &
-osm-mon-prometheus-exporter
+osm-mon-collector
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+import sys
+
+from osm_mon.collector.collector import Collector
+from osm_mon.core.settings import Config
+
+
+def main():
+ cfg = Config.instance()
+
+ root = logging.getLogger()
+ root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ ch = logging.StreamHandler(sys.stdout)
+ ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
+ ch.setFormatter(formatter)
+ root.addHandler(ch)
+
+ kafka_logger = logging.getLogger('kafka')
+ kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+
+ log = logging.getLogger(__name__)
+ log.info("Starting MON Collector...")
+ log.info("Config: %s", vars(cfg))
+ collector = Collector()
+ collector.init_plugins()
+ collector.collect_forever()
+
+
+if __name__ == '__main__':
+ main()
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import logging
-import sys
-
-from osm_mon.core.settings import Config
-from osm_mon.collector.prometheus_exporter import MonPrometheusExporter
-
-
-def main():
- cfg = Config.instance()
-
- root = logging.getLogger()
- root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
- ch = logging.StreamHandler(sys.stdout)
- ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
- ch.setFormatter(formatter)
- root.addHandler(ch)
-
- kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
- log = logging.getLogger(__name__)
- log.info("Starting MON Prometheus Exporter...")
- log.info("Config: %s", vars(cfg))
- exporter = MonPrometheusExporter()
- exporter.run()
-
-
-if __name__ == '__main__':
- main()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
\ No newline at end of file
+##
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+
+class BaseBackend:
+ def handle(self, metrics: list):
+ pass
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+from typing import List
+
+from prometheus_client import start_http_server
+from prometheus_client.core import REGISTRY, GaugeMetricFamily
+
+from osm_mon.collector.backends.base import BaseBackend
+from osm_mon.collector.metric import Metric
+
+log = logging.getLogger(__name__)
+
+OSM_METRIC_PREFIX = 'osm_'
+
+
+class PrometheusBackend(BaseBackend):
+
+ def __init__(self):
+ self.custom_collector = CustomCollector()
+ self._start_exporter(8000)
+
+ def handle(self, metrics: List[Metric]):
+ prometheus_metrics = []
+ for metric in metrics:
+ prometheus_metric = GaugeMetricFamily(
+ OSM_METRIC_PREFIX + metric.name,
+ 'OSM metric',
+ labels=['ns_id', 'vnf_member_index', 'vdu_name']
+ )
+ prometheus_metric.add_metric([metric.nsr_id, metric.vnf_member_index, metric.vdur_name], metric.value)
+ prometheus_metrics.append(prometheus_metric)
+ self.custom_collector.metrics = prometheus_metrics
+
+ def _start_exporter(self, port):
+ log.debug('_start_exporter')
+ REGISTRY.register(self.custom_collector)
+ log.info("Starting MON Prometheus exporter at port %s", port)
+ start_http_server(port)
+
+
+class CustomCollector(object):
+
+ def __init__(self):
+ self.metrics = []
+
+ def describe(self):
+ log.debug('describe')
+ return []
+
+ def collect(self):
+ log.debug("collect")
+ return self.metrics
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import json
import logging
-import random
-import uuid
-
-from n2vc.vnf import N2VC
-from prometheus_client.core import GaugeMetricFamily
+import multiprocessing
+import time
+from osm_mon.collector.backends.prometheus import PrometheusBackend
+from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.collector.collectors.openstack import OpenstackCollector
from osm_mon.common.common_db_client import CommonDbClient
-from osm_mon.core.message_bus.consumer import Consumer
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.database import DatabaseManager
from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
+VIM_COLLECTORS = {
+ "openstack": OpenstackCollector
+}
+
-class MonCollector:
+class Collector:
def __init__(self):
- cfg = Config.instance()
- self.kafka_server = cfg.BROKER_URI
- self.common_db_client = CommonDbClient()
- self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+ self.common_db = CommonDbClient()
self.producer_timeout = 5
+ self.consumer_timeout = 5
+ self.plugins = []
+ self.database_manager = DatabaseManager()
+ self.database_manager.create_tables()
+
+ def init_plugins(self):
+ prometheus_plugin = PrometheusBackend()
+ self.plugins.append(prometheus_plugin)
+
+ def collect_forever(self):
+ log.debug('collect_forever')
+ cfg = Config.instance()
+ while True:
+ try:
+ self.collect_metrics()
+ time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+ except Exception:
+ log.exception("Error collecting metrics")
+
+ def _get_vim_account_id(self, nsr_id: str, vnf_member_index: int) -> str:
+ vnfr = self.common_db.get_vnfr(nsr_id, vnf_member_index)
+ return vnfr['vim-account-id']
+
+ def _get_vim_type(self, vim_account_id):
+ """Get the vim type that is required by the message."""
+ credentials = self.database_manager.get_credentials(vim_account_id)
+ return credentials.type
+
+ def _init_vim_collector_and_collect(self, vnfr: dict, vim_account_id: str, queue: multiprocessing.Queue):
+ # TODO(diazb) Add support for vrops and aws
+ vim_type = self._get_vim_type(vim_account_id)
+ if vim_type in VIM_COLLECTORS:
+ collector = VIM_COLLECTORS[vim_type](vim_account_id)
+ collector.collect(vnfr, queue)
+ else:
+ log.debug("vimtype %s is not supported.", vim_type)
+
+ def _init_vca_collector_and_collect(self, vnfr: dict, queue: multiprocessing.Queue):
+ vca_collector = VCACollector()
+ vca_collector.collect(vnfr, queue)
- async def collect_metrics(self):
- """
- Collects vdu metrics. These can be vim and/or n2vc metrics.
- It checks for monitoring-params or metrics inside vdu section of vnfd, then collects the metric accordingly.
- If vim related, it sends a metric read request through Kafka, to be handled by mon-proxy.
- If n2vc related, it uses the n2vc client to obtain the readings.
- :return: lists of metrics
- """
- # TODO(diazb): Remove dependencies on prometheus_client
- log.debug("collect_metrics")
- producer = Producer()
- consumer = Consumer('mon-collector-' + str(uuid.uuid4()),
- consumer_timeout_ms=10000,
- enable_auto_commit=False)
- consumer.subscribe(['metric_response'])
- metrics = {}
- vnfrs = self.common_db_client.get_vnfrs()
- vca_model_name = 'default'
+ def collect_metrics(self):
+ queue = multiprocessing.Queue()
+ vnfrs = self.common_db.get_vnfrs()
+ processes = []
for vnfr in vnfrs:
nsr_id = vnfr['nsr-id-ref']
- nsr = self.common_db_client.get_nsr(nsr_id)
- vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id'])
- for vdur in vnfr['vdur']:
- # This avoids errors when vdur records have not been completely filled
- if 'name' not in vdur:
- continue
- vdu = next(
- filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
- )
- vnf_member_index = vnfr['member-vnf-index-ref']
- vdu_name = vdur['name']
- if 'monitoring-param' in vdu:
- for param in vdu['monitoring-param']:
- metric_name = param['nfvi-metric']
- payload = self._generate_read_metric_payload(metric_name,
- nsr_id,
- vdu_name,
- vnf_member_index)
- producer.send(topic='metric_request', key='read_metric_data_request',
- value=json.dumps(payload))
- producer.flush(self.producer_timeout)
- for message in consumer:
- if message.key == 'read_metric_data_response':
- content = json.loads(message.value)
- if content['correlation_id'] == payload['correlation_id']:
- log.debug("Found read_metric_data_response with same correlation_id")
- if len(content['metrics_data']['metrics_series']):
- metric_reading = content['metrics_data']['metrics_series'][-1]
- if metric_name not in metrics.keys():
- metrics[metric_name] = GaugeMetricFamily(
- metric_name,
- 'OSM metric',
- labels=['ns_id', 'vnf_member_index', 'vdu_name']
- )
- metrics[metric_name].add_metric([nsr_id, vnf_member_index, vdu_name],
- metric_reading)
- break
- if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
- vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
- vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)
- log.debug('VNF Metrics: %s', vnf_metrics)
- for vnf_metric_list in vnf_metrics.values():
- for vnf_metric in vnf_metric_list:
- log.debug("VNF Metric: %s", vnf_metric)
- if vnf_metric['key'] not in metrics.keys():
- metrics[vnf_metric['key']] = GaugeMetricFamily(
- vnf_metric['key'],
- 'OSM metric',
- labels=['ns_id', 'vnf_member_index', 'vdu_name']
- )
- metrics[vnf_metric['key']].add_metric([nsr_id, vnf_member_index, vdu_name],
- float(vnf_metric['value']))
- consumer.close()
- producer.close(self.producer_timeout)
- log.debug("metric.values = %s", metrics.values())
- return metrics.values()
-
- @staticmethod
- def _generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index) -> dict:
- """
- Builds JSON payload for asking for a metric measurement in MON. It follows the model defined in core.models.
- :param metric_name: OSM metric name (e.g.: cpu_utilization)
- :param nsr_id: NSR ID
- :param vdu_name: Vdu name according to the vdur
- :param vnf_member_index: Index of the VNF in the NS according to the vnfr
- :return: JSON payload as dict
- """
- cor_id = random.randint(1, 10e7)
- payload = {
- 'correlation_id': cor_id,
- 'metric_name': metric_name,
- 'ns_id': nsr_id,
- 'vnf_member_index': vnf_member_index,
- 'vdu_name': vdu_name,
- 'collection_period': 1,
- 'collection_unit': 'DAY',
- }
- return payload
+ vnf_member_index = vnfr['member-vnf-index-ref']
+ vim_account_id = self._get_vim_account_id(nsr_id, vnf_member_index)
+ p = multiprocessing.Process(target=self._init_vim_collector_and_collect,
+ args=(vnfr, vim_account_id, queue))
+ processes.append(p)
+ p.start()
+ p = multiprocessing.Process(target=self._init_vca_collector_and_collect,
+ args=(vnfr, queue))
+ processes.append(p)
+ p.start()
+ for process in processes:
+ process.join()
+ metrics = []
+ while not queue.empty():
+ metrics.append(queue.get())
+ for plugin in self.plugins:
+ plugin.handle(metrics)
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from multiprocessing import Queue
+
+
+class BaseCollector:
+ def collect(self, vnfr: dict, queue: Queue):
+ pass
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from multiprocessing import Queue
+
+
+class BaseVimCollector:
+ def __init__(self, vim_account_id: str):
+ pass
+
+ def collect(self, vnfr: dict, queue: Queue):
+ pass
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+import logging
+from multiprocessing import Queue
+
+from n2vc.vnf import N2VC
+
+from osm_mon.collector.collectors.base import BaseCollector
+from osm_mon.collector.metric import Metric
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.settings import Config
+
+log = logging.getLogger(__name__)
+
+
+class VCACollector(BaseCollector):
+ def __init__(self):
+ cfg = Config.instance()
+ self.common_db = CommonDbClient()
+ self.loop = asyncio.get_event_loop()
+ self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+
+ def collect(self, vnfr: dict, queue: Queue):
+ vca_model_name = 'default'
+ nsr_id = vnfr['nsr-id-ref']
+ vnf_member_index = vnfr['member-vnf-index-ref']
+ vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+ for vdur in vnfr['vdur']:
+ nsr = self.common_db.get_nsr(nsr_id)
+ vdu = next(
+ filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+ )
+ if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
+ vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
+ metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca))
+ log.debug('Metrics: %s', metrics)
+ for metric_list in metrics.values():
+ for metric in metric_list:
+ log.debug("Metric: %s", metric)
+ metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value']))
+ queue.put(metric)
+ if 'vnf-configuration' in vnfr and 'metrics' in vnfr['vnf-configuration']:
+ vnf_name_vca = self.n2vc.FormatApplicationName(nsr['name'], vnf_member_index, vdur['vdu-id-ref'])
+ metrics = self.loop.run_until_complete(self.n2vc.GetMetrics(vca_model_name, vnf_name_vca))
+ log.debug('Metrics: %s', metrics)
+ for metric_list in metrics.values():
+ for metric in metric_list:
+ log.debug("Metric: %s", metric)
+ metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric['key'], float(metric['value']))
+ queue.put(metric)
+ # TODO (diazb): Implement vnf-configuration config
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import datetime
+import json
+import logging
+import multiprocessing
+import gnocchiclient.exceptions
+
+from gnocchiclient.v1 import client as gnocchi_client
+from keystoneauth1 import session
+from keystoneauth1.identity import v3
+
+from osm_mon.collector.collectors.base_vim import BaseVimCollector
+from osm_mon.collector.metric import Metric
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.settings import Config
+
+log = logging.getLogger(__name__)
+
+METRIC_MAPPINGS = {
+ "average_memory_utilization": "memory.usage",
+ "disk_read_ops": "disk.read.requests",
+ "disk_write_ops": "disk.write.requests",
+ "disk_read_bytes": "disk.read.bytes",
+ "disk_write_bytes": "disk.write.bytes",
+ "packets_dropped": "interface.if_dropped",
+ "packets_received": "interface.if_packets",
+ "packets_sent": "interface.if_packets",
+ "cpu_utilization": "cpu_util",
+}
+
+
+class OpenstackCollector(BaseVimCollector):
+ def __init__(self, vim_account_id: str):
+ super().__init__(vim_account_id)
+ self.common_db = CommonDbClient()
+ self.auth_manager = AuthManager()
+ self.granularity = self._get_granularity(vim_account_id)
+ self.gnocchi_client = self._build_gnocchi_client(vim_account_id)
+
+ def _get_resource_uuid(self, nsr_id, vnf_member_index, vdur_name) -> str:
+ vdur = self.common_db.get_vdur(nsr_id, vnf_member_index, vdur_name)
+ return vdur['vim-id']
+
+ def _build_gnocchi_client(self, vim_account_id: str) -> gnocchi_client.Client:
+ creds = self.auth_manager.get_credentials(vim_account_id)
+ verify_ssl = self.auth_manager.is_verify_ssl(vim_account_id)
+ auth = v3.Password(auth_url=creds.url,
+ username=creds.user,
+ password=creds.password,
+ project_name=creds.tenant_name,
+ project_domain_id='default',
+ user_domain_id='default')
+ sess = session.Session(auth=auth, verify=verify_ssl)
+ return gnocchi_client.Client(session=sess)
+
+ def _get_granularity(self, vim_account_id: str):
+ creds = self.auth_manager.get_credentials(vim_account_id)
+ vim_config = json.loads(creds.config)
+ if 'granularity' in vim_config:
+ return int(vim_config['granularity'])
+ else:
+ cfg = Config.instance()
+ return cfg.OS_DEFAULT_GRANULARITY
+
+ def collect(self, vnfr: dict, queue: multiprocessing.Queue):
+ nsr_id = vnfr['nsr-id-ref']
+ vnf_member_index = vnfr['member-vnf-index-ref']
+ vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+ for vdur in vnfr['vdur']:
+ # This avoids errors when vdur records have not been completely filled
+ if 'name' not in vdur:
+ return None
+ vdu = next(
+ filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+ )
+ if 'monitoring-param' in vdu:
+ for param in vdu['monitoring-param']:
+ metric_name = param['nfvi-metric']
+ gnocchi_metric_name = METRIC_MAPPINGS[metric_name]
+ start_date = datetime.datetime.now() - datetime.timedelta(seconds=self.granularity)
+ resource_id = self._get_resource_uuid(nsr_id, vnf_member_index, vdur['name'])
+ try:
+ metrics = self.gnocchi_client.metric.get_measures(gnocchi_metric_name,
+ start=start_date,
+ resource_id=resource_id,
+ granularity=self.granularity)
+ if len(metrics):
+ metric = Metric(nsr_id, vnf_member_index, vdur['name'], metric_name, metrics[-1][2])
+ queue.put(metric)
+ except gnocchiclient.exceptions.NotFound as e:
+ log.debug("No metric found: %s", e)
+ pass
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+
+class Metric:
+ def __init__(self, nsr_id, vnf_member_index, vdur_name, name, value):
+ self.nsr_id = nsr_id
+ self.vnf_member_index = vnf_member_index
+ self.vdur_name = vdur_name
+ self.name = name
+ self.value = value
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import asyncio
-import logging
-import threading
-import time
-
-from prometheus_client import start_http_server
-from prometheus_client.core import REGISTRY
-
-from osm_mon.collector.collector import MonCollector
-from osm_mon.core.settings import Config
-
-log = logging.getLogger(__name__)
-
-
-class MonPrometheusExporter:
-
- def __init__(self):
- self.custom_collector = CustomCollector()
-
- def _run_exporter(self):
- log.debug('_run_exporter')
- REGISTRY.register(self.custom_collector)
- log.info("Starting MON Prometheus exporter at port %s", 8000)
- start_http_server(8000)
-
- def run(self):
- log.debug('_run')
- collector_thread = threading.Thread(target=self._run_collector)
- collector_thread.setDaemon(True)
- collector_thread.start()
- exporter_thread = threading.Thread(target=self._run_exporter)
- exporter_thread.setDaemon(True)
- exporter_thread.start()
- collector_thread.join()
- exporter_thread.join()
-
- def _run_collector(self):
- log.debug('_run_collector')
- asyncio.set_event_loop(asyncio.new_event_loop())
- mon_collector = MonCollector()
- cfg = Config.instance()
- while True:
- try:
- log.debug('_run_collector_loop')
- metrics = asyncio.get_event_loop().run_until_complete(mon_collector.collect_metrics())
- self.custom_collector.metrics = metrics
- time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
- except Exception:
- log.exception("Error collecting metrics")
-
-
-class CustomCollector(object):
-
- def __init__(self):
- self.mon_collector = MonCollector()
- self.metrics = []
-
- def describe(self):
- log.debug('describe')
- return []
-
- def collect(self):
- log.debug("collect")
- return self.metrics
-
-
-if __name__ == '__main__':
- MonPrometheusExporter().run()
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
{"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
return vnfr
- def get_vnfrs(self, nsr_id: str):
- return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
- self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
-
- def get_vnfrs(self):
- return self.common_db.get_list('vnfrs')
+ def get_vnfrs(self, nsr_id: str = None):
+ if nsr_id:
+ return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
+ self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+ else:
+ return self.common_db.get_list('vnfrs')
def get_vnfd(self, vnfd_id: str):
vnfr = self.common_db.get_one("vnfds",
super().__init__(bootstrap_servers=cfg.BROKER_URI,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
- max_poll_interval_ms=900000,
+ max_poll_interval_ms=180000,
group_id=group_id,
**kwargs)
CfgParam('MONGO_URI', "mongo:27017", six.text_type),
CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type),
CfgParam('OS_NOTIFIER_URI', "http://localhost:8662", six.text_type),
- CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type),
+ CfgParam('OS_DEFAULT_GRANULARITY', 300, int),
CfgParam('REQUEST_TIMEOUT', 10, int),
CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type),
# FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
end_time = int(round(time.time() * 1000))
- if collection_unit == 'YEAR':
- diff = PERIOD_MS[collection_unit]
- else:
- diff = collection_period * PERIOD_MS[collection_unit]
+ diff = collection_period * PERIOD_MS[collection_unit]
s_time = (end_time - diff) / 1000.0
start_time = datetime.datetime.fromtimestamp(s_time).strftime(
'%Y-%m-%dT%H:%M:%S.%f')
# http://www.apache.org/licenses/LICENSE-2.0
+import multiprocessing
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
-import random
import unittest
from unittest import mock
-from osm_mon.collector.collector import MonCollector
+from osm_mon.collector.collector import Collector
+from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.collector.collectors.openstack import OpenstackCollector
+from osm_mon.core.database import VimCredentials
-class MonCollectorTest(unittest.TestCase):
+class CollectorTest(unittest.TestCase):
def setUp(self):
super().setUp()
- self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
-
- @mock.patch.object(random, 'randint')
- def test_generate_read_metric_payload(self, randint):
- randint.return_value = 1
- metric_name = 'cpu_utilization'
- nsr_id = 'test_id'
- vdu_name = 'test_vdu'
- vnf_member_index = 1
- expected_payload = {
- 'correlation_id': 1,
- 'metric_name': metric_name,
- 'ns_id': nsr_id,
- 'vnf_member_index': vnf_member_index,
- 'vdu_name': vdu_name,
- 'collection_period': 1,
- 'collection_unit': 'DAY',
- }
- result = MonCollector._generate_read_metric_payload(metric_name, nsr_id, vdu_name, vnf_member_index)
- self.assertEqual(result, expected_payload)
+
+ @mock.patch("osm_mon.collector.collector.CommonDbClient", autospec=True)
+ def test_get_vim_id(self, common_db):
+ common_db.return_value.get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ '_admin': {
+ 'projects_read': ['admin'], 'created': 1526044312.102287,
+ 'modified': 1526044312.102287, 'projects_write': ['admin']
+ },
+ 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+ 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+ 'vdur': [
+ {
+ 'internal-connection-point': [],
+ 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+ 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
+ }
+ ],
+ 'vnfd-ref': 'ubuntuvnf_vnfd',
+ 'member-vnf-index-ref': '1',
+ 'created-time': 1526044312.0999322,
+ 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+ collector = Collector()
+ vim_account_id = collector._get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
+ self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
+
+ @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+ @mock.patch("osm_mon.collector.collector.DatabaseManager", autospec=True)
+ def test_get_vim_type(self, database_manager):
+ mock_creds = VimCredentials()
+ mock_creds.id = 'test_id'
+ mock_creds.user = 'user'
+ mock_creds.url = 'url'
+ mock_creds.password = 'password'
+ mock_creds.tenant_name = 'tenant_name'
+ mock_creds.type = 'openstack'
+
+ database_manager.return_value.get_credentials.return_value = mock_creds
+ collector = Collector()
+ vim_type = collector._get_vim_type('test_id')
+ self.assertEqual(vim_type, 'openstack')
+
+ @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+ @mock.patch.object(OpenstackCollector, "__init__", lambda *args, **kwargs: None)
+ @mock.patch.object(OpenstackCollector, "collect")
+ @mock.patch.object(Collector, "_get_vim_type")
+ def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect):
+ _get_vim_type.return_value = 'openstack'
+ collector = Collector()
+ queue = multiprocessing.Queue()
+ collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue)
+ collect.assert_called_once_with({}, queue)
+
+ @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+ @mock.patch.object(OpenstackCollector, "collect")
+ @mock.patch.object(Collector, "_get_vim_type")
+ def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect):
+ _get_vim_type.return_value = 'unknown'
+ collector = Collector()
+ queue = multiprocessing.Queue()
+ collector._init_vim_collector_and_collect({}, 'test_vim_account_id', queue)
+ openstack_collect.assert_not_called()
+
+ @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
+ @mock.patch("osm_mon.collector.collector.VCACollector", autospec=True)
+ def test_init_vca_collector_and_collect(self, vca_collector):
+ collector = Collector()
+ queue = multiprocessing.Queue()
+ collector._init_vca_collector_and_collect({}, queue)
+ vca_collector.assert_called_once_with()
+ vca_collector.return_value.collect.assert_called_once_with({}, queue)
self.alarming.configure_alarm(alarm_endpoint, auth_token, values, {}, True)
payload = {"name": "disk_write_ops",
"gnocchi_resources_threshold_rule": {"resource_type": "generic", "comparison_operator": "gt",
- "granularity": "300", "metric": "disk.write.requests",
+ "granularity": 300, "metric": "disk.write.requests",
"aggregation_method": "mean", "threshold": 60,
"resource_id": "my_r_id"},
"alarm_actions": ["http://localhost:8662"], "repeat_actions": True, "state": "ok", "type": "gnocchi_resources_threshold",
peewee==3.1.*
pyyaml==3.*
prometheus_client==0.4.*
+gnocchiclient==7.0.*
git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc
\ No newline at end of file
"bottle==0.12.*",
"peewee==3.1.*",
"pyyaml==3.*",
+ "prometheus_client==0.4.*",
+ "gnocchiclient==7.0.*",
"osm-common",
"n2vc"
],
include_package_data=True,
entry_points={
"console_scripts": [
- "osm-mon-prometheus-exporter = osm_mon.cmd.mon_prometheus_exporter:main",
+ "osm-mon-collector = osm_mon.cmd.mon_collector:main",
]
},
dependency_links=[