Add VIO support in DAGs
[osm/NG-SA.git] / src / osm_ngsa / dags / multivim_vm_metrics.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime, timedelta
18 import logging
19 from math import ceil
20 from typing import Dict, List
21
22 from airflow import DAG
23 from airflow.decorators import task
24 from osm_mon.core.common_db import CommonDbClient
25 from osm_mon.core.config import Config
26 from osm_mon.vim_connectors.azure import AzureCollector
27 from osm_mon.vim_connectors.gcp import GcpCollector
28 from osm_mon.vim_connectors.openstack import OpenStackCollector
29 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
30
31
32 SCHEDULE_INTERVAL = 5
33 COLLECTOR_MAX_METRICS_PER_TASK = 100
34 SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure", "gcp"]
35 PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
36 PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
37 PROMETHEUS_METRICS = {
38 "cpu_utilization": {
39 "metric_name": "osm_cpu_utilization",
40 "metric_descr": "CPU usage percentage",
41 },
42 "average_memory_utilization": {
43 "metric_name": "osm_average_memory_utilization",
44 "metric_descr": "Volume of RAM in MB used by the VM",
45 },
46 "disk_read_ops": {
47 "metric_name": "osm_disk_read_ops",
48 "metric_descr": "Number of read requests",
49 },
50 "disk_write_ops": {
51 "metric_name": "osm_disk_write_ops",
52 "metric_descr": "Number of write requests",
53 },
54 "disk_read_bytes": {
55 "metric_name": "osm_disk_read_bytes",
56 "metric_descr": "Volume of reads in bytes",
57 },
58 "disk_write_bytes": {
59 "metric_name": "osm_disk_write_bytes",
60 "metric_descr": "Volume of writes in bytes",
61 },
62 "packets_received": {
63 "metric_name": "osm_packets_received",
64 "metric_descr": "Number of incoming packets",
65 },
66 "packets_sent": {
67 "metric_name": "osm_packets_sent",
68 "metric_descr": "Number of outgoing packets",
69 },
70 "packets_in_dropped": {
71 "metric_name": "osm_packets_in_dropped",
72 "metric_descr": "Number of incoming dropped packets",
73 },
74 "packets_out_dropped": {
75 "metric_name": "osm_packets_out_dropped",
76 "metric_descr": "Number of outgoing dropped packets",
77 },
78 }
79
80 # Logging
81 logger = logging.getLogger("airflow.task")
82
83
84 def get_all_vim():
85 """Get VIMs from MongoDB"""
86 logger.info("Getting VIM list")
87
88 cfg = Config()
89 logger.info(cfg.conf)
90 common_db = CommonDbClient(cfg)
91 vim_accounts = common_db.get_vim_accounts()
92 vim_list = []
93 for vim in vim_accounts:
94 logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
95 vim_list.append(
96 {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
97 )
98
99 logger.info(vim_list)
100 logger.info("Getting VIM list OK")
101 return vim_list
102
103
104 def create_dag(dag_id, dag_number, dag_description, vim_id):
105 dag = DAG(
106 dag_id,
107 catchup=False,
108 default_args={
109 "depends_on_past": False,
110 "retries": 1,
111 # "retry_delay": timedelta(minutes=1),
112 "retry_delay": timedelta(seconds=10),
113 },
114 description=dag_description,
115 is_paused_upon_creation=False,
116 schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
117 start_date=datetime(2022, 1, 1),
118 tags=["osm", "vdu"],
119 )
120
121 with dag:
122
123 @task(task_id="extract_metrics_from_vnfrs")
124 def extract_metrics_from_vnfrs(vim_id: str):
125 """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
126
127 # Get VNFDs that include "monitoring-parameter" from MongoDB
128 cfg = Config()
129 common_db = CommonDbClient(cfg)
130 logger.info("Getting VNFDs with monitoring parameters from MongoDB")
131 vnfd_list = common_db.get_monitoring_vnfds()
132 # Get VNFR list from MongoDB
133 logger.info("Getting VNFR list from MongoDB")
134 vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
135 # Only read metrics if ns state is one of the nsAllowedStatesSet
136 nsAllowedStatesSet = {"INSTANTIATED"}
137 metric_list = []
138 for vnfr in vnfr_list:
139 if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
140 continue
141 # Check if VNFR is in "monitoring-parameter" VNFDs list
142 vnfd_id = vnfr["vnfd-id"]
143 vnfd = next(
144 (item for item in vnfd_list if item["_id"] == vnfd_id), None
145 )
146 if not vnfd:
147 continue
148 ns_id = vnfr["nsr-id-ref"]
149 vnf_index = vnfr["member-vnf-index-ref"]
150 logger.info(
151 f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
152 )
153 project_list = vnfr.get("_admin", {}).get("projects_read", [])
154 project_id = "None"
155 if project_list:
156 project_id = project_list[0]
157 for vdur in vnfr.get("vdur", []):
158 vim_info = vdur.get("vim_info")
159 if not vim_info:
160 logger.error("Error: vim_info not available in vdur")
161 continue
162 if len(vim_info) != 1:
163 logger.error("Error: more than one vim_info in vdur")
164 continue
165 vim_id = next(iter(vim_info))[4:]
166 vm_id = vdur.get("vim-id")
167 if not vm_id:
168 logger.error("Error: vim-id not available in vdur")
169 continue
170 vdu_name = vdur.get("name", "UNKNOWN")
171 vdu = next(
172 filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"])
173 )
174 if "monitoring-parameter" not in vdu:
175 logger.error("Error: no monitoring-parameter in descriptor")
176 continue
177 for param in vdu["monitoring-parameter"]:
178 metric_name = param["performance-metric"]
179 metric_id = param["id"]
180 metric = {
181 "metric": metric_name,
182 "metric_id": metric_id,
183 "vm_id": vm_id,
184 "ns_id": ns_id,
185 "project_id": project_id,
186 "vdu_name": vdu_name,
187 "vnf_member_index": vnf_index,
188 "vdu_id": vdu["id"],
189 }
190 metric_list.append(metric)
191
192 logger.info(f"Metrics to collect: {len(metric_list)}")
193 return metric_list
194
195 @task(task_id="split_metrics")
196 def split_metrics(metric_list: List[Dict]):
197 """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
198 n_metrics = len(metric_list)
199 if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
200 return [metric_list]
201 metrics_per_chunk = ceil(
202 n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
203 )
204 logger.info(
205 f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
206 )
207 chunks = []
208 for i in range(0, n_metrics, metrics_per_chunk):
209 chunks.append(metric_list[i : i + metrics_per_chunk])
210 return chunks
211
212 @task(task_id="collect_metrics")
213 def collect_metrics(vim_id: str, metric_list: List[Dict]):
214 """Collect servers metrics"""
215 if not metric_list:
216 return []
217
218 # Get VIM account info from MongoDB
219 logger.info(f"Reading VIM info, id: {vim_id}")
220 cfg = Config()
221 common_db = CommonDbClient(cfg)
222 vim_account = common_db.get_vim_account(vim_account_id=vim_id)
223 # Create VIM metrics collector
224 vim_type = vim_account["vim_type"]
225 if "config" in vim_account and "vim_type" in vim_account["config"]:
226 vim_type = vim_account["config"]["vim_type"].lower()
227 if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
228 vim_type = "openstack"
229 if vim_type == "openstack" or vim_type == "vio":
230 collector = OpenStackCollector(vim_account)
231 elif vim_type == "azure":
232 collector = AzureCollector(vim_account)
233 elif vim_type == "gcp":
234 collector = GcpCollector(vim_account)
235 else:
236 logger.error(f"VIM type '{vim_type}' not supported")
237 return None
238 # Get metrics
239 results = []
240 if collector:
241 results = collector.collect_metrics(metric_list)
242 logger.info(results)
243 return results
244
245 @task(task_id="send_prometheus")
246 def send_prometheus(metric_lists: List[List[Dict]]):
247 """Send servers metrics to Prometheus Push Gateway"""
248 logger.info(metric_lists)
249
250 # Define Prometheus metrics
251 registry = CollectorRegistry()
252 prom_metrics = {}
253 prom_metrics_keys = PROMETHEUS_METRICS.keys()
254 for key in prom_metrics_keys:
255 prom_metrics[key] = Gauge(
256 PROMETHEUS_METRICS[key]["metric_name"],
257 PROMETHEUS_METRICS[key]["metric_descr"],
258 labelnames=[
259 "metric_id",
260 "ns_id",
261 "project_id",
262 "vnf_member_index",
263 "vm_id",
264 "vim_id",
265 "vdu_name",
266 "vdu_id",
267 ],
268 registry=registry,
269 )
270
271 for metric_list in metric_lists:
272 for metric in metric_list:
273 metric_name = metric["metric"]
274 metric_id = metric["metric_id"]
275 value = metric["value"]
276 vm_id = metric["vm_id"]
277 vm_name = metric.get("vdu_name", "")
278 ns_id = metric["ns_id"]
279 project_id = metric["project_id"]
280 vnf_index = metric["vnf_member_index"]
281 vdu_id = metric["vdu_id"]
282 logger.info(
283 f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
284 )
285 if metric_name in prom_metrics_keys:
286 prom_metrics[metric_name].labels(
287 metric_id,
288 ns_id,
289 project_id,
290 vnf_index,
291 vm_id,
292 vim_id,
293 vm_name,
294 vdu_id,
295 ).set(value)
296
297 # Push to Prometheus
298 push_to_gateway(
299 gateway=PROMETHEUS_PUSHGW,
300 job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
301 registry=registry,
302 )
303 return
304
305 chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
306 send_prometheus(
307 collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
308 )
309
310 return dag
311
312
313 vim_list = get_all_vim()
314 for index, vim in enumerate(vim_list):
315 vim_type = vim["vim_type"]
316 if vim_type in SUPPORTED_VIM_TYPES:
317 vim_id = vim["_id"]
318 vim_name = vim["name"]
319 dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
320 dag_id = f"vm_metrics_vim_{vim_id}"
321 logger.info(f"Creating DAG {dag_id}")
322 globals()[dag_id] = create_dag(
323 dag_id=dag_id,
324 dag_number=index,
325 dag_description=dag_description,
326 vim_id=vim_id,
327 )
328 else:
329 logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")