1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
25 import multiprocessing
29 from osm_common
.dbbase
import DbException
31 from osm_mon
.collector
.backends
.prometheus
import OSM_METRIC_PREFIX
32 from osm_mon
.core
.common_db
import CommonDbClient
33 from osm_mon
.core
.database
import DatabaseManager
, Alarm
34 from osm_mon
.core
.message_bus
.producer
import Producer
35 from osm_mon
.core
.response
import ResponseBuilder
36 from osm_mon
.core
.settings
import Config
38 log
= logging
.getLogger(__name__
)
43 self
.common_db
= CommonDbClient()
45 self
.database_manager
= DatabaseManager()
46 self
.database_manager
.create_tables()
47 self
.queue
= multiprocessing
.Queue()
49 def _evaluate_metric(self
,
51 vnf_member_index
: int,
55 log
.debug("_evaluate_metric")
56 # TODO: Refactor to fit backend plugin model
57 cfg
= Config
.instance()
58 query_section
= "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
59 OSM_METRIC_PREFIX
+ metric_name
, nsr_id
, vdur_name
, vnf_member_index
)
60 request_url
= cfg
.OSMMON_PROMETHEUS_URL
+ "/api/v1/query?" + query_section
61 log
.info("Querying Prometheus: %s", request_url
)
62 r
= requests
.get(request_url
, timeout
=cfg
.OSMMON_REQUEST_TIMEOUT
)
63 if r
.status_code
== 200:
64 json_response
= r
.json()
65 if json_response
['status'] == 'success':
66 result
= json_response
['data']['result']
68 metric_value
= float(result
[0]['value'][1])
69 log
.info("Metric value: %s", metric_value
)
70 if alarm
.operation
.upper() == 'GT':
71 if metric_value
> alarm
.threshold
:
73 elif alarm
.operation
.upper() == 'LT':
74 if metric_value
< alarm
.threshold
:
77 log
.warning("No metric result for alarm %s", alarm
.id)
79 log
.warning("Prometheus response is not success. Got status %s", json_response
['status'])
81 log
.warning("Error contacting Prometheus. Got status code %s: %s", r
.status_code
, r
.text
)
83 def evaluate_forever(self
):
84 log
.debug('evaluate_forever')
85 cfg
= Config
.instance()
89 time
.sleep(cfg
.OSMMON_EVALUATOR_INTERVAL
)
91 log
.exception("Error evaluating alarms")
96 for alarm
in Alarm
.select():
98 vnfr
= self
.common_db
.get_vnfr(alarm
.nsr_id
, alarm
.vnf_member_index
)
100 log
.exception("Error getting vnfr: ")
102 vnfd
= self
.common_db
.get_vnfd(vnfr
['vnfd-id'])
104 vdur
= next(filter(lambda vdur
: vdur
['name'] == alarm
.vdur_name
, vnfr
['vdur']))
105 except StopIteration:
106 log
.warning("No vdur found with name %s for alarm %s", alarm
.vdur_name
, alarm
.id)
108 vdu
= next(filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu']))
109 vnf_monitoring_param
= next(
110 filter(lambda param
: param
['id'] == alarm
.monitoring_param
, vnfd
['monitoring-param']))
111 nsr_id
= vnfr
['nsr-id-ref']
112 vnf_member_index
= vnfr
['member-vnf-index-ref']
113 vdur_name
= vdur
['name']
114 if 'vdu-monitoring-param' in vnf_monitoring_param
:
115 vdu_monitoring_param
= next(filter(
116 lambda param
: param
['id'] == vnf_monitoring_param
['vdu-monitoring-param'][
117 'vdu-monitoring-param-ref'], vdu
['monitoring-param']))
118 nfvi_metric
= vdu_monitoring_param
['nfvi-metric']
120 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
128 if 'vdu-metric' in vnf_monitoring_param
:
129 vnf_metric_name
= vnf_monitoring_param
['vdu-metric']['vdu-metric-name-ref']
130 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
138 if 'vnf-metric' in vnf_monitoring_param
:
139 vnf_metric_name
= vnf_monitoring_param
['vnf-metric']['vnf-metric-name-ref']
140 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
149 for process
in processes
:
151 triggered_alarms
= []
152 while not self
.queue
.empty():
153 triggered_alarms
.append(self
.queue
.get())
154 for alarm
in triggered_alarms
:
155 self
.notify_alarm(alarm
)
156 p
= multiprocessing
.Process(target
=self
.notify_alarm
,
160 def notify_alarm(self
, alarm
: Alarm
):
161 log
.debug("notify_alarm")
162 response
= ResponseBuilder()
163 now
= time
.strftime("%d-%m-%Y") + " " + time
.strftime("%X")
164 # Generate and send response
165 resp_message
= response
.generate_response(
168 vdu_name
=alarm
.vdur_name
,
169 vnf_member_index
=alarm
.vnf_member_index
,
171 metric_name
=alarm
.monitoring_param
,
172 operation
=alarm
.operation
,
173 threshold_value
=alarm
.threshold
,
177 producer
= Producer()
178 producer
.send(topic
='alarm_response', key
='notify_alarm', value
=json
.dumps(resp_message
))
180 log
.info("Sent alarm notification: %s", resp_message
)