1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
25 import multiprocessing
30 from osm_common
.dbbase
import DbException
32 from osm_mon
.collector
.backends
.prometheus
import OSM_METRIC_PREFIX
33 from osm_mon
.core
.common_db
import CommonDbClient
34 from osm_mon
.core
.config
import Config
35 from osm_mon
.core
.database
import DatabaseManager
, Alarm
36 from osm_mon
.core
.message_bus_client
import MessageBusClient
37 from osm_mon
.core
.response
import ResponseBuilder
# Module-level logger; it takes this module's name via __name__.
log = logging.getLogger(__name__)
def __init__(self, config: Config, loop=None):
    """Create the clients, the alarm database handle and the result queue.

    NOTE(review): this part of the file is a fragmented extract and some
    constructor statements are not visible in it. The assignments of
    ``self.conf`` and ``self.loop`` and the ``None`` guard are restored from
    how those attributes are used by the visible code (``self.conf`` is read
    right below, ``self.loop`` is read when a notification is sent). Confirm
    against the complete file.

    :param config: configuration object; kept as ``self.conf`` and passed to
        the common DB, database and message bus clients.
    :param loop: event loop used to run the message bus write. When omitted,
        ``asyncio.get_event_loop()`` supplies one.
    """
    self.conf = config
    # Only fall back to the default loop when the caller did not provide one.
    if loop is None:
        loop = asyncio.get_event_loop()
    self.loop = loop
    self.common_db = CommonDbClient(self.conf)
    # NOTE(review): one statement between these two assignments is missing
    # from the extract and could not be recovered - check the full file.
    self.database_manager = DatabaseManager(self.conf)
    self.database_manager.create_tables()
    # Drained after every evaluation round to collect the alarms to notify.
    self.queue = multiprocessing.Queue()
    self.msg_bus = MessageBusClient(config)
def _evaluate_metric(self,
                     nsr_id: str,
                     vnf_member_index: int,
                     vdur_name: str,
                     metric_name: str,
                     alarm: Alarm):
    """Query Prometheus for one metric and compare it with the alarm threshold.

    The first sample returned by the instant query is converted to ``float``
    and checked against ``alarm.threshold`` using ``alarm.operation``
    (``GT`` or ``LT``, case-insensitive). Any other operation is ignored.

    NOTE(review): this part of the file is a fragmented extract. Only the
    ``vnf_member_index: int`` parameter is visible; the other parameters are
    restored from the names the body uses, so their order and annotations
    are assumptions. The bodies of the two threshold checks are also not
    visible; putting the alarm on ``self.queue`` is assumed because the
    evaluation round drains that queue and notifies every item it finds.
    Confirm against the complete file.

    :param nsr_id: value of the ``ns_id`` label in the query.
    :param vnf_member_index: value of the ``vnf_member_index`` label.
    :param vdur_name: value of the ``vdu_name`` label.
    :param metric_name: metric name; ``OSM_METRIC_PREFIX`` is prepended.
    :param alarm: alarm providing ``operation``, ``threshold`` and ``id``.
    """
    log.debug("_evaluate_metric")
    # TODO: Refactor to fit backend plugin model
    query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
        OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
    request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
    log.info("Querying Prometheus: %s", request_url)
    r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))

    # Guard clauses: bail out with a warning as soon as the response is unusable.
    if r.status_code != 200:
        log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
        return
    json_response = r.json()
    if json_response['status'] != 'success':
        log.warning("Prometheus response is not success. Got status %s", json_response['status'])
        return
    result = json_response['data']['result']
    if not result:
        log.warning("No metric result for alarm %s", alarm.id)
        return

    # Each sample's 'value' is indexed at [1] for the reading; [0] is unused here.
    metric_value = float(result[0]['value'][1])
    log.info("Metric value: %s", metric_value)
    operation = alarm.operation.upper()
    if operation == 'GT':
        if metric_value > alarm.threshold:
            self.queue.put(alarm)
    elif operation == 'LT':
        if metric_value < alarm.threshold:
            self.queue.put(alarm)
def evaluate_forever(self):
    """Run alarm evaluation in an endless loop, sleeping between rounds.

    The pause is the ``interval`` option of the ``evaluator`` configuration
    section, converted to ``int`` and passed to ``time.sleep``.

    NOTE(review): this part of the file is a fragmented extract. The loop,
    the ``try`` header, the call that performs one evaluation round and the
    type of the second handler are not visible; they are restored from the
    method name and the two visible ``except``/log pairs. Confirm against the
    complete file.
    """
    log.debug('evaluate_forever')
    while True:
        try:
            self.evaluate()
            time.sleep(int(self.conf.get('evaluator', 'interval')))
        except peewee.PeeweeException:
            log.exception("Database error evaluating alarms: ")
            # NOTE(review): the statement that follows this log call is
            # missing from the extract. Re-raising is assumed, since a
            # separate handler that only logged would behave like the
            # generic one below - confirm.
            raise
        except Exception:
            # Any other failure is logged and the loop carries on.
            log.exception("Error evaluating alarms")
101 log
.debug('evaluate')
103 for alarm
in Alarm
.select():
105 vnfr
= self
.common_db
.get_vnfr(alarm
.nsr_id
, alarm
.vnf_member_index
)
107 log
.exception("Error getting vnfr: ")
109 vnfd
= self
.common_db
.get_vnfd(vnfr
['vnfd-id'])
111 vdur
= next(filter(lambda vdur
: vdur
['name'] == alarm
.vdur_name
, vnfr
['vdur']))
112 except StopIteration:
113 log
.warning("No vdur found with name %s for alarm %s", alarm
.vdur_name
, alarm
.id)
115 vdu
= next(filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu']))
116 vnf_monitoring_param
= next(
117 filter(lambda param
: param
['id'] == alarm
.monitoring_param
, vnfd
['monitoring-param']))
118 nsr_id
= vnfr
['nsr-id-ref']
119 vnf_member_index
= vnfr
['member-vnf-index-ref']
120 vdur_name
= vdur
['name']
121 if 'vdu-monitoring-param' in vnf_monitoring_param
:
122 vdu_monitoring_param
= next(filter(
123 lambda param
: param
['id'] == vnf_monitoring_param
['vdu-monitoring-param'][
124 'vdu-monitoring-param-ref'], vdu
['monitoring-param']))
125 nfvi_metric
= vdu_monitoring_param
['nfvi-metric']
127 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
135 if 'vdu-metric' in vnf_monitoring_param
:
136 vnf_metric_name
= vnf_monitoring_param
['vdu-metric']['vdu-metric-name-ref']
137 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
145 if 'vnf-metric' in vnf_monitoring_param
:
146 vnf_metric_name
= vnf_monitoring_param
['vnf-metric']['vnf-metric-name-ref']
147 p
= multiprocessing
.Process(target
=self
._evaluate
_metric
,
156 for process
in processes
:
157 process
.join(timeout
=10)
158 triggered_alarms
= []
159 while not self
.queue
.empty():
160 triggered_alarms
.append(self
.queue
.get())
161 for alarm
in triggered_alarms
:
162 p
= multiprocessing
.Process(target
=self
.notify_alarm
,
def notify_alarm(self, alarm: Alarm):
    """Build a notification for ``alarm`` and write it to the message bus.

    The message is produced by ``ResponseBuilder.generate_response`` and
    written with key ``notify_alarm`` on topic ``alarm_response``; the write
    is run to completion on ``self.loop``.

    NOTE(review): this part of the file is a fragmented extract and several
    arguments of the ``generate_response`` call are not visible. The ones
    marked "assumed" below are reconstructed and must be checked against
    ``ResponseBuilder`` and the complete file.

    :param alarm: alarm whose fields fill the notification.
    """
    log.debug("notify_alarm")
    response = ResponseBuilder()
    # One strftime call so that date and time come from the same clock read;
    # formatting them separately could straddle a second or midnight boundary.
    now = time.strftime("%d-%m-%Y %X")
    # Generate and send response
    resp_message = response.generate_response(
        'notify_alarm',  # assumed
        alarm_id=alarm.id,  # assumed
        vdu_name=alarm.vdur_name,
        vnf_member_index=alarm.vnf_member_index,
        ns_id=alarm.nsr_id,  # assumed
        metric_name=alarm.monitoring_param,
        operation=alarm.operation,
        threshold_value=alarm.threshold,
        sev=alarm.severity,  # assumed
        status='alarm',  # assumed
        date=now)  # assumed
    self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
    log.info("Sent alarm notification: %s", resp_message)