6f03292da65a3f8ed80bc69a855182f4234d973c
[osm/NG-SA.git] / src / osm_ngsa / dags / multivim_vm_metrics.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime, timedelta
18 import logging
19 from math import ceil
20 from typing import Dict, List
21
22 from airflow import DAG
23 from airflow.decorators import task
24 from osm_mon.core.common_db import CommonDbClient
25 from osm_mon.core.config import Config
26 from osm_mon.vim_connectors.openstack import OpenStackCollector
27 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
28
29
30 SCHEDULE_INTERVAL = 5
31 COLLECTOR_MAX_METRICS_PER_TASK = 100
32 SUPPORTED_VIM_TYPES = ["openstack", "vio"]
33 PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
34 PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
35 PROMETHEUS_METRICS = {
36 "cpu_utilization": {
37 "metric_name": "cpu_utilization",
38 "metric_descr": "CPU usage percentage",
39 },
40 "average_memory_utilization": {
41 "metric_name": "average_memory_utilization",
42 "metric_descr": "Volume of RAM in MB used by the VM",
43 },
44 "disk_read_ops": {
45 "metric_name": "disk_read_ops",
46 "metric_descr": "Number of read requests",
47 },
48 "disk_write_ops": {
49 "metric_name": "disk_write_ops",
50 "metric_descr": "Number of write requests",
51 },
52 "disk_read_bytes": {
53 "metric_name": "disk_read_bytes",
54 "metric_descr": "Volume of reads in bytes",
55 },
56 "disk_write_bytes": {
57 "metric_name": "disk_write_bytes",
58 "metric_descr": "Volume of writes in bytes",
59 },
60 "packets_received": {
61 "metric_name": "packets_received",
62 "metric_descr": "Number of incoming packets",
63 },
64 "packets_sent": {
65 "metric_name": "packets_sent",
66 "metric_descr": "Number of outgoing packets",
67 },
68 "packets_in_dropped": {
69 "metric_name": "packets_in_dropped",
70 "metric_descr": "Number of incoming dropped packets",
71 },
72 "packets_out_dropped": {
73 "metric_name": "packets_out_dropped",
74 "metric_descr": "Number of outgoing dropped packets",
75 },
76 }
77
78 # Logging
79 logger = logging.getLogger("airflow.task")
80
81
82 def get_all_vim():
83 """Get VIMs from MongoDB"""
84 logger.info("Getting VIM list")
85
86 cfg = Config()
87 logger.info(cfg.conf)
88 common_db = CommonDbClient(cfg)
89 vim_accounts = common_db.get_vim_accounts()
90 vim_list = []
91 for vim in vim_accounts:
92 logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
93 vim_list.append(
94 {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
95 )
96
97 logger.info(vim_list)
98 logger.info("Getting VIM list OK")
99 return vim_list
100
101
102 def create_dag(dag_id, dag_number, dag_description, vim_id):
103 dag = DAG(
104 dag_id,
105 catchup=False,
106 default_args={
107 "depends_on_past": False,
108 "retries": 1,
109 # "retry_delay": timedelta(minutes=1),
110 "retry_delay": timedelta(seconds=10),
111 },
112 description=dag_description,
113 is_paused_upon_creation=False,
114 # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
115 schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
116 start_date=datetime(2022, 1, 1),
117 tags=["osm", "vdu"],
118 )
119
120 with dag:
121
122 @task(task_id="extract_metrics_from_vnfrs")
123 def extract_metrics_from_vnfrs(vim_id: str):
124 """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
125
126 # Get VNFDs that include "monitoring-parameter" from MongoDB
127 cfg = Config()
128 common_db = CommonDbClient(cfg)
129 logger.info("Getting VNFDs with monitoring parameters from MongoDB")
130 vnfd_list = common_db.get_monitoring_vnfds()
131 # Get VNFR list from MongoDB
132 logger.info("Getting VNFR list from MongoDB")
133 vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
134 # Only read metrics if ns state is one of the nsAllowedStatesSet
135 nsAllowedStatesSet = {"INSTANTIATED"}
136 metric_list = []
137 for vnfr in vnfr_list:
138 if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
139 continue
140 # Check if VNFR is in "monitoring-parameter" VNFDs list
141 vnfd_id = vnfr["vnfd-id"]
142 vnfd = next(
143 (item for item in vnfd_list if item["_id"] == vnfd_id), None
144 )
145 if not vnfd:
146 continue
147 ns_id = vnfr["nsr-id-ref"]
148 vnf_index = vnfr["member-vnf-index-ref"]
149 logger.info(
150 f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
151 )
152 project_list = vnfr.get("_admin", {}).get("projects_read", [])
153 project_id = "None"
154 if project_list:
155 project_id = project_list[0]
156 for vdur in vnfr.get("vdur", []):
157 vim_info = vdur.get("vim_info")
158 if not vim_info:
159 logger.error("Error: vim_info not available in vdur")
160 continue
161 if len(vim_info) != 1:
162 logger.error("Error: more than one vim_info in vdur")
163 continue
164 vim_id = next(iter(vim_info))[4:]
165 vm_id = vdur.get("vim-id")
166 if not vm_id:
167 logger.error("Error: vim-id not available in vdur")
168 continue
169 vdu_name = vdur.get("name", "UNKNOWN")
170 vdu = next(
171 filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"])
172 )
173 if "monitoring-parameter" not in vdu:
174 logger.error("Error: no monitoring-parameter in descriptor")
175 continue
176 for param in vdu["monitoring-parameter"]:
177 metric_name = param["performance-metric"]
178 metric_id = param["id"]
179 metric = {
180 "metric": metric_name,
181 "metric_id": metric_id,
182 "vm_id": vm_id,
183 "ns_id": ns_id,
184 "project_id": project_id,
185 "vdu_name": vdu_name,
186 "vnf_member_index": vnf_index,
187 "vdu_id": vdu["id"],
188 }
189 metric_list.append(metric)
190
191 logger.info(f"Metrics to collect: {len(metric_list)}")
192 return metric_list
193
194 @task(task_id="split_metrics")
195 def split_metrics(metric_list: List[Dict]):
196 """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
197 n_metrics = len(metric_list)
198 if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
199 return [metric_list]
200 metrics_per_chunk = ceil(
201 n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
202 )
203 logger.info(
204 f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
205 )
206 chunks = []
207 for i in range(0, n_metrics, metrics_per_chunk):
208 chunks.append(metric_list[i : i + metrics_per_chunk])
209 return chunks
210
211 @task(task_id="collect_metrics")
212 def collect_metrics(vim_id: str, metric_list: List[Dict]):
213 """Collect servers metrics"""
214 if not metric_list:
215 return []
216
217 # Get VIM account info from MongoDB
218 logger.info(f"Reading VIM info, id: {vim_id}")
219 cfg = Config()
220 common_db = CommonDbClient(cfg)
221 vim_account = common_db.get_vim_account(vim_account_id=vim_id)
222 # Create VIM metrics collector
223 vim_type = vim_account["vim_type"]
224 if "config" in vim_account and "vim_type" in vim_account["config"]:
225 vim_type = vim_account["config"]["vim_type"].lower()
226 if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
227 vim_type = "openstack"
228 if vim_type == "openstack":
229 collector = OpenStackCollector(vim_account)
230 else:
231 logger.error(f"VIM type '{vim_type}' not supported")
232 return None
233 # Get metrics
234 results = []
235 if collector:
236 results = collector.collect_metrics(metric_list)
237 logger.info(results)
238 return results
239
240 @task(task_id="send_prometheus")
241 def send_prometheus(metric_lists: List[List[Dict]]):
242 """Send servers metrics to Prometheus Push Gateway"""
243 logger.info(metric_lists)
244
245 # Define Prometheus metrics
246 registry = CollectorRegistry()
247 prom_metrics = {}
248 prom_metrics_keys = PROMETHEUS_METRICS.keys()
249 for key in prom_metrics_keys:
250 prom_metrics[key] = Gauge(
251 PROMETHEUS_METRICS[key]["metric_name"],
252 PROMETHEUS_METRICS[key]["metric_descr"],
253 labelnames=[
254 "metric_id",
255 "ns_id",
256 "project_id",
257 "vnf_member_index",
258 "vm_id",
259 "vim_id",
260 "vdu_name",
261 "vdu_id",
262 ],
263 registry=registry,
264 )
265
266 for metric_list in metric_lists:
267 for metric in metric_list:
268 metric_name = metric["metric"]
269 metric_id = metric["metric_id"]
270 value = metric["value"]
271 vm_id = metric["vm_id"]
272 vm_name = metric.get("vdu_name", "")
273 ns_id = metric["ns_id"]
274 project_id = metric["project_id"]
275 vnf_index = metric["vnf_member_index"]
276 vdu_id = metric["vdu_id"]
277 logger.info(
278 f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
279 )
280 if metric_name in prom_metrics_keys:
281 prom_metrics[metric_name].labels(
282 metric_id,
283 ns_id,
284 project_id,
285 vnf_index,
286 vm_id,
287 vim_id,
288 vm_name,
289 vdu_id,
290 ).set(value)
291
292 # Push to Prometheus
293 push_to_gateway(
294 gateway=PROMETHEUS_PUSHGW,
295 job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
296 registry=registry,
297 )
298 return
299
300 chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
301 send_prometheus(
302 collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
303 )
304
305 return dag
306
307
308 vim_list = get_all_vim()
309 for index, vim in enumerate(vim_list):
310 vim_type = vim["vim_type"]
311 if vim_type in SUPPORTED_VIM_TYPES:
312 vim_id = vim["_id"]
313 vim_name = vim["name"]
314 dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
315 dag_id = f"vm_metrics_vim_{vim_id}"
316 logger.info(f"Creating DAG {dag_id}")
317 globals()[dag_id] = create_dag(
318 dag_id=dag_id,
319 dag_number=index,
320 dag_description=dag_description,
321 vim_id=vim_id,
322 )
323 else:
324 logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")