24e8e43f0adbda6793cdbae672ee2cfd684800fe
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
25 import multiprocessing
31 from osm_common
.dbbase
import DbException
33 from osm_mon
.collector
.backends
.prometheus
import OSM_METRIC_PREFIX
34 from osm_mon
.core
.common_db
import CommonDbClient
35 from osm_mon
.core
.config
import Config
36 from osm_mon
.core
.database
import DatabaseManager
, Alarm
37 from osm_mon
.core
.message_bus_client
import MessageBusClient
38 from osm_mon
.core
.response
import ResponseBuilder
40 log
= logging
.getLogger(__name__
)
43 class AlarmStatus(Enum
):
46 INSUFFICIENT
= 'insufficient-data'
51 def __init__(self
, config
: Config
, loop
=None):
54 loop
= asyncio
.get_event_loop()
56 self
.common_db
= CommonDbClient(self
.conf
)
58 self
.database_manager
= DatabaseManager(self
.conf
)
59 self
.database_manager
.create_tables()
60 self
.queue
= multiprocessing
.Queue()
61 self
.msg_bus
= MessageBusClient(config
)
63 def _evaluate_metric(self
,
65 vnf_member_index
: int,
69 log
.debug("_evaluate_metric")
70 # TODO: Refactor to fit backend plugin model
71 query_section
= "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
72 OSM_METRIC_PREFIX
+ metric_name
, nsr_id
, vdur_name
, vnf_member_index
)
73 request_url
= self
.conf
.get('prometheus', 'url') + "/api/v1/query?" + query_section
74 log
.info("Querying Prometheus: %s", request_url
)
75 r
= requests
.get(request_url
, timeout
=int(self
.conf
.get('global', 'request_timeout')))
76 if r
.status_code
== 200:
77 json_response
= r
.json()
78 if json_response
['status'] == 'success':
79 result
= json_response
['data']['result']
81 metric_value
= float(result
[0]['value'][1])
82 log
.info("Metric value: %s", metric_value
)
83 if alarm
.operation
.upper() == 'GT':
84 if metric_value
> alarm
.threshold
:
85 self
.queue
.put((alarm
, AlarmStatus
.ALARM
))
87 self
.queue
.put((alarm
, AlarmStatus
.OK
))
88 elif alarm
.operation
.upper() == 'LT':
89 if metric_value
< alarm
.threshold
:
90 self
.queue
.put((alarm
, AlarmStatus
.ALARM
))
92 self
.queue
.put((alarm
, AlarmStatus
.OK
))
94 log
.warning("No metric result for alarm %s", alarm
.id)
95 self
.queue
.put((alarm
, AlarmStatus
.INSUFFICIENT
))
98 log
.warning("Prometheus response is not success. Got status %s", json_response
['status'])
100 log
.warning("Error contacting Prometheus. Got status code %s: %s", r
.status_code
, r
.text
)
102 def evaluate_forever(self
):
103 log
.debug('evaluate_forever')
107 time
.sleep(int(self
.conf
.get('evaluator', 'interval')))
108 except peewee
.PeeweeException
:
109 log
.exception("Database error evaluating alarms: ")
112 log
.exception("Error evaluating alarms")
115 log
.debug('evaluate')
117 for alarm
in Alarm
.select():
119 vnfr
= self
.common_db
.get_vnfr(alarm
.nsr_id
, alarm
.vnf_member_index
)
121 log
.exception("Error getting vnfr: ")
123 vnfd
= self
.common_db
.get_vnfd(vnfr
['vnfd-id'])
125 vdur
= next(filter(lambda vdur
: vdur
['name'] == alarm
.vdur_name
, vnfr
['vdur']))
126 except StopIteration:
127 log
.warning("No vdur found with name %s for alarm %s", alarm
.vdur_name
, alarm
.id)
129 vdu
= next(filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu']))
130 vnf_monitoring_param
= next(
131 filter(lambda param
: param
['id'] == alarm
.monitoring_param
, vnfd
['monitoring-param']))
132 nsr_id
= vnfr
['nsr-id-ref']
133 vnf_member_index
= vnfr
['member-vnf-index-ref']
134 vdur_name
= vdur
['name']
135 if 'vdu-monitoring-param' in vnf_monitoring_param
:
136 vdu_monitoring_param
= next(filter(
137 lambda param
: param
['id'] == vnf_monitoring_param
['vdu-monitoring-param'][
138 'vdu-monitoring-param-ref'], vdu
['monitoring-param']))
139 nfvi_metric
= vdu_monitoring_param
['nfvi-metric']
141 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
149 if 'vdu-metric' in vnf_monitoring_param
:
150 vnf_metric_name
= vnf_monitoring_param
['vdu-metric']['vdu-metric-name-ref']
151 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
159 if 'vnf-metric' in vnf_monitoring_param
:
160 vnf_metric_name
= vnf_monitoring_param
['vnf-metric']['vnf-metric-name-ref']
161 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
170 for process
in processes
:
171 process
.join(timeout
=10)
173 while not self
.queue
.empty():
174 alarms_tuples
.append(self
.queue
.get())
175 for alarm
, status
in alarms_tuples
:
176 p
= multiprocessing
.Process(target
=self
.notify_alarm
,
177 args
=(alarm
, status
))
def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
    """Send a notification describing ``alarm`` and its evaluated ``status``.

    The payload is produced by ``_build_alarm_response`` and handed to
    ``self.msg_bus.aiowrite`` together with the ``'alarm_response'`` and
    ``'notify_alarm'`` arguments (presumably the bus topic and message key --
    confirm against ``MessageBusClient``). The call blocks on
    ``self.loop.run_until_complete`` until that write has finished.

    :param alarm: alarm the notification refers to.
    :param status: status to report for the alarm.
    :raises Exception: whatever the bus write raises is propagated unchanged.
    """
    log.debug("notify_alarm")
    resp_message = self._build_alarm_response(alarm, status)
    self.loop.run_until_complete(
        self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
    # Logged only after the write has returned, so a "Sent" record is never
    # emitted for a notification that failed to go out.
    log.info("Sent alarm notification: %s", resp_message)
186 def _build_alarm_response(self
, alarm
: Alarm
, status
: AlarmStatus
):
187 response
= ResponseBuilder()
188 now
= time
.strftime("%d-%m-%Y") + " " + time
.strftime("%X")
189 return response
.generate_response(
192 vdu_name
=alarm
.vdur_name
,
193 vnf_member_index
=alarm
.vnf_member_index
,
195 metric_name
=alarm
.monitoring_param
,
196 operation
=alarm
.operation
,
197 threshold_value
=alarm
.threshold
,