1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime
import datetime
, timedelta
21 from airflow
.decorators
import dag
, task
22 from airflow
.operators
.python
import get_current_context
23 from osm_mon
.core
.common_db
import CommonDbClient
24 from osm_mon
.core
.config
import Config
26 from requests
.exceptions
import ConnectionError
, RequestException
29 logger
= logging
.getLogger("airflow.task")
35 "depends_on_past": False,
37 "retry_delay": timedelta(seconds
=15),
39 description
="Webhook callback for VDU alarm from Prometheus AlertManager",
40 is_paused_upon_creation
=False,
41 schedule_interval
=None,
42 start_date
=datetime(2022, 1, 1),
43 tags
=["osm", "webhook"],
46 @task(task_id
="main_task")
48 logger
.debug("Running main task...")
49 # Read input parameters
50 context
= get_current_context()
51 conf
= context
["dag_run"].conf
52 for alarm
in conf
["alerts"]:
54 status
= alarm
["status"]
55 logger
.info(f
" status: {status}")
56 logger
.info(f
' annotations: {alarm["annotations"]}')
57 logger
.info(f
' startsAt: {alarm["startsAt"]}')
58 logger
.info(f
' endsAt: {alarm["endsAt"]}')
59 logger
.info(f
' labels: {alarm["labels"]}')
60 alertname
= alarm
["labels"].get("alertname")
61 # Check vdu_alarm alert type
62 if not alertname
.startswith("vdu_alarm_"):
65 common_db
= CommonDbClient(config
)
66 ns_id
= alarm
["labels"]["ns_id"]
67 vdu_id
= alarm
["labels"]["vdu_id"]
68 vnf_member_index
= alarm
["labels"]["vnf_member_index"]
69 # Searching alerting rule in MongoDB
71 f
"Searching alarm rule in MongoDB: ns_id {ns_id}, "
72 f
"vnf_member_index {vnf_member_index}, "
75 alert
= common_db
.get_alert(
77 vnf_member_index
=vnf_member_index
,
80 action_type
="vdu_alarm",
83 logger
.info("Found an alert rule:")
85 if status
== "firing":
86 alarm_status
= "alarm"
87 elif status
== "resolved":
92 common_db
.update_alert_status(
93 uuid
=alert
["uuid"], alarm_status
=alarm_status
95 if alarm_status
in alert
["action"]:
97 for item
in alert
["action"][alarm_status
]:
98 urls
.append(item
["url"])
100 logger
.info(f
"No '{alarm_status}' action in the alert rule")
104 "alarm_uuid": alert
["uuid"],
105 "status": alarm_status
,
106 "metric_name": alert
["metric"],
107 "start_date": alarm
["startsAt"],
108 "tags": alarm
["labels"],
111 "schema_type": "notify_alarm",
112 "schema_version": "1.1",
113 "notify_details": notify_details
,
115 headers
= {"content-type": "application/json"}
117 logger
.info(f
"Sending HTTP POST to {url}...")
119 resp
= requests
.post(
121 data
=json
.dumps(payload
),
126 logger
.info(f
"Response {resp}")
127 except ConnectionError
:
128 logger
.info(f
"Error connecting to url {url}")
129 except RequestException
as e
:
131 f
"RequestException while connecting to url {url}: {e}"