X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fservice.py;h=5eb65a9892a236b1ceb941931b78c01dce996362;hb=a2eeb474200b8f9ebcaee6fa68fe52b6e1a5e337;hp=9dd16838cdf92c03ac44a7f539ae49bf01decfe1;hpb=9a773323e8cec18fb0f2fa204b0cea4e2370c5dd;p=osm%2FMON.git diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 9dd1683..5eb65a9 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -24,7 +24,6 @@ # This version uses a ProcessThreadPoolExecutor to limit the number of processes launched import logging -import multiprocessing from typing import List import concurrent.futures import time @@ -60,8 +59,6 @@ SDN_INFRA_COLLECTORS = { class CollectorService: - # The processes getting metrics will store the results in this queue - queue = multiprocessing.Queue() def __init__(self, config: Config): self.conf = config @@ -83,68 +80,71 @@ class CollectorService: @staticmethod def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for aws + metrics = [] vim_type = CollectorService._get_vim_type(conf, vim_account_id) log.debug("vim type.....{}".format(vim_type)) if vim_type in VIM_COLLECTORS: collector = VIM_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect(vnfr) log.debug("Collecting vim metrics.....{}".format(metrics)) - for metric in metrics: - pass - CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) - return + return metrics @staticmethod def _collect_vca_metrics(conf: Config, vnfr: dict): + metrics = [] vca_collector = VCACollector(conf) metrics = vca_collector.collect(vnfr) log.debug("Collecting vca metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) - return + return metrics @staticmethod def _collect_vim_infra_metrics(conf: Config, vim_account_id: str): log.info("Collecting vim infra metrics") + metrics = [] vim_type = CollectorService._get_vim_type(conf, vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect() log.debug("Collecting vim infra metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) - return + return metrics @staticmethod def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str): log.info("Collecting sdnc metrics") + metrics = [] common_db = CommonDbClient(conf) sdn_type = common_db.get_sdnc(sdnc_id)['type'] if sdn_type in SDN_INFRA_COLLECTORS: collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() log.debug("Collecting sdnc metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) else: log.debug("sdn_type %s is not supported.", sdn_type) - return + return metrics @staticmethod def _stop_process_pool(executor): - log.info('Stopping all processes in the process pool') + log.info('Shutting down process pool') try: + log.debug('Stopping residual processes in the process pool') for pid, process in executor._processes.items(): if process.is_alive(): process.terminate() except Exception as e: log.info("Exception during process termination") log.debug("Exception %s" % (e)) - executor.shutdown() + + try: + # Shutting down executor + log.debug('Shutting down process pool executor') + executor.shutdown() + except RuntimeError as e: + log.info('RuntimeError in shutting down executer') + log.debug('RuntimeError %s' % (e)) return def collect_metrics(self) -> List[Metric]: @@ -154,8 +154,8 @@ class CollectorService: start_time = time.time() # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: - log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector', - 'process_pool_size'))) + log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector', + 'process_pool_size'))) futures = [] for vnfr in vnfrs: nsr_id = vnfr['nsr-id-ref'] @@ -178,15 +178,15 @@ class CollectorService: 'process_execution_timeout')): result = future.result(timeout=int(self.conf.get('collector', 'process_execution_timeout'))) + metrics.extend(result) log.debug('result = %s' % (result)) except concurrent.futures.TimeoutError as e: # Some processes have not completed due to timeout error - log.info(' Some processes have not finished due to TimeoutError exception') + log.info('Some processes have not finished due to TimeoutError exception') log.debug('concurrent.futures.TimeoutError exception %s' % (e)) - CollectorService._stop_process_pool(executor) - while not self.queue.empty(): - metrics.append(self.queue.get()) + # Shutting down process pool executor + CollectorService._stop_process_pool(executor) end_time = time.time() log.info("Collection completed in %s seconds", end_time - start_time)