X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fservice.py;fp=osm_mon%2Fcollector%2Fservice.py;h=aa270833881f9a50e6bd357c812a59d4fedb3771;hb=8e4179facf22c8096992f0a83caeec9f2f4996c7;hp=5eb65a9892a236b1ceb941931b78c01dce996362;hpb=a2eeb474200b8f9ebcaee6fa68fe52b6e1a5e337;p=osm%2FMON.git diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 5eb65a9..aa27083 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -45,21 +45,17 @@ log = logging.getLogger(__name__) VIM_COLLECTORS = { "openstack": OpenstackCollector, "vmware": VMwareCollector, - "vio": VIOCollector + "vio": VIOCollector, } VIM_INFRA_COLLECTORS = { "openstack": OpenstackInfraCollector, "vmware": VMwareInfraCollector, - "vio": VIOInfraCollector -} -SDN_INFRA_COLLECTORS = { - "onosof": OnosInfraCollector, - "onos_vpls": OnosInfraCollector + "vio": VIOInfraCollector, } +SDN_INFRA_COLLECTORS = {"onosof": OnosInfraCollector, "onos_vpls": OnosInfraCollector} class CollectorService: - def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) @@ -70,11 +66,11 @@ class CollectorService: def _get_vim_type(conf: Config, vim_account_id: str) -> str: common_db = CommonDbClient(conf) vim_account = common_db.get_vim_account(vim_account_id) - vim_type = vim_account['vim_type'] - if 'config' in vim_account and 'vim_type' in vim_account['config']: - vim_type = vim_account['config']['vim_type'].lower() - if vim_type == 'vio' and 'vrops_site' not in vim_account['config']: - vim_type = 'openstack' + vim_type = vim_account["vim_type"] + if "config" in vim_account and "vim_type" in vim_account["config"]: + vim_type = vim_account["config"]["vim_type"].lower() + if vim_type == "vio" and "vrops_site" not in vim_account["config"]: + vim_type = "openstack" return vim_type @staticmethod @@ -117,7 +113,7 @@ class CollectorService: log.info("Collecting sdnc metrics") metrics = [] common_db = CommonDbClient(conf) - sdn_type = common_db.get_sdnc(sdnc_id)['type'] + sdn_type = common_db.get_sdnc(sdnc_id)["type"] if sdn_type in SDN_INFRA_COLLECTORS: collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() @@ -128,9 +124,9 @@ class CollectorService: @staticmethod def _stop_process_pool(executor): - log.info('Shutting down process pool') + log.info("Shutting down process pool") try: - log.debug('Stopping residual processes in the process pool') + log.debug("Stopping residual processes in the process pool") for pid, process in executor._processes.items(): if process.is_alive(): process.terminate() @@ -140,11 +136,11 @@ class CollectorService: try: # Shutting down executor - log.debug('Shutting down process pool executor') + log.debug("Shutting down process pool executor") executor.shutdown() except RuntimeError as e: - log.info('RuntimeError in shutting down executer') - log.debug('RuntimeError %s' % (e)) + log.info("RuntimeError in shutting down executer") + log.debug("RuntimeError %s" % (e)) return def collect_metrics(self) -> List[Metric]: @@ -153,37 +149,72 @@ class CollectorService: start_time = time.time() # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 - with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor: - log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector', - 'process_pool_size'))) + with concurrent.futures.ProcessPoolExecutor( + self.conf.get("collector", "process_pool_size") + ) as executor: + log.info( + "Started metric collector process pool with pool size %s" + % (self.conf.get("collector", "process_pool_size")) + ) futures = [] for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - futures.append(executor.submit(CollectorService._collect_vim_metrics, self.conf, vnfr, vim_account_id)) - futures.append(executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr)) + nsr_id = vnfr["nsr-id-ref"] + vnf_member_index = vnfr["member-vnf-index-ref"] + vim_account_id = self.common_db.get_vim_account_id( + nsr_id, vnf_member_index + ) + futures.append( + executor.submit( + CollectorService._collect_vim_metrics, + self.conf, + vnfr, + vim_account_id, + ) + ) + futures.append( + executor.submit( + CollectorService._collect_vca_metrics, self.conf, vnfr + ) + ) vims = self.common_db.get_vim_accounts() for vim in vims: - futures.append(executor.submit(CollectorService._collect_vim_infra_metrics, self.conf, vim['_id'])) + futures.append( + executor.submit( + CollectorService._collect_vim_infra_metrics, + self.conf, + vim["_id"], + ) + ) sdncs = self.common_db.get_sdncs() for sdnc in sdncs: - futures.append(executor.submit(CollectorService._collect_sdnc_infra_metrics, self.conf, sdnc['_id'])) + futures.append( + executor.submit( + CollectorService._collect_sdnc_infra_metrics, + self.conf, + sdnc["_id"], + ) + ) try: # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds - for future in concurrent.futures.as_completed(futures, self.conf.get('collector', - 'process_execution_timeout')): - result = future.result(timeout=int(self.conf.get('collector', - 'process_execution_timeout'))) + for future in concurrent.futures.as_completed( + futures, self.conf.get("collector", "process_execution_timeout") + ): + result = future.result( + timeout=int( + self.conf.get("collector", "process_execution_timeout") + ) + ) metrics.extend(result) - log.debug('result = %s' % (result)) + log.debug("result = %s" % (result)) except concurrent.futures.TimeoutError as e: # Some processes have not completed due to timeout error - log.info('Some processes have not finished due to TimeoutError exception') - log.debug('concurrent.futures.TimeoutError exception %s' % (e)) + log.info( + "Some processes have not finished due to TimeoutError exception" + ) + log.debug("concurrent.futures.TimeoutError exception %s" % (e)) # Shutting down process pool executor CollectorService._stop_process_pool(executor)