From: palsus Date: Wed, 20 Jan 2021 18:26:13 +0000 (+0000) Subject: Fix for Bug 1174 zombie processes. Replaced multiprocessing with concurrent-futures X-Git-Tag: branch-sol006v331-start~21 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9a773323e8cec18fb0f2fa204b0cea4e2370c5dd;p=osm%2FMON.git Fix for Bug 1174 zombie processes. Replaced multiprocessing with concurrent-futures Change-Id: I105973aa944fc23a45a50de14bb6d7106a0037fb Signed-off-by: palsus --- diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index eecad4d..9dd1683 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -20,9 +20,14 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## + +# This version uses a ProcessThreadPoolExecutor to limit the number of processes launched + import logging import multiprocessing from typing import List +import concurrent.futures +import time from osm_mon.collector.infra_collectors.onos import OnosInfraCollector from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector @@ -55,94 +60,135 @@ SDN_INFRA_COLLECTORS = { class CollectorService: + # The processes getting metrics will store the results in this queue + queue = multiprocessing.Queue() + def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) - self.queue = multiprocessing.Queue() + return + + # static methods to be executed in the Processes + @staticmethod + def _get_vim_type(conf: Config, vim_account_id: str) -> str: + common_db = CommonDbClient(conf) + vim_account = common_db.get_vim_account(vim_account_id) + vim_type = vim_account['vim_type'] + if 'config' in vim_account and 'vim_type' in vim_account['config']: + vim_type = vim_account['config']['vim_type'].lower() + if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: + vim_type = 'openstack' + return vim_type - def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): + @staticmethod + def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for aws - vim_type = self._get_vim_type(vim_account_id) + vim_type = CollectorService._get_vim_type(conf, vim_account_id) + log.debug("vim type.....{}".format(vim_type)) if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect(vnfr) + log.debug("Collecting vim metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + pass + CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) + return + + @staticmethod + def _collect_vca_metrics(conf: Config, vnfr: dict): + vca_collector = VCACollector(conf) + metrics = vca_collector.collect(vnfr) + log.debug("Collecting vca metrics.....{}".format(metrics)) + for metric in metrics: + CollectorService.queue.put(metric) + return - def _collect_vim_infra_metrics(self, vim_account_id: str): - vim_type = self._get_vim_type(vim_account_id) + @staticmethod + def _collect_vim_infra_metrics(conf: Config, vim_account_id: str): + log.info("Collecting vim infra metrics") + vim_type = CollectorService._get_vim_type(conf, vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: - collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect() + log.debug("Collecting vim infra metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + CollectorService.queue.put(metric) else: log.debug("vimtype %s is not supported.", vim_type) + return - def _collect_sdnc_infra_metrics(self, sdnc_id: str): - common_db = CommonDbClient(self.conf) + @staticmethod + def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str): + log.info("Collecting sdnc metrics") + common_db = CommonDbClient(conf) sdn_type = common_db.get_sdnc(sdnc_id)['type'] if sdn_type in SDN_INFRA_COLLECTORS: - collector = SDN_INFRA_COLLECTORS[sdn_type](self.conf, sdnc_id) + collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() + log.debug("Collecting sdnc metrics.....{}".format(metrics)) for metric in metrics: - self.queue.put(metric) + CollectorService.queue.put(metric) else: log.debug("sdn_type %s is not supported.", sdn_type) - - def _collect_vca_metrics(self, vnfr: dict): - log.debug('_collect_vca_metrics') - log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector(self.conf) - metrics = vca_collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) + return + + @staticmethod + def _stop_process_pool(executor): + log.info('Stopping all processes in the process pool') + try: + for pid, process in executor._processes.items(): + if process.is_alive(): + process.terminate() + except Exception as e: + log.info("Exception during process termination") + log.debug("Exception %s" % (e)) + executor.shutdown() + return def collect_metrics(self) -> List[Metric]: vnfrs = self.common_db.get_vnfrs() - processes = [] - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - p = multiprocessing.Process(target=self._collect_vim_metrics, - args=(vnfr, vim_account_id)) - processes.append(p) - p.start() - p = multiprocessing.Process(target=self._collect_vca_metrics, - args=(vnfr,)) - processes.append(p) - p.start() - vims = self.common_db.get_vim_accounts() - for vim in vims: - p = multiprocessing.Process(target=self._collect_vim_infra_metrics, - args=(vim['_id'],)) - processes.append(p) - p.start() - sdncs = self.common_db.get_sdncs() - for sdnc in sdncs: - p = multiprocessing.Process(target=self._collect_sdnc_infra_metrics, - args=(sdnc['_id'],)) - processes.append(p) - p.start() - for process in processes: - process.join(timeout=20) - for process in processes: - if process.is_alive(): - process.terminate() metrics = [] - while not self.queue.empty(): - metrics.append(self.queue.get()) - return metrics - def _get_vim_type(self, vim_account_id: str) -> str: - common_db = CommonDbClient(self.conf) - vim_account = common_db.get_vim_account(vim_account_id) - vim_type = vim_account['vim_type'] - if 'config' in vim_account and 'vim_type' in vim_account['config']: - vim_type = vim_account['config']['vim_type'].lower() - if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: - vim_type = 'openstack' - return vim_type + start_time = time.time() + # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 + with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: + log.debug('Started metric collector process pool with pool size %s' % (self.conf.get('collector', + 'process_pool_size'))) + futures = [] + for vnfr in vnfrs: + nsr_id = vnfr['nsr-id-ref'] + vnf_member_index = vnfr['member-vnf-index-ref'] + vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) + futures.append(executor.submit(CollectorService._collect_vim_metrics, self.conf, vnfr, vim_account_id)) + futures.append(executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr)) + + vims = self.common_db.get_vim_accounts() + for vim in vims: + futures.append(executor.submit(CollectorService._collect_vim_infra_metrics, self.conf, vim['_id'])) + + sdncs = self.common_db.get_sdncs() + for sdnc in sdncs: + futures.append(executor.submit(CollectorService._collect_sdnc_infra_metrics, self.conf, sdnc['_id'])) + + try: + # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds + for future in concurrent.futures.as_completed(futures, self.conf.get('collector', + 'process_execution_timeout')): + result = future.result(timeout=int(self.conf.get('collector', + 'process_execution_timeout'))) + log.debug('result = %s' % (result)) + except concurrent.futures.TimeoutError as e: + # Some processes have not completed due to timeout error + log.info(' Some processes have not finished due to TimeoutError exception') + log.debug('concurrent.futures.TimeoutError exception %s' % (e)) + CollectorService._stop_process_pool(executor) + + while not self.queue.empty(): + metrics.append(self.queue.get()) + + end_time = time.time() + log.info("Collection completed in %s seconds", end_time - start_time) + + return metrics diff --git a/osm_mon/core/mon.yaml b/osm_mon/core/mon.yaml index e25760b..321e485 100644 --- a/osm_mon/core/mon.yaml +++ b/osm_mon/core/mon.yaml @@ -41,6 +41,8 @@ sql: collector: interval: 30 + process_pool_size: 20 + process_execution_timeout: 50 evaluator: interval: 30 diff --git a/osm_mon/tests/unit/collector/test_collector_service.py b/osm_mon/tests/unit/collector/test_collector_service.py index 9b5f002..fc2146c 100644 --- a/osm_mon/tests/unit/collector/test_collector_service.py +++ b/osm_mon/tests/unit/collector/test_collector_service.py @@ -41,7 +41,7 @@ class CollectorServiceTest(TestCase): def test_init_vim_collector_and_collect_openstack(self, _get_vim_account, collect): _get_vim_account.return_value = {'vim_type': 'openstack'} collector = CollectorService(self.config) - collector._collect_vim_metrics({}, 'test_vim_account_id') + collector._collect_vim_metrics(self.config, {}, 'test_vim_account_id') collect.assert_called_once_with({}) @mock.patch.object(OpenstackCollector, "collect") @@ -49,7 +49,7 @@ class CollectorServiceTest(TestCase): def test_init_vim_collector_and_collect_unknown(self, _get_vim_account, openstack_collect): _get_vim_account.return_value = {'vim_type': 'unknown'} collector = CollectorService(self.config) - collector._collect_vim_metrics({}, 'test_vim_account_id') + collector._collect_vim_metrics(self.config, {}, 'test_vim_account_id') openstack_collect.assert_not_called() @mock.patch.object(OpenstackCollector, "__init__", lambda *args, **kwargs: None) @@ -58,7 +58,7 @@ class CollectorServiceTest(TestCase): def test_init_vim_collector_and_collect_vio_with_openstack_collector(self, _get_vim_account, openstack_collect): _get_vim_account.return_value = {'vim_type': 'openstack', 'config': {'vim_type': 'VIO'}} collector = CollectorService(self.config) - collector._collect_vim_metrics({}, 'test_vim_account_id') + collector._collect_vim_metrics(self.config, {}, 'test_vim_account_id') openstack_collect.assert_called_once_with({}) @mock.patch.object(VIOCollector, "__init__", lambda *args, **kwargs: None) @@ -68,12 +68,12 @@ class CollectorServiceTest(TestCase): _get_vim_account.return_value = {'vim_type': 'openstack', 'config': {'vim_type': 'VIO', 'vrops_site': 'https://vrops'}} collector = CollectorService(self.config) - collector._collect_vim_metrics({}, 'test_vim_account_id') + collector._collect_vim_metrics(self.config, {}, 'test_vim_account_id') vio_collect.assert_called_once_with({}) @mock.patch("osm_mon.collector.service.VCACollector", autospec=True) def test_collect_vca_metrics(self, vca_collector): collector = CollectorService(self.config) - collector._collect_vca_metrics({}) + collector._collect_vca_metrics(self.config, {}) vca_collector.assert_called_once_with(self.config) vca_collector.return_value.collect.assert_called_once_with({}) diff --git a/setup.py b/setup.py index fe8af5a..dfe455e 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ setup( "pyyaml>=5.1.2", "prometheus_client==0.4.*", "gnocchiclient==7.0.*", - "pyvcloud==19.1.1", + "pyvcloud==23.0.*", "python-ceilometerclient==2.9.*", "python-novaclient==12.0.*", "python-neutronclient==5.1.*",