X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fservice.py;h=005f844e3ffdbb428dc10c06c28cea89db816534;hb=02f4d10c2f14cf08310853c4744dc96aca3d1ceb;hp=5eb65a9892a236b1ceb941931b78c01dce996362;hpb=e57f2f18fddd272c39d0086e94fa10eca5b86029;p=osm%2FMON.git diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 5eb65a9..005f844 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -27,6 +27,7 @@ import logging from typing import List import concurrent.futures import time +import keystoneauth1.exceptions from osm_mon.collector.infra_collectors.onos import OnosInfraCollector from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector @@ -45,21 +46,17 @@ log = logging.getLogger(__name__) VIM_COLLECTORS = { "openstack": OpenstackCollector, "vmware": VMwareCollector, - "vio": VIOCollector + "vio": VIOCollector, } VIM_INFRA_COLLECTORS = { "openstack": OpenstackInfraCollector, "vmware": VMwareInfraCollector, - "vio": VIOInfraCollector -} -SDN_INFRA_COLLECTORS = { - "onosof": OnosInfraCollector, - "onos_vpls": OnosInfraCollector + "vio": VIOInfraCollector, } +SDN_INFRA_COLLECTORS = {"onosof": OnosInfraCollector, "onos_vpls": OnosInfraCollector} class CollectorService: - def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) @@ -70,11 +67,11 @@ class CollectorService: def _get_vim_type(conf: Config, vim_account_id: str) -> str: common_db = CommonDbClient(conf) vim_account = common_db.get_vim_account(vim_account_id) - vim_type = vim_account['vim_type'] - if 'config' in vim_account and 'vim_type' in vim_account['config']: - vim_type = vim_account['config']['vim_type'].lower() - if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: - vim_type = 'openstack' + vim_type = vim_account["vim_type"] + if "config" in vim_account and "vim_type" in vim_account["config"]: + vim_type = vim_account["config"]["vim_type"].lower() + if vim_type == "vio" and "vrops_site" not in vim_account["config"]: + vim_type = "openstack" return vim_type @staticmethod @@ -117,7 +114,7 @@ class CollectorService: log.info("Collecting sdnc metrics") metrics = [] common_db = CommonDbClient(conf) - sdn_type = common_db.get_sdnc(sdnc_id)['type'] + sdn_type = common_db.get_sdnc(sdnc_id)["type"] if sdn_type in SDN_INFRA_COLLECTORS: collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() @@ -128,9 +125,9 @@ class CollectorService: @staticmethod def _stop_process_pool(executor): - log.info('Shutting down process pool') + log.info("Shutting down process pool") try: - log.debug('Stopping residual processes in the process pool') + log.debug("Stopping residual processes in the process pool") for pid, process in executor._processes.items(): if process.is_alive(): process.terminate() @@ -140,11 +137,11 @@ class CollectorService: try: # Shutting down executor - log.debug('Shutting down process pool executor') + log.debug("Shutting down process pool executor") executor.shutdown() except RuntimeError as e: - log.info('RuntimeError in shutting down executer') - log.debug('RuntimeError %s' % (e)) + log.info("RuntimeError in shutting down executer") + log.debug("RuntimeError %s" % (e)) return def collect_metrics(self) -> List[Metric]: @@ -153,37 +150,76 @@ class CollectorService: start_time = time.time() # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 - with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: - log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector', - 'process_pool_size'))) + with concurrent.futures.ProcessPoolExecutor( + self.conf.get("collector", "process_pool_size") + ) as executor: + log.info( + "Started metric collector process pool with pool size %s" + % (self.conf.get("collector", "process_pool_size")) + ) futures = [] for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - futures.append(executor.submit(CollectorService._collect_vim_metrics, self.conf, vnfr, vim_account_id)) - futures.append(executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr)) + nsr_id = vnfr["nsr-id-ref"] + vnf_member_index = vnfr["member-vnf-index-ref"] + vim_account_id = self.common_db.get_vim_account_id( + nsr_id, vnf_member_index + ) + futures.append( + executor.submit( + CollectorService._collect_vim_metrics, + self.conf, + vnfr, + vim_account_id, + ) + ) + futures.append( + executor.submit( + CollectorService._collect_vca_metrics, self.conf, vnfr + ) + ) vims = self.common_db.get_vim_accounts() for vim in vims: - futures.append(executor.submit(CollectorService._collect_vim_infra_metrics, self.conf, vim['_id'])) + futures.append( + executor.submit( + CollectorService._collect_vim_infra_metrics, + self.conf, + vim["_id"], + ) + ) sdncs = self.common_db.get_sdncs() for sdnc in sdncs: - futures.append(executor.submit(CollectorService._collect_sdnc_infra_metrics, self.conf, sdnc['_id'])) + futures.append( + executor.submit( + CollectorService._collect_sdnc_infra_metrics, + self.conf, + sdnc["_id"], + ) + ) try: # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds - for future in concurrent.futures.as_completed(futures, self.conf.get('collector', - 'process_execution_timeout')): - result = future.result(timeout=int(self.conf.get('collector', - 'process_execution_timeout'))) - metrics.extend(result) - log.debug('result = %s' % (result)) + for future in concurrent.futures.as_completed( + futures, self.conf.get("collector", "process_execution_timeout") + ): + try: + result = future.result( + timeout=int( + self.conf.get("collector", "process_execution_timeout") + ) + ) + metrics.extend(result) + log.debug("result = %s" % (result)) + except keystoneauth1.exceptions.connection.ConnectTimeout as e: + log.info("Keystone connection timeout during metric collection") + log.debug("Keystone connection timeout exception %s" % (e)) except concurrent.futures.TimeoutError as e: # Some processes have not completed due to timeout error - log.info('Some processes have not finished due to TimeoutError exception') - log.debug('concurrent.futures.TimeoutError exception %s' % (e)) + log.info( + "Some processes have not finished due to TimeoutError exception" + ) + log.debug("concurrent.futures.TimeoutError exception %s" % (e)) # Shutting down process pool executor CollectorService._stop_process_pool(executor)