- p = multiprocessing.Process(target=self._collect_vim_infra_metrics,
- args=(vim['_id'],))
- processes.append(p)
- p.start()
- sdncs = self.common_db.get_sdncs()
- for sdnc in sdncs:
- p = multiprocessing.Process(target=self._collect_sdnc_infra_metrics,
- args=(sdnc['_id'],))
- processes.append(p)
- p.start()
- for process in processes:
- process.join(timeout=20)
- for process in processes:
- if process.is_alive():
- process.terminate()
- metrics = []
- while not self.queue.empty():
- metrics.append(self.queue.get())
- return metrics
+ vim_type = CollectorService._get_vim_type(self.conf, vim["_id"])
+ if vim_type in VIM_INFRA_COLLECTORS:
+ collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim["_id"])
+ vim_sess = collector.vim_session if vim_type == "openstack" else None
+ # Populate the vim session map with vim ids and the corresponding session objects;
+ # session objects are stored only for vims of type openstack
+ if vim_sess:
+ vim_sess_map[vim["_id"]] = vim_sess
+
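+ # The resulting map has the shape {vim_id: session object} (presumably
+ # keystoneauth1 sessions, given the keystone exception handling below) and is
+ # shipped once to every worker via the pool initializer init_session
+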
+ start_time = time.time()
+ # Start the executor pool with pool size process_pool_size (default 20).
+ # init_session is called in each worker to assign the session map to the global vim session map variable
+ with concurrent.futures.ProcessPoolExecutor(
+ int(self.conf.get("collector", "process_pool_size")),
+ initializer=init_session,
+ initargs=(vim_sess_map,),
+ ) as executor:
+ log.info(
+ "Started metric collector process pool with pool size %s"
+ % (self.conf.get("collector", "process_pool_size"))
+ )
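+ # For context, a minimal sketch of what the initializer comment above
+ # describes (assumed shape, not necessarily the exact helper):
+ #
+ # vim_sess_map = {}
+ #
+ # def init_session(session_map):
+ # global vim_sess_map
+ # vim_sess_map = session_map
+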
+ futures = []
+ for vnfr in vnfrs:
+ nsr_id = vnfr["nsr-id-ref"]
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ vim_account_id = self.common_db.get_vim_account_id(
+ nsr_id, vnf_member_index
+ )
+ futures.append(
+ executor.submit(
+ CollectorService._collect_vim_metrics,
+ self.conf,
+ vnfr,
+ vim_account_id,
+ )
+ )
+ futures.append(
+ executor.submit(
+ CollectorService._collect_vca_metrics, self.conf, vnfr
+ )
+ )
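+ # Note: the collectors are submitted as static methods with plain, picklable
+ # arguments, since ProcessPoolExecutor must pickle the callable and its
+ # arguments to hand them over to the worker processes
+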
+
+ for vim in vims:
+ futures.append(
+ executor.submit(
+ CollectorService._collect_vim_infra_metrics,
+ self.conf,
+ vim["_id"],
+ )
+ )
+
+ sdncs = self.common_db.get_sdncs()
+ for sdnc in sdncs:
+ futures.append(
+ executor.submit(
+ CollectorService._collect_sdnc_infra_metrics,
+ self.conf,
+ sdnc["_id"],
+ )
+ )
+
+ try:
+ # Wait for the submitted futures to complete, up to process_execution_timeout (default 50 seconds)
+ for future in concurrent.futures.as_completed(
+ futures, int(self.conf.get("collector", "process_execution_timeout"))
+ ):
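+ # Each completed future is expected to return a list of metrics,
+ # which is flattened into the shared metrics list below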
+ try:
+ result = future.result(
+ timeout=int(
+ self.conf.get("collector", "process_execution_timeout")
+ )
+ )
+ metrics.extend(result)
+ log.debug("result = %s" % (result))
+ except keystoneauth1.exceptions.connection.ConnectionError as e:
+ log.info("Keystone connection error during metric collection")
+ log.debug("Keystone connection error exception %s" % (e))
+ except concurrent.futures.TimeoutError as e:
+ # Not all processes completed before process_execution_timeout expired
+ log.info(
+ "Some processes have not finished due to TimeoutError exception"
+ )
+ log.debug("concurrent.futures.TimeoutError exception %s" % (e))
+
+ # Shutting down process pool executor
+ CollectorService._stop_process_pool(executor)
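+ # Stopping the pool explicitly here presumably keeps the with-block's
+ # implicit shutdown(wait=True) from blocking on workers that never finished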
+
+ end_time = time.time()
+ log.info("Collection completed in %s seconds", end_time - start_time)