1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
18 from datetime
import datetime
, timedelta
23 from airflow
.decorators
import dag
, task
24 from airflow
.operators
.python
import get_current_context
25 from osm_mon
.core
.common_db
import CommonDbClient
26 from osm_mon
.core
.config
import Config
27 from osm_mon
.core
.message_bus_client
import MessageBusClient
30 logger
= logging
.getLogger("airflow.task")
36 "depends_on_past": False,
38 "retry_delay": timedelta(seconds
=15),
40 description
="Webhook callback for scale-out alarm from Prometheus AlertManager",
41 is_paused_upon_creation
=False,
42 schedule_interval
=None,
43 start_date
=datetime(2022, 1, 1),
44 tags
=["osm", "webhook"],
47 @task(task_id
="main_task")
49 logger
.debug("Running main task...")
50 # Read input parameters
51 context
= get_current_context()
52 conf
= context
["dag_run"].conf
53 for alarm
in conf
["alerts"]:
54 logger
.info("Scale-out alarm:")
55 status
= alarm
["status"]
56 logger
.info(f
" status: {status}")
57 logger
.info(f
' annotations: {alarm["annotations"]}')
58 logger
.info(f
' startsAt: {alarm["startsAt"]}')
59 logger
.info(f
' endsAt: {alarm["endsAt"]}')
60 logger
.info(f
' labels: {alarm["labels"]}')
61 alertname
= alarm
["labels"].get("alertname")
62 if not alertname
.startswith("scaleout_"):
64 # scaleout_vdu alert type
66 common_db
= CommonDbClient(config
)
67 ns_id
= alarm
["labels"]["ns_id"]
68 vdu_id
= alarm
["labels"]["vdu_id"]
69 vnf_member_index
= alarm
["labels"]["vnf_member_index"]
70 if status
== "firing":
71 # Searching alerting rule in MongoDB
73 f
"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
74 f
"vnf_member_index {vnf_member_index}, "
77 alert
= common_db
.get_alert(
79 vnf_member_index
=vnf_member_index
,
82 action_type
="scale_out",
85 logger
.info("Found an alert rule:")
88 common_db
.update_alert_status(
89 uuid
=alert
["uuid"], alarm_status
="alarm"
91 # Get VNFR from MongoDB
92 vnfr
= common_db
.get_vnfr(
93 nsr_id
=ns_id
, member_index
=vnf_member_index
96 f
"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
98 # Check cooldown-time before scale-out
100 if "cooldown-time" in alert
["action"]:
101 cooldown_time
= alert
["action"]["cooldown-time"]
102 cooldown_time
= cooldown_time
* 60
104 since
= now
- cooldown_time
106 f
"Looking for scale operations in cooldown interval ({cooldown_time} s)"
108 nslcmops
= common_db
.get_nslcmop(
109 nsr_id
=ns_id
, operation_type
="scale", since
=since
115 if ("scaleVnfData" in sub
["operationParams"])
118 in sub
["operationParams"]["scaleVnfData"]
122 in sub
["operationParams"]["scaleVnfData"][
127 sub
["operationParams"]["scaleVnfData"][
129 ]["member-vnf-index"]
137 f
"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
142 # Save nslcmop object in MongoDB
143 msg_bus
= MessageBusClient(config
)
144 loop
= asyncio
.get_event_loop()
145 _id
= str(uuid
.uuid4())
146 projects_read
= vnfr
["_admin"]["projects_read"]
147 projects_write
= vnfr
["_admin"]["projects_write"]
148 scaling_group
= alert
["action"]["scaling-group"]
150 "scaleType": "SCALE_VNF",
152 "scaleVnfType": "SCALE_OUT",
154 "scaling-group-descriptor": scaling_group
,
155 "member-vnf-index": vnf_member_index
,
158 "scaleTime": "{}Z".format(datetime
.utcnow().isoformat()),
163 "operationState": "PROCESSING",
164 "statusEnteredTime": now
,
165 "nsInstanceId": ns_id
,
166 "lcmOperationType": "scale",
168 "isAutomaticInvocation": True,
169 "operationParams": params
,
170 "isCancelPending": False,
172 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
173 "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id
,
176 "projects_read": projects_read
,
177 "projects_write": projects_write
,
180 common_db
.create_nslcmop(nslcmop
)
181 # Send Kafka message to LCM
182 logger
.info("Sending scale-out action message:")
184 loop
.run_until_complete(
185 msg_bus
.aiowrite("ns", "scale", nslcmop
)
188 logger
.info("No alert rule was found")
189 elif status
== "resolved":
190 # Searching alerting rule in MongoDB
192 f
"Searching alert rule in MongoDB: ns_id {ns_id}, "
193 f
"vnf_member_index {vnf_member_index}, "
195 alert
= common_db
.get_alert(
197 vnf_member_index
=vnf_member_index
,
200 action_type
="scale_out",
203 logger
.info("Found an alert rule, updating status")
204 # Update alert status
205 common_db
.update_alert_status(uuid
=alert
["uuid"], alarm_status
="ok")