# This version uses a ProcessPoolExecutor to limit the number of processes launched
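# Each worker now returns its metrics list to the parent via Future.result(), replacing the shared multiprocessing.Queue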
import logging
-import multiprocessing
from typing import List
import concurrent.futures
import time
class CollectorService:
- # The processes getting metrics will store the results in this queue
- queue = multiprocessing.Queue()
def __init__(self, config: Config):
self.conf = config
@staticmethod
def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str):
# TODO(diazb) Add support for aws
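+ # Default to an empty list so unsupported VIM types return no metrics instead of None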
+ metrics = []
vim_type = CollectorService._get_vim_type(conf, vim_account_id)
log.debug("vim type.....{}".format(vim_type))
if vim_type in VIM_COLLECTORS:
collector = VIM_COLLECTORS[vim_type](conf, vim_account_id)
metrics = collector.collect(vnfr)
log.debug("Collecting vim metrics.....{}".format(metrics))
- for metric in metrics:
- CollectorService.queue.put(metric)
else:
log.debug("vimtype %s is not supported.", vim_type)
- return
+ return metrics
@staticmethod
def _collect_vca_metrics(conf: Config, vnfr: dict):
+ metrics = []
vca_collector = VCACollector(conf)
metrics = vca_collector.collect(vnfr)
log.debug("Collecting vca metrics.....{}".format(metrics))
- for metric in metrics:
- CollectorService.queue.put(metric)
- return
+ return metrics
@staticmethod
def _collect_vim_infra_metrics(conf: Config, vim_account_id: str):
log.info("Collecting vim infra metrics")
+ metrics = []
vim_type = CollectorService._get_vim_type(conf, vim_account_id)
if vim_type in VIM_INFRA_COLLECTORS:
collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id)
metrics = collector.collect()
log.debug("Collecting vim infra metrics.....{}".format(metrics))
- for metric in metrics:
- CollectorService.queue.put(metric)
else:
log.debug("vimtype %s is not supported.", vim_type)
- return
+ return metrics
@staticmethod
def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str):
log.info("Collecting sdnc metrics")
+ metrics = []
common_db = CommonDbClient(conf)
sdn_type = common_db.get_sdnc(sdnc_id)['type']
if sdn_type in SDN_INFRA_COLLECTORS:
collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id)
metrics = collector.collect()
log.debug("Collecting sdnc metrics.....{}".format(metrics))
- for metric in metrics:
- CollectorService.queue.put(metric)
else:
log.debug("sdn_type %s is not supported.", sdn_type)
- return
+ return metrics
@staticmethod
def _stop_process_pool(executor):
- log.info('Stopping all processes in the process pool')
+ log.info('Shutting down process pool')
try:
+ log.debug('Stopping residual processes in the process pool')
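+ # NOTE: _processes is a private ProcessPoolExecutor attribute mapping pid -> Process; this relies on CPython internals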
for pid, process in executor._processes.items():
if process.is_alive():
process.terminate()
except Exception as e:
log.info("Exception during process termination")
log.debug("Exception %s" % (e))
- executor.shutdown()
+
+ try:
+ # Shutting down executor
+ log.debug('Shutting down process pool executor')
+ executor.shutdown()
+ except RuntimeError as e:
+ log.info('RuntimeError in shutting down executor')
+ log.debug('RuntimeError %s' % (e))
return
def collect_metrics(self) -> List[Metric]:
start_time = time.time()
# Start the executor pool with process_pool_size workers (default 20)
with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor:
- log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector',
- 'process_pool_size')))
+ log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector',
+ 'process_pool_size')))
futures = []
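+ # Futures for the submitted collector tasks; results are gathered below with as_completed()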
for vnfr in vnfrs:
nsr_id = vnfr['nsr-id-ref']
try:
for future in concurrent.futures.as_completed(futures, timeout=int(self.conf.get('collector',
'process_execution_timeout'))):
result = future.result(timeout=int(self.conf.get('collector',
'process_execution_timeout')))
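+ # Each worker returns a list of Metric objects; aggregate them in the parent process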
+ metrics.extend(result)
log.debug('result = %s' % (result))
except concurrent.futures.TimeoutError as e:
# Some processes have not completed due to timeout error
- log.info(' Some processes have not finished due to TimeoutError exception')
+ log.info('Some processes have not finished due to TimeoutError exception')
log.debug('concurrent.futures.TimeoutError exception %s' % (e))
- CollectorService._stop_process_pool(executor)
- while not self.queue.empty():
- metrics.append(self.queue.get())
+ # Shutting down process pool executor
+ CollectorService._stop_process_pool(executor)
end_time = time.time()
log.info("Collection completed in %s seconds", end_time - start_time)
# Some processes have not completed due to timeout error
log.info('Some processes have not finished due to TimeoutError exception')
log.debug('concurrent.futures.TimeoutError exception %s' % (e))
- Evaluator._stop_process_pool(executor)
+
+ # Shutting down process pool executor
+ Evaluator._stop_process_pool(executor)
@staticmethod
def _stop_process_pool(executor):
log.debug("_stop_process_pool")
- log.info('Stopping all processes in the process pool')
+ log.info('Shutting down process pool')
try:
+ log.debug('Stopping residual processes in the process pool')
for pid, process in executor._processes.items():
if process.is_alive():
process.terminate()
except Exception as e:
log.info("Exception during process termination")
log.debug("Exception %s" % (e))
- executor.shutdown()
+
+ try:
+ # Shutting down executor
+ log.debug('Shutting down process pool executor')
+ executor.shutdown()
+ except RuntimeError as e:
+ log.info('RuntimeError in shutting down executor')
+ log.debug('RuntimeError %s' % (e))
return
@staticmethod