X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fcollector.py;h=cc844364e91d749d2b2901fa3d3c0c0bbdb78b51;hb=4de60c537b56d7bf9767a8b3b027793c76bf02b5;hp=5074308600fb9be4ae381104c424098523f91b79;hpb=dcfe2522f51578edc68156bd823c9bc3607e746b;p=osm%2FMON.git diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index 5074308..cc84436 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -21,31 +21,16 @@ # contact: bdiaz@whitestack.com or glavado@whitestack.com ## import logging -import multiprocessing import time import peewee from osm_mon.collector.backends.prometheus import PrometheusBackend -from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector -from osm_mon.collector.vnf_collectors.juju import VCACollector -from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector -from osm_mon.collector.vnf_collectors.vmware import VMwareCollector -from osm_mon.collector.vnf_collectors.vio import VIOCollector -from osm_mon.core.common_db import CommonDbClient +from osm_mon.collector.service import CollectorService from osm_mon.core.config import Config -from osm_mon.core.database import DatabaseManager log = logging.getLogger(__name__) -VIM_COLLECTORS = { - "openstack": OpenstackCollector, - "vmware": VMwareCollector, - "vio": VIOCollector -} -VIM_INFRA_COLLECTORS = { - "openstack": OpenstackInfraCollector -} METRIC_BACKENDS = [ PrometheusBackend ] @@ -54,11 +39,8 @@ METRIC_BACKENDS = [ class Collector: def __init__(self, config: Config): self.conf = config - self.common_db = CommonDbClient(self.conf) - self.plugins = [] - self.database_manager = DatabaseManager(self.conf) - self.database_manager.create_tables() - self.queue = multiprocessing.Queue() + self.service = CollectorService(config) + self.backends = [] self._init_backends() def collect_forever(self): @@ -73,66 +55,11 @@ class Collector: except Exception: log.exception("Error collecting metrics") - def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): - # TODO(diazb) Add support for aws - database_manager = DatabaseManager(self.conf) - vim_type = database_manager.get_vim_type(vim_account_id) - if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) - metrics = collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) - else: - log.debug("vimtype %s is not supported.", vim_type) - - def _collect_vim_infra_metrics(self, vim_account_id: str): - database_manager = DatabaseManager(self.conf) - vim_type = database_manager.get_vim_type(vim_account_id) - if vim_type in VIM_INFRA_COLLECTORS: - collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) - metrics = collector.collect() - for metric in metrics: - self.queue.put(metric) - else: - log.debug("vimtype %s is not supported.", vim_type) - - def _collect_vca_metrics(self, vnfr: dict): - log.debug('_collect_vca_metrics') - log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector(self.conf) - metrics = vca_collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) - def collect_metrics(self): - vnfrs = self.common_db.get_vnfrs() - processes = [] - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - p = multiprocessing.Process(target=self._collect_vim_metrics, - args=(vnfr, vim_account_id)) - processes.append(p) - p.start() - p = multiprocessing.Process(target=self._collect_vca_metrics, - args=(vnfr,)) - processes.append(p) - p.start() - vims = self.common_db.get_vim_accounts() - for vim in vims: - p = multiprocessing.Process(target=self._collect_vim_infra_metrics, - args=(vim['_id'],)) - processes.append(p) - p.start() - for process in processes: - process.join(timeout=10) - metrics = [] - while not self.queue.empty(): - metrics.append(self.queue.get()) - for plugin in self.plugins: - plugin.handle(metrics) + metrics = self.service.collect_metrics() + for backend in self.backends: + backend.handle(metrics) def _init_backends(self): for backend in METRIC_BACKENDS: - self.plugins.append(backend()) + self.backends.append(backend())