X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fservice.py;h=9dd16838cdf92c03ac44a7f539ae49bf01decfe1;hb=34a0a0f1b7ccbecfbed40d3e68fb9fd16564052c;hp=7673aedb917bfd62cd96db2c404c18e524603a4e;hpb=d9e56359b64a3934ea5eb7c18bc517c4fc9fa160;p=osm%2FMON.git diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 7673aed..9dd1683 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -20,9 +20,14 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## + +# This version uses a ProcessThreadPoolExecutor to limit the number of processes launched + import logging import multiprocessing from typing import List +import concurrent.futures +import time from osm_mon.collector.infra_collectors.onos import OnosInfraCollector from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector @@ -49,99 +54,141 @@ VIM_INFRA_COLLECTORS = { "vio": VIOInfraCollector } SDN_INFRA_COLLECTORS = { - "onos": OnosInfraCollector + "onosof": OnosInfraCollector, + "onos_vpls": OnosInfraCollector } class CollectorService: + # The processes getting metrics will store the results in this queue + queue = multiprocessing.Queue() + def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) - self.queue = multiprocessing.Queue() + return + + # static methods to be executed in the Processes + @staticmethod + def _get_vim_type(conf: Config, vim_account_id: str) -> str: + common_db = CommonDbClient(conf) + vim_account = common_db.get_vim_account(vim_account_id) + vim_type = vim_account['vim_type'] + if 'config' in vim_account and 'vim_type' in vim_account['config']: + vim_type = vim_account['config']['vim_type'].lower() + if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: + vim_type = 'openstack' + return vim_type - def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): + @staticmethod + def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for aws - vim_type = self._get_vim_type(vim_account_id) + vim_type = CollectorService._get_vim_type(conf, vim_account_id) + log.debug("vim type.....{}".format(vim_type)) if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect(vnfr) + log.debug("Collecting vim metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + pass + CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) + return + + @staticmethod + def _collect_vca_metrics(conf: Config, vnfr: dict): + vca_collector = VCACollector(conf) + metrics = vca_collector.collect(vnfr) + log.debug("Collecting vca metrics.....{}".format(metrics)) + for metric in metrics: + CollectorService.queue.put(metric) + return - def _collect_vim_infra_metrics(self, vim_account_id: str): - vim_type = self._get_vim_type(vim_account_id) + @staticmethod + def _collect_vim_infra_metrics(conf: Config, vim_account_id: str): + log.info("Collecting vim infra metrics") + vim_type = CollectorService._get_vim_type(conf, vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: - collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect() + log.debug("Collecting vim infra metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) + return - def _collect_sdnc_infra_metrics(self, sdnc_id: str): - common_db = CommonDbClient(self.conf) + @staticmethod + def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str): + log.info("Collecting sdnc metrics") + common_db = CommonDbClient(conf) sdn_type = common_db.get_sdnc(sdnc_id)['type'] if sdn_type in SDN_INFRA_COLLECTORS: - collector = SDN_INFRA_COLLECTORS[sdn_type](self.conf, sdnc_id) + collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() + log.debug("Collecting sdnc metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + CollectorService.queue.put(metric) else: log.debug("sdn_type %s is not supported.", sdn_type) - - def _collect_vca_metrics(self, vnfr: dict): - log.debug('_collect_vca_metrics') - log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector(self.conf) - metrics = vca_collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) + return + + @staticmethod + def _stop_process_pool(executor): + log.info('Stopping all processes in the process pool') + try: + for pid, process in executor._processes.items(): + if process.is_alive(): + process.terminate() + except Exception as e: + log.info("Exception during process termination") + log.debug("Exception %s" % (e)) + executor.shutdown() + return def collect_metrics(self) -> List[Metric]: vnfrs = self.common_db.get_vnfrs() - processes = [] - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - p = multiprocessing.Process(target=self._collect_vim_metrics, - args=(vnfr, vim_account_id)) - processes.append(p) - p.start() - p = multiprocessing.Process(target=self._collect_vca_metrics, - args=(vnfr,)) - processes.append(p) - p.start() - vims = self.common_db.get_vim_accounts() - for vim in vims: - p = multiprocessing.Process(target=self._collect_vim_infra_metrics, - args=(vim['_id'],)) - processes.append(p) - p.start() - sdncs = self.common_db.get_sdncs() - for sdnc in sdncs: - p = multiprocessing.Process(target=self._collect_sdnc_infra_metrics, - args=(sdnc['_id'],)) - processes.append(p) - p.start() - for process in processes: - process.join(timeout=20) - for process in processes: - if process.is_alive(): - process.kill() metrics = [] - while not self.queue.empty(): - metrics.append(self.queue.get()) - return metrics - def _get_vim_type(self, vim_account_id: str) -> str: - common_db = CommonDbClient(self.conf) - vim_account = common_db.get_vim_account(vim_account_id) - vim_type = vim_account['vim_type'] - if 'config' in vim_account and 'vim_type' in vim_account['config']: - vim_type = vim_account['config']['vim_type'].lower() - if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: - vim_type = 'openstack' - return vim_type + start_time = time.time() + # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 + with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: + log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector', + 'process_pool_size'))) + futures = [] + for vnfr in vnfrs: + nsr_id = vnfr['nsr-id-ref'] + vnf_member_index = vnfr['member-vnf-index-ref'] + vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) + futures.append(executor.submit(CollectorService._collect_vim_metrics, self.conf, vnfr, vim_account_id)) + futures.append(executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr)) + + vims = self.common_db.get_vim_accounts() + for vim in vims: + futures.append(executor.submit(CollectorService._collect_vim_infra_metrics, self.conf, vim['_id'])) + + sdncs = self.common_db.get_sdncs() + for sdnc in sdncs: + futures.append(executor.submit(CollectorService._collect_sdnc_infra_metrics, self.conf, sdnc['_id'])) + + try: + # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds + for future in concurrent.futures.as_completed(futures, self.conf.get('collector', + 'process_execution_timeout')): + result = future.result(timeout=int(self.conf.get('collector', + 'process_execution_timeout'))) + log.debug('result = %s' % (result)) + except concurrent.futures.TimeoutError as e: + # Some processes have not completed due to timeout error + log.info(' Some processes have not finished due to TimeoutError exception') + log.debug('concurrent.futures.TimeoutError exception %s' % (e)) + CollectorService._stop_process_pool(executor) + + while not self.queue.empty(): + metrics.append(self.queue.get()) + + end_time = time.time() + log.info("Collection completed in %s seconds", end_time - start_time) + + return metrics