- vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
- except StopIteration:
- log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
- continue
- vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
- vnf_monitoring_param = next(
- filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
- nsr_id = vnfr['nsr-id-ref']
- vnf_member_index = vnfr['member-vnf-index-ref']
- vdur_name = vdur['name']
- if 'vdu-monitoring-param' in vnf_monitoring_param:
- vdu_monitoring_param = next(filter(
- lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
- 'vdu-monitoring-param-ref'], vdu['monitoring-param']))
- nfvi_metric = vdu_monitoring_param['nfvi-metric']
-
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- nfvi_metric,
- alarm))
- processes.append(p)
- p.start()
- if 'vdu-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
- if 'vnf-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- '',
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
-
- for process in processes:
- process.join(timeout=10)
- alarms_tuples = []
- while not self.queue.empty():
- alarms_tuples.append(self.queue.get())
- for alarm, status in alarms_tuples:
- p = multiprocessing.Process(target=self.notify_alarm,
- args=(alarm, status))
- p.start()
-
- def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
- log.debug("notify_alarm")
- resp_message = self._build_alarm_response(alarm, status)
+ # Wait for future calls to complete till process_timeout. Default is 50 seconds
+ for evaluate_future in concurrent.futures.as_completed(evaluate_futures,
+ self.conf.get('evaluator', 'process_timeout')):
+ result = evaluate_future.result(timeout=int(self.conf.get('evaluator',
+ 'process_timeout')))
+ log.debug('result = %s' % (result))
+ except concurrent.futures.TimeoutError as e:
+ # Some processes have not completed due to timeout error
+ log.info('Some processes have not finished due to TimeoutError exception')
+ log.debug('concurrent.futures.TimeoutError exception %s' % (e))
+ Evaluator._stop_process_pool(executor)
+
+ @staticmethod
+ def _stop_process_pool(executor):
+ log.debug("_stop_process_pool")
+ log.info('Stopping all processes in the process pool')
+ try:
+ for pid, process in executor._processes.items():
+ if process.is_alive():
+ process.terminate()
+ except Exception as e:
+ log.info("Exception during process termination")
+ log.debug("Exception %s" % (e))
+ executor.shutdown()
+ return
+
+ @staticmethod
+ def _notify_alarm(conf: Config, alarm: Alarm, status: AlarmStatus):
+ log.debug("_notify_alarm")
+ resp_message = Evaluator._build_alarm_response(alarm, status)
+ msg_bus = MessageBusClient(conf)
+ loop = asyncio.get_event_loop()