Fix for bug 1450 high memory consumption 11/10411/9
author: palsus <subhankar.pal@aricent.com>
Mon, 1 Mar 2021 19:59:41 +0000 (19:59 +0000)
committer: palsus <subhankar.pal@aricent.com>
Tue, 2 Mar 2021 05:02:36 +0000 (05:02 +0000)
Change-Id: Ic95aa63bdd8713571826a7a7963f2d33ce80d325
Signed-off-by: palsus <subhankar.pal@aricent.com>
osm_mon/collector/service.py
osm_mon/core/mon.yaml
osm_mon/evaluator/evaluator.py

index 9dd1683..5eb65a9 100644 (file)
@@ -24,7 +24,6 @@
 # This version uses a ProcessThreadPoolExecutor to limit the number of processes launched
 
 import logging
-import multiprocessing
 from typing import List
 import concurrent.futures
 import time
@@ -60,8 +59,6 @@ SDN_INFRA_COLLECTORS = {
 
 
 class CollectorService:
-    # The processes getting metrics will store the results in this queue
-    queue = multiprocessing.Queue()
 
     def __init__(self, config: Config):
         self.conf = config
@@ -83,68 +80,71 @@ class CollectorService:
     @staticmethod
     def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str):
         # TODO(diazb) Add support for aws
+        metrics = []
         vim_type = CollectorService._get_vim_type(conf, vim_account_id)
         log.debug("vim type.....{}".format(vim_type))
         if vim_type in VIM_COLLECTORS:
             collector = VIM_COLLECTORS[vim_type](conf, vim_account_id)
             metrics = collector.collect(vnfr)
             log.debug("Collecting vim metrics.....{}".format(metrics))
-            for metric in metrics:
-                pass
-                CollectorService.queue.put(metric)
         else:
             log.debug("vimtype %s is not supported.", vim_type)
-        return
+        return metrics
 
     @staticmethod
     def _collect_vca_metrics(conf: Config, vnfr: dict):
+        metrics = []
         vca_collector = VCACollector(conf)
         metrics = vca_collector.collect(vnfr)
         log.debug("Collecting vca metrics.....{}".format(metrics))
-        for metric in metrics:
-            CollectorService.queue.put(metric)
-        return
+        return metrics
 
     @staticmethod
     def _collect_vim_infra_metrics(conf: Config, vim_account_id: str):
         log.info("Collecting vim infra metrics")
+        metrics = []
         vim_type = CollectorService._get_vim_type(conf, vim_account_id)
         if vim_type in VIM_INFRA_COLLECTORS:
             collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id)
             metrics = collector.collect()
             log.debug("Collecting vim infra metrics.....{}".format(metrics))
-            for metric in metrics:
-                CollectorService.queue.put(metric)
         else:
             log.debug("vimtype %s is not supported.", vim_type)
-        return
+        return metrics
 
     @staticmethod
     def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str):
         log.info("Collecting sdnc metrics")
+        metrics = []
         common_db = CommonDbClient(conf)
         sdn_type = common_db.get_sdnc(sdnc_id)['type']
         if sdn_type in SDN_INFRA_COLLECTORS:
             collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id)
             metrics = collector.collect()
             log.debug("Collecting sdnc metrics.....{}".format(metrics))
-            for metric in metrics:
-                CollectorService.queue.put(metric)
         else:
             log.debug("sdn_type %s is not supported.", sdn_type)
-        return
+        return metrics
 
     @staticmethod
     def _stop_process_pool(executor):
-        log.info('Stopping all processes in the process pool')
+        log.info('Shutting down process pool')
         try:
+            log.debug('Stopping residual processes in the process pool')
             for pid, process in executor._processes.items():
                 if process.is_alive():
                     process.terminate()
         except Exception as e:
             log.info("Exception during process termination")
             log.debug("Exception %s" % (e))
-        executor.shutdown()
+
+        try:
+            # Shutting down executor
+            log.debug('Shutting down process pool executor')
+            executor.shutdown()
+        except RuntimeError as e:
+            log.info('RuntimeError in shutting down executer')
+            log.debug('RuntimeError %s' % (e))
         return
 
     def collect_metrics(self) -> List[Metric]:
@@ -154,8 +154,8 @@ class CollectorService:
         start_time = time.time()
         # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20
         with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor:
-            log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector',
-                                                                                                 'process_pool_size')))
+            log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector',
+                                                                                                'process_pool_size')))
             futures = []
             for vnfr in vnfrs:
                 nsr_id = vnfr['nsr-id-ref']
@@ -178,15 +178,15 @@ class CollectorService:
                                                                                      'process_execution_timeout')):
                     result = future.result(timeout=int(self.conf.get('collector',
                                                                      'process_execution_timeout')))
+                    metrics.extend(result)
                     log.debug('result = %s' % (result))
             except concurrent.futures.TimeoutError as e:
                 # Some processes have not completed due to timeout error
-                log.info(' Some processes have not finished due to TimeoutError exception')
+                log.info('Some processes have not finished due to TimeoutError exception')
                 log.debug('concurrent.futures.TimeoutError exception %s' % (e))
-                CollectorService._stop_process_pool(executor)
 
-            while not self.queue.empty():
-                metrics.append(self.queue.get())
+            # Shutting down process pool executor
+            CollectorService._stop_process_pool(executor)
 
         end_time = time.time()
         log.info("Collection completed in %s seconds", end_time - start_time)
index 3c9e27f..b0739ee 100644 (file)
@@ -41,13 +41,13 @@ sql:
 
 collector:
   interval: 30
-  process_pool_size: 20
+  process_pool_size: 10
   process_execution_timeout: 50
 
 evaluator:
   interval: 30
   backend: prometheus
-  process_pool_size: 20
+  process_pool_size: 10
   process_timeout: 50
 
 dashboarder:
index d8589bb..6ca0dc5 100644 (file)
@@ -75,20 +75,30 @@ class Evaluator:
                 # Some processes have not completed due to timeout error
                 log.info('Some processes have not finished due to TimeoutError exception')
                 log.debug('concurrent.futures.TimeoutError exception %s' % (e))
-                Evaluator._stop_process_pool(executor)
+
+            # Shutting down process pool executor
+            Evaluator._stop_process_pool(executor)
 
     @staticmethod
     def _stop_process_pool(executor):
         log.debug("_stop_process_pool")
-        log.info('Stopping all processes in the process pool')
+        log.info('Shutting down process pool')
         try:
+            log.debug('Stopping residual processes in the process pool')
             for pid, process in executor._processes.items():
                 if process.is_alive():
                     process.terminate()
         except Exception as e:
             log.info("Exception during process termination")
             log.debug("Exception %s" % (e))
-        executor.shutdown()
+
+        try:
+            # Shutting down executor
+            log.debug('Shutting down process pool executor')
+            executor.shutdown()
+        except RuntimeError as e:
+            log.info('RuntimeError in shutting down executer')
+            log.debug('RuntimeError %s' % (e))
         return
 
     @staticmethod