from enum import Enum
from typing import Tuple, List
-from osm_common.dbbase import DbException
-
-from osm_mon.core import database
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.database import Alarm, AlarmRepository
+from osm_mon.core.models import Alarm
from osm_mon.evaluator.backends.prometheus import PrometheusBackend
log = logging.getLogger(__name__)
self.queue = multiprocessing.Queue()
def _get_metric_value(self,
- nsr_id: str,
- vnf_member_index: str,
- vdur_name: str,
- metric_name: str):
- return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name,
- nsr_id,
- vdur_name,
- vnf_member_index)
+ metric_name: str,
+ tags: dict):
+ return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
def _evaluate_metric(self,
- nsr_id: str,
- vnf_member_index: str,
- vdur_name: str,
- metric_name: str,
alarm: Alarm):
log.debug("_evaluate_metric")
- metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name)
+ metric_value = self._get_metric_value(alarm.metric, alarm.tags)
if metric_value is None:
- log.warning("No metric result for alarm %s", alarm.id)
+ log.warning("No metric result for alarm %s", alarm.uuid)
self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
else:
if alarm.operation.upper() == 'GT':
def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]:
log.debug('evaluate_alarms')
processes = []
- database.db.connect()
- try:
- with database.db.atomic():
- for alarm in AlarmRepository.list():
- try:
- vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
- except DbException:
- log.exception("Error getting vnfr: ")
- continue
- vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
- try:
- vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
- except StopIteration:
- log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
- continue
- vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
- vnf_monitoring_param = next(
- filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
- nsr_id = vnfr['nsr-id-ref']
- vnf_member_index = vnfr['member-vnf-index-ref']
- vdur_name = vdur['name']
- if 'vdu-monitoring-param' in vnf_monitoring_param:
- vdu_monitoring_param = next(filter(
- lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
- 'vdu-monitoring-param-ref'], vdu['monitoring-param']))
- nfvi_metric = vdu_monitoring_param['nfvi-metric']
-
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- nfvi_metric,
- alarm))
- processes.append(p)
- p.start()
- if 'vdu-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
- if 'vnf-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- '',
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
-
- for process in processes:
- process.join(timeout=10)
- alarms_tuples = []
- while not self.queue.empty():
- alarms_tuples.append(self.queue.get())
- return alarms_tuples
- finally:
- database.db.close()
+ for alarm in self.common_db.get_alarms():
+ p = multiprocessing.Process(target=self._evaluate_metric,
+ args=(alarm,))
+ processes.append(p)
+ p.start()
+
+ for process in processes:
+ process.join(timeout=10)
+ alarms_tuples = []
+ while not self.queue.empty():
+ alarms_tuples.append(self.queue.get())
+ return alarms_tuples