978ab3f8746c97b7545d0748a841433a379abfae
[osm/NG-SA.git] / src / osm_ngsa / dags / scaleout_vdu.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 import asyncio
18 from datetime import datetime, timedelta
19 import logging
20 import time
21 import uuid
22
23 from airflow.decorators import dag, task
24 from airflow.operators.python import get_current_context
25 from osm_mon.core.common_db import CommonDbClient
26 from osm_mon.core.config import Config
27 from osm_mon.core.message_bus_client import MessageBusClient
28
# Logging: every message from this module goes through the logger registered
# under the name "airflow.task".
_TASK_LOGGER_NAME = "airflow.task"
logger = logging.getLogger(_TASK_LOGGER_NAME)
31
32
def _find_scale_op_for_vnf(nslcmops, vnf_member_index):
    """Return the first operation in *nslcmops* that scaled the given VNF.

    An operation matches when
    ``operationParams["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]``
    exists and equals *vnf_member_index*. Operations lacking any part of that
    path are skipped. Returns ``None`` when nothing matches.
    """
    for nslcmop in nslcmops:
        operation_params = nslcmop.get("operationParams") or {}
        scale_vnf_data = operation_params.get("scaleVnfData") or {}
        step_data = scale_vnf_data.get("scaleByStepData") or {}
        if (
            "member-vnf-index" in step_data
            and step_data["member-vnf-index"] == vnf_member_index
        ):
            return nslcmop
    return None


def _build_scale_out_nslcmop(ns_id, vnf_member_index, scaling_group, vnfr, now):
    """Build the nslcmop document describing an automatic SCALE_OUT operation.

    ns_id: NS instance identifier taken from the alarm labels.
    vnf_member_index: member index of the VNF to scale.
    scaling_group: value of the alert rule's ``action["scaling-group"]``.
    vnfr: VNF record; its ``_admin.projects_read`` / ``_admin.projects_write``
        values are copied into the new document.
    now: timestamp (``time.time()`` value) stored as start and status time.

    Raises KeyError if *vnfr* lacks the ``_admin`` project fields.
    """
    op_id = str(uuid.uuid4())
    vnfr_admin = vnfr["_admin"]
    params = {
        "scaleType": "SCALE_VNF",
        "scaleVnfData": {
            "scaleVnfType": "SCALE_OUT",
            "scaleByStepData": {
                "scaling-group-descriptor": scaling_group,
                "member-vnf-index": vnf_member_index,
            },
        },
        "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
    }
    return {
        "id": op_id,
        "_id": op_id,
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": ns_id,
        "lcmOperationType": "scale",
        "startTime": now,
        "isAutomaticInvocation": True,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + op_id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
        },
        "_admin": {
            "projects_read": vnfr_admin["projects_read"],
            "projects_write": vnfr_admin["projects_write"],
        },
    }


# DAG triggered externally (schedule_interval=None) with a run configuration
# whose "alerts" entry is a list of alarms. Each alarm carries "status",
# "annotations", "startsAt", "endsAt" and "labels".
@dag(
    catchup=False,
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=15),
    },
    description="Webhook callback for scale-out alarm from Prometheus AlertManager",
    is_paused_upon_creation=False,
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    tags=["osm", "webhook"],
)
def scaleout_vdu():
    # Single task: for every alarm whose alertname starts with "scaleout_",
    #   * status "firing":   mark the matching alert rule as "alarm" and, unless
    #     a scale operation for the same VNF is found inside the rule's
    #     cooldown interval, store a new nslcmop and publish it on the
    #     "ns"/"scale" topic of the message bus;
    #   * status "resolved": mark the matching alert rule as "ok".
    @task(task_id="main_task")
    def main_task():
        logger.debug("Running main task...")
        # Read input parameters
        context = get_current_context()
        conf = context["dag_run"].conf
        for alarm in conf["alerts"]:
            logger.info("Scale-out alarm:")
            status = alarm["status"]
            logger.info(f" status: {status}")
            logger.info(f' annotations: {alarm["annotations"]}')
            logger.info(f' startsAt: {alarm["startsAt"]}')
            logger.info(f' endsAt: {alarm["endsAt"]}')
            logger.info(f' labels: {alarm["labels"]}')
            labels = alarm["labels"]
            # A missing "alertname" label is treated as "not a scale-out
            # alarm" instead of crashing on None.startswith().
            alertname = labels.get("alertname") or ""
            if not alertname.startswith("scaleout_"):
                continue
            # scaleout_vdu alert type
            config = Config()
            common_db = CommonDbClient(config)
            ns_id = labels["ns_id"]
            vdu_id = labels["vdu_id"]
            vnf_member_index = labels["vnf_member_index"]
            if status == "firing":
                # Searching alerting rule in MongoDB
                logger.info(
                    f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
                    f"vnf_member_index {vnf_member_index}, "
                    f"vdu_id {vdu_id}, "
                )
                alert = common_db.get_alert(
                    nsr_id=ns_id,
                    vnf_member_index=vnf_member_index,
                    vdu_id=vdu_id,
                    vdu_name=None,
                    action_type="scale_out",
                )
                if not alert:
                    logger.info("No alert rule was found")
                    continue
                logger.info("Found an alert rule:")
                logger.info(alert)
                # Update alert status
                common_db.update_alert_status(
                    uuid=alert["uuid"], alarm_status="alarm"
                )
                # Get VNFR from MongoDB
                vnfr = common_db.get_vnfr(
                    nsr_id=ns_id, member_index=vnf_member_index
                )
                logger.info(
                    f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
                )
                # Taken unconditionally: the timestamp is needed for the
                # nslcmop below even when the rule defines no cooldown-time.
                now = time.time()
                # Check cooldown-time before scale-out
                send_lcm = True
                if "cooldown-time" in alert["action"]:
                    # The rule value is multiplied by 60 to obtain seconds.
                    cooldown_time = alert["action"]["cooldown-time"] * 60
                    since = now - cooldown_time
                    logger.info(
                        f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
                    )
                    nslcmops = common_db.get_nslcmop(
                        nsr_id=ns_id, operation_type="scale", since=since
                    )
                    op = _find_scale_op_for_vnf(nslcmops, vnf_member_index)
                    if op:
                        logger.info(
                            f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
                        )
                        send_lcm = False

                if send_lcm:
                    # Save nslcmop object in MongoDB
                    msg_bus = MessageBusClient(config)
                    loop = asyncio.get_event_loop()
                    nslcmop = _build_scale_out_nslcmop(
                        ns_id=ns_id,
                        vnf_member_index=vnf_member_index,
                        scaling_group=alert["action"]["scaling-group"],
                        vnfr=vnfr,
                        now=now,
                    )
                    common_db.create_nslcmop(nslcmop)
                    # Send Kafka message to LCM
                    logger.info("Sending scale-out action message:")
                    logger.info(nslcmop)
                    loop.run_until_complete(
                        msg_bus.aiowrite("ns", "scale", nslcmop)
                    )
            elif status == "resolved":
                # Searching alerting rule in MongoDB
                logger.info(
                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
                    f"vnf_member_index {vnf_member_index}, "
                )
                alert = common_db.get_alert(
                    nsr_id=ns_id,
                    vnf_member_index=vnf_member_index,
                    vdu_id=vdu_id,
                    vdu_name=None,
                    action_type="scale_out",
                )
                if alert:
                    logger.info("Found an alert rule, updating status")
                    # Update alert status
                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")

    main_task()
208
209
# Call the @dag-decorated function once at import time and bind its result to
# the module-level name `dag`.
dag = scaleout_vdu()