Adds alarm engine
[osm/MON.git] / osm_mon / evaluator / evaluator.py
diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py
new file mode 100644 (file)
index 0000000..b040198
--- /dev/null
@@ -0,0 +1,175 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import json
+import logging
+import multiprocessing
+import time
+
+from osm_common.dbbase import DbException
+
+from osm_mon.collector.collector import VIM_COLLECTORS
+from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.database import DatabaseManager, Alarm
+from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.response import ResponseBuilder
+from osm_mon.core.settings import Config
+
+log = logging.getLogger(__name__)
+
+
+class Evaluator:
+    def __init__(self):
+        self.common_db = CommonDbClient()
+        self.plugins = []
+        self.database_manager = DatabaseManager()
+        self.database_manager.create_tables()
+        self.queue = multiprocessing.Queue()
+
+    def _evaluate_vim_metric(self,
+                             nsr_id: str,
+                             vnf_member_index: int,
+                             vdur_name: str,
+                             nfvi_metric_name: str,
+                             vim_account_id: str,
+                             alarm: Alarm):
+        vim_type = self.database_manager.get_vim_type(vim_account_id)
+        if vim_type in VIM_COLLECTORS:
+            collector = VIM_COLLECTORS[vim_type](vim_account_id)
+            metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, nfvi_metric_name)
+            if alarm.operation.upper() == 'GT':
+                if metric.value > alarm.threshold:
+                    self.queue.put(alarm)
+            elif alarm.operation.upper() == 'LT':
+                if metric.value < alarm.threshold:
+                    self.queue.put(alarm)
+
+        else:
+            log.debug("vimtype %s is not supported.", vim_type)
+
+    def _evaluate_vca_metric(self,
+                             nsr_id: str,
+                             vnf_member_index: int,
+                             vdur_name: str,
+                             vnf_metric_name: str,
+                             alarm: Alarm):
+        collector = VCACollector()
+        metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, vnf_metric_name)
+        if alarm.operation.upper() == 'GT':
+            if metric.value > alarm.threshold:
+                self.queue.put(alarm)
+        elif alarm.operation.upper() == 'LT':
+            if metric.value < alarm.threshold:
+                self.queue.put(alarm)
+
+    def evaluate_forever(self):
+        log.debug('collect_forever')
+        cfg = Config.instance()
+        while True:
+            try:
+                self.evaluate()
+                time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)
+            except Exception:
+                log.exception("Error evaluating alarms")
+
+    def evaluate(self):
+        processes = []
+        for alarm in Alarm.select():
+            try:
+                vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
+            except DbException:
+                log.exception("Error getting vnfr: ")
+                continue
+            vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+            try:
+                vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
+            except StopIteration:
+                log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
+                continue
+            vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
+            vnf_monitoring_param = next(
+                filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
+            nsr_id = vnfr['nsr-id-ref']
+            vnf_member_index = vnfr['member-vnf-index-ref']
+            vdur_name = vdur['name']
+            if 'vdu-monitoring-param' in vnf_monitoring_param:
+                vdu_monitoring_param = next(filter(
+                    lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
+                        'vdu-monitoring-param-ref'], vdu['monitoring-param']))
+                nfvi_metric = vdu_monitoring_param['nfvi-metric']
+
+                vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
+                p = multiprocessing.Process(target=self._evaluate_vim_metric,
+                                            args=(nsr_id,
+                                                  vnf_member_index,
+                                                  vdur_name,
+                                                  nfvi_metric,
+                                                  vim_account_id,
+                                                  alarm))
+                processes.append(p)
+                p.start()
+            if 'vdu-metric' in vnf_monitoring_param:
+                vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
+                p = multiprocessing.Process(target=self._evaluate_vca_metric,
+                                            args=(nsr_id,
+                                                  vnf_member_index,
+                                                  vdur_name,
+                                                  vnf_metric_name,
+                                                  alarm))
+                processes.append(p)
+                p.start()
+            if 'vnf-metric' in vnf_monitoring_param:
+                log.warning("vnf-metric is not currently supported.")
+                continue
+
+        for process in processes:
+            process.join()
+        triggered_alarms = []
+        while not self.queue.empty():
+            triggered_alarms.append(self.queue.get())
+        for alarm in triggered_alarms:
+            self.notify_alarm(alarm)
+            p = multiprocessing.Process(target=self.notify_alarm,
+                                        args=(alarm,))
+            p.start()
+
+    def notify_alarm(self, alarm: Alarm):
+        response = ResponseBuilder()
+        now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
+        # Generate and send response
+        resp_message = response.generate_response(
+            'notify_alarm',
+            alarm_id=alarm.id,
+            vdu_name=alarm.vdur_name,
+            vnf_member_index=alarm.vnf_member_index,
+            ns_id=alarm.nsr_id,
+            metric_name=alarm.monitoring_param,
+            operation=alarm.operation,
+            threshold_value=alarm.threshold,
+            sev=alarm.severity,
+            status='alarm',
+            date=now)
+        producer = Producer()
+        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
+        producer.flush()
+        log.info("Sent alarm notification: %s", resp_message)