X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fcollector.py;h=aad395a15a2eabb3004331680df3c92679e2a5c3;hb=8721c5ce76869719af22fb238a56fec612f76289;hp=016014bf85cbc854c7c57ec885a8b70519158320;hpb=ca663f8468d46ac30d23db369a33390f7538b814;p=osm%2FMON.git diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index 016014b..aad395a 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -24,13 +24,17 @@ import logging import multiprocessing import time +import peewee + from osm_mon.collector.backends.prometheus import PrometheusBackend -from osm_mon.collector.collectors.juju import VCACollector -from osm_mon.collector.collectors.openstack import OpenstackCollector -from osm_mon.collector.collectors.vmware import VMwareCollector +from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector +from osm_mon.collector.metric import Metric +from osm_mon.collector.vnf_collectors.juju import VCACollector +from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector +from osm_mon.collector.vnf_collectors.vmware import VMwareCollector from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.settings import Config log = logging.getLogger(__name__) @@ -38,48 +42,63 @@ VIM_COLLECTORS = { "openstack": OpenstackCollector, "vmware": VMwareCollector } +VIM_INFRA_COLLECTORS = { + "openstack": OpenstackInfraCollector +} METRIC_BACKENDS = [ PrometheusBackend ] class Collector: - def __init__(self): - self.common_db = CommonDbClient() + def __init__(self, config: Config): + self.conf = config + self.common_db = CommonDbClient(self.conf) self.plugins = [] - self.database_manager = DatabaseManager() + self.database_manager = DatabaseManager(self.conf) self.database_manager.create_tables() self.queue = multiprocessing.Queue() - - def init_backends(self): - for backend in METRIC_BACKENDS: - self.plugins.append(backend()) + self._init_backends() def collect_forever(self): log.debug('collect_forever') - cfg = Config.instance() while True: try: self.collect_metrics() - time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) + time.sleep(int(self.conf.get('collector', 'interval'))) + except peewee.PeeweeException: + log.exception("Database error consuming message: ") + raise except Exception: log.exception("Error collecting metrics") def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for vrops and aws - vim_type = self.database_manager.get_vim_type(vim_account_id) + database_manager = DatabaseManager(self.conf) + vim_type = database_manager.get_vim_type(vim_account_id) if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](vim_account_id) + collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) metrics = collector.collect(vnfr) for metric in metrics: self.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) + def _collect_vim_infra_metrics(self, vim_account_id: str): + database_manager = DatabaseManager(self.conf) + vim_type = database_manager.get_vim_type(vim_account_id) + if vim_type in VIM_INFRA_COLLECTORS: + collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) + status = collector.is_vim_ok() + status_metric = Metric({'vim_id': vim_account_id}, 'vim_status', status) + self.queue.put(status_metric) + else: + log.debug("vimtype %s is not supported.", vim_type) + def _collect_vca_metrics(self, vnfr: dict): log.debug('_collect_vca_metrics') log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector() + vca_collector = VCACollector(self.conf) metrics = vca_collector.collect(vnfr) for metric in metrics: self.queue.put(metric) @@ -99,10 +118,20 @@ class Collector: args=(vnfr,)) processes.append(p) p.start() + vims = self.common_db.get_vim_accounts() + for vim in vims: + p = multiprocessing.Process(target=self._collect_vim_infra_metrics, + args=(vim['_id'],)) + processes.append(p) + p.start() for process in processes: - process.join() + process.join(timeout=10) metrics = [] while not self.queue.empty(): metrics.append(self.queue.get()) for plugin in self.plugins: plugin.handle(metrics) + + def _init_backends(self): + for backend in METRIC_BACKENDS: + self.plugins.append(backend())