From: palsus Date: Mon, 1 Mar 2021 19:59:41 +0000 (+0000) Subject: Fix for bug 1450 high memory consumption X-Git-Tag: branch-sol006v331-start~15 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F11%2F10411%2F9;p=osm%2FMON.git Fix for bug 1450 high memory consumption Change-Id: Ic95aa63bdd8713571826a7a7963f2d33ce80d325 Signed-off-by: palsus --- diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 9dd1683..5eb65a9 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -24,7 +24,6 @@ # This version uses a ProcessThreadPoolExecutor to limit the number of processes launched import logging -import multiprocessing from typing import List import concurrent.futures import time @@ -60,8 +59,6 @@ SDN_INFRA_COLLECTORS = { class CollectorService: - # The processes getting metrics will store the results in this queue - queue = multiprocessing.Queue() def __init__(self, config: Config): self.conf = config @@ -83,68 +80,71 @@ class CollectorService: @staticmethod def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for aws + metrics = [] vim_type = CollectorService._get_vim_type(conf, vim_account_id) log.debug("vim type.....{}".format(vim_type)) if vim_type in VIM_COLLECTORS: collector = VIM_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect(vnfr) log.debug("Collecting vim metrics.....{}".format(metrics)) - for metric in metrics: - pass - CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) - return + return metrics @staticmethod def _collect_vca_metrics(conf: Config, vnfr: dict): + metrics = [] vca_collector = VCACollector(conf) metrics = vca_collector.collect(vnfr) log.debug("Collecting vca metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) - return + return metrics @staticmethod def _collect_vim_infra_metrics(conf: Config, vim_account_id: str): log.info("Collecting vim infra metrics") + metrics = [] vim_type = CollectorService._get_vim_type(conf, vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect() log.debug("Collecting vim infra metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) - return + return metrics @staticmethod def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str): log.info("Collecting sdnc metrics") + metrics = [] common_db = CommonDbClient(conf) sdn_type = common_db.get_sdnc(sdnc_id)['type'] if sdn_type in SDN_INFRA_COLLECTORS: collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() log.debug("Collecting sdnc metrics.....{}".format(metrics)) - for metric in metrics: - CollectorService.queue.put(metric) else: log.debug("sdn_type %s is not supported.", sdn_type) - return + return metrics @staticmethod def _stop_process_pool(executor): - log.info('Stopping all processes in the process pool') + log.info('Shutting down process pool') try: + log.debug('Stopping residual processes in the process pool') for pid, process in executor._processes.items(): if process.is_alive(): process.terminate() except Exception as e: log.info("Exception during process termination") log.debug("Exception %s" % (e)) - executor.shutdown() + + try: + # Shutting down executor + log.debug('Shutting down process pool executor') + executor.shutdown() + except RuntimeError as e: + log.info('RuntimeError in shutting down executer') + log.debug('RuntimeError %s' % (e)) return def collect_metrics(self) -> List[Metric]: @@ -154,8 +154,8 @@ class CollectorService: start_time = time.time() # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: - log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector', - 'process_pool_size'))) + log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector', + 'process_pool_size'))) futures = [] for vnfr in vnfrs: nsr_id = vnfr['nsr-id-ref'] @@ -178,15 +178,15 @@ class CollectorService: 'process_execution_timeout')): result = future.result(timeout=int(self.conf.get('collector', 'process_execution_timeout'))) + metrics.extend(result) log.debug('result = %s' % (result)) except concurrent.futures.TimeoutError as e: # Some processes have not completed due to timeout error - log.info(' Some processes have not finished due to TimeoutError exception') + log.info('Some processes have not finished due to TimeoutError exception') log.debug('concurrent.futures.TimeoutError exception %s' % (e)) - CollectorService._stop_process_pool(executor) - while not self.queue.empty(): - metrics.append(self.queue.get()) + # Shutting down process pool executor + CollectorService._stop_process_pool(executor) end_time = time.time() log.info("Collection completed in %s seconds", end_time - start_time) diff --git a/osm_mon/core/mon.yaml b/osm_mon/core/mon.yaml index 3c9e27f..b0739ee 100644 --- a/osm_mon/core/mon.yaml +++ b/osm_mon/core/mon.yaml @@ -41,13 +41,13 @@ sql: collector: interval: 30 - process_pool_size: 20 + process_pool_size: 10 process_execution_timeout: 50 evaluator: interval: 30 backend: prometheus - process_pool_size: 20 + process_pool_size: 10 process_timeout: 50 dashboarder: diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py index d8589bb..6ca0dc5 100644 --- a/osm_mon/evaluator/evaluator.py +++ b/osm_mon/evaluator/evaluator.py @@ -75,20 +75,30 @@ class Evaluator: # Some processes have not completed due to timeout error log.info('Some processes have not finished due to TimeoutError exception') log.debug('concurrent.futures.TimeoutError exception %s' % (e)) - Evaluator._stop_process_pool(executor) + + # Shutting down process pool executor + Evaluator._stop_process_pool(executor) @staticmethod def _stop_process_pool(executor): log.debug("_stop_process_pool") - log.info('Stopping all processes in the process pool') + log.info('Shutting down process pool') try: + log.debug('Stopping residual processes in the process pool') for pid, process in executor._processes.items(): if process.is_alive(): process.terminate() except Exception as e: log.info("Exception during process termination") log.debug("Exception %s" % (e)) - executor.shutdown() + + try: + # Shutting down executor + log.debug('Shutting down process pool executor') + executor.shutdown() + except RuntimeError as e: + log.info('RuntimeError in shutting down executer') + log.debug('RuntimeError %s' % (e)) return @staticmethod