1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
25 import multiprocessing
30 from osm_common
.dbbase
import DbException
32 from osm_mon
.collector
.backends
.prometheus
import OSM_METRIC_PREFIX
33 from osm_mon
.core
.common_db
import CommonDbClient
34 from osm_mon
.core
.database
import DatabaseManager
, Alarm
35 from osm_mon
.core
.message_bus
.producer
import Producer
36 from osm_mon
.core
.response
import ResponseBuilder
37 from osm_mon
.core
.settings
import Config
# Module-level logger, named after this module via __name__; used by the
# methods below for debug/info/warning/exception messages.
log = logging.getLogger(__name__)
44 self
.common_db
= CommonDbClient()
46 self
.database_manager
= DatabaseManager()
47 self
.database_manager
.create_tables()
48 self
.queue
= multiprocessing
.Queue()
50 def _evaluate_metric(self
,
52 vnf_member_index
: int,
56 log
.debug("_evaluate_metric")
57 # TODO: Refactor to fit backend plugin model
58 cfg
= Config
.instance()
59 query_section
= "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
60 OSM_METRIC_PREFIX
+ metric_name
, nsr_id
, vdur_name
, vnf_member_index
)
61 request_url
= cfg
.OSMMON_PROMETHEUS_URL
+ "/api/v1/query?" + query_section
62 log
.info("Querying Prometheus: %s", request_url
)
63 r
= requests
.get(request_url
, timeout
=cfg
.OSMMON_REQUEST_TIMEOUT
)
64 if r
.status_code
== 200:
65 json_response
= r
.json()
66 if json_response
['status'] == 'success':
67 result
= json_response
['data']['result']
69 metric_value
= float(result
[0]['value'][1])
70 log
.info("Metric value: %s", metric_value
)
71 if alarm
.operation
.upper() == 'GT':
72 if metric_value
> alarm
.threshold
:
74 elif alarm
.operation
.upper() == 'LT':
75 if metric_value
< alarm
.threshold
:
78 log
.warning("No metric result for alarm %s", alarm
.id)
80 log
.warning("Prometheus response is not success. Got status %s", json_response
['status'])
82 log
.warning("Error contacting Prometheus. Got status code %s: %s", r
.status_code
, r
.text
)
84 def evaluate_forever(self
):
85 log
.debug('evaluate_forever')
86 cfg
= Config
.instance()
90 time
.sleep(cfg
.OSMMON_EVALUATOR_INTERVAL
)
91 except peewee
.PeeweeException
:
92 log
.exception("Database error evaluating alarms: ")
95 log
.exception("Error evaluating alarms")
100 for alarm
in Alarm
.select():
102 vnfr
= self
.common_db
.get_vnfr(alarm
.nsr_id
, alarm
.vnf_member_index
)
104 log
.exception("Error getting vnfr: ")
106 vnfd
= self
.common_db
.get_vnfd(vnfr
['vnfd-id'])
108 vdur
= next(filter(lambda vdur
: vdur
['name'] == alarm
.vdur_name
, vnfr
['vdur']))
109 except StopIteration:
110 log
.warning("No vdur found with name %s for alarm %s", alarm
.vdur_name
, alarm
.id)
112 vdu
= next(filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu']))
113 vnf_monitoring_param
= next(
114 filter(lambda param
: param
['id'] == alarm
.monitoring_param
, vnfd
['monitoring-param']))
115 nsr_id
= vnfr
['nsr-id-ref']
116 vnf_member_index
= vnfr
['member-vnf-index-ref']
117 vdur_name
= vdur
['name']
118 if 'vdu-monitoring-param' in vnf_monitoring_param
:
119 vdu_monitoring_param
= next(filter(
120 lambda param
: param
['id'] == vnf_monitoring_param
['vdu-monitoring-param'][
121 'vdu-monitoring-param-ref'], vdu
['monitoring-param']))
122 nfvi_metric
= vdu_monitoring_param
['nfvi-metric']
124 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
132 if 'vdu-metric' in vnf_monitoring_param
:
133 vnf_metric_name
= vnf_monitoring_param
['vdu-metric']['vdu-metric-name-ref']
134 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
142 if 'vnf-metric' in vnf_monitoring_param
:
143 vnf_metric_name
= vnf_monitoring_param
['vnf-metric']['vnf-metric-name-ref']
144 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
153 for process
in processes
:
155 triggered_alarms
= []
156 while not self
.queue
.empty():
157 triggered_alarms
.append(self
.queue
.get())
158 for alarm
in triggered_alarms
:
159 self
.notify_alarm(alarm
)
160 p
= multiprocessing
.Process(target
=self
.notify_alarm
,
164 def notify_alarm(self
, alarm
: Alarm
):
165 log
.debug("notify_alarm")
166 response
= ResponseBuilder()
167 now
= time
.strftime("%d-%m-%Y") + " " + time
.strftime("%X")
168 # Generate and send response
169 resp_message
= response
.generate_response(
172 vdu_name
=alarm
.vdur_name
,
173 vnf_member_index
=alarm
.vnf_member_index
,
175 metric_name
=alarm
.monitoring_param
,
176 operation
=alarm
.operation
,
177 threshold_value
=alarm
.threshold
,
181 producer
= Producer()
182 producer
.send(topic
='alarm_response', key
='notify_alarm', value
=json
.dumps(resp_message
))
184 log
.info("Sent alarm notification: %s", resp_message
)