Update multivim_vim_status.py to append osm to metric_name to be BWC
[osm/NG-SA.git] / src / osm_ngsa / dags / multivim_vm_metrics.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime, timedelta
18 import logging
19 from math import ceil
20 from typing import Dict, List
21
22 from airflow import DAG
23 from airflow.decorators import task
24 from osm_mon.core.common_db import CommonDbClient
25 from osm_mon.core.config import Config
26 from osm_mon.vim_connectors.azure import AzureCollector
27 from osm_mon.vim_connectors.openstack import OpenStackCollector
28 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
29
30
# Minutes between two DAG runs; used as the "*/N" minute field of the cron schedule.
SCHEDULE_INTERVAL = 5
# Largest number of metrics handed to a single collector task.
COLLECTOR_MAX_METRICS_PER_TASK = 100
# VIM types for which a metrics DAG is generated.
SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure"]
# Address of the Prometheus Push Gateway and prefix of the job name used when pushing.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
# Metrics published to Prometheus, keyed by the metric name used in the metric
# dictionaries. The published name is always the key prefixed with "osm_".
PROMETHEUS_METRICS = {
    key: {"metric_name": f"osm_{key}", "metric_descr": description}
    for key, description in (
        ("cpu_utilization", "CPU usage percentage"),
        ("average_memory_utilization", "Volume of RAM in MB used by the VM"),
        ("disk_read_ops", "Number of read requests"),
        ("disk_write_ops", "Number of write requests"),
        ("disk_read_bytes", "Volume of reads in bytes"),
        ("disk_write_bytes", "Volume of writes in bytes"),
        ("packets_received", "Number of incoming packets"),
        ("packets_sent", "Number of outgoing packets"),
        ("packets_in_dropped", "Number of incoming dropped packets"),
        ("packets_out_dropped", "Number of outgoing dropped packets"),
    )
}

# Logging
logger = logging.getLogger("airflow.task")
81
82
def get_all_vim():
    """Read the VIM accounts from MongoDB.

    Returns:
        A list with one dictionary per VIM account, holding only its "_id",
        "name" and "vim_type".
    """
    logger.info("Getting VIM list")

    config = Config()
    logger.info(config.conf)
    db_client = CommonDbClient(config)

    kept_keys = ("_id", "name", "vim_type")
    summaries = []
    for account in db_client.get_vim_accounts():
        account_id = account["_id"]
        account_name = account["name"]
        logger.info(f"Read VIM {account_id} ({account_name})")
        # Keep only the fields needed to generate the DAGs
        summaries.append({key: account[key] for key in kept_keys})

    logger.info(summaries)
    logger.info("Getting VIM list OK")
    return summaries
101
102
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the Airflow DAG that collects VM metrics for one VIM.

    The DAG chains four tasks: extract the list of metrics to collect from the
    VNFRs deployed on the VIM, split that list into chunks, collect every chunk
    in its own mapped task and push all the results to the Prometheus Push
    Gateway.

    Args:
        dag_id: identifier given to the DAG.
        dag_number: index of the VIM in the VIM list. Not used by this
            function; kept so that existing callers keep working.
        dag_description: description given to the DAG.
        vim_id: id of the VIM account whose VMs are monitored.

    Returns:
        The DAG object.
    """
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
        tags=["osm", "vdu"],
    )

    with dag:

        @task(task_id="extract_metrics_from_vnfrs")
        def extract_metrics_from_vnfrs(vim_id: str):
            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB

            Returns a list with one dictionary per monitoring parameter of every
            VDU record that could be matched to its descriptor.
            """

            # Get VNFDs that include "monitoring-parameter" from MongoDB
            cfg = Config()
            common_db = CommonDbClient(cfg)
            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
            vnfd_list = common_db.get_monitoring_vnfds()
            # Get VNFR list from MongoDB
            logger.info("Getting VNFR list from MongoDB")
            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
            # Only read metrics if ns state is one of the allowed states
            ns_allowed_states = {"INSTANTIATED"}
            metric_list = []
            for vnfr in vnfr_list:
                admin = vnfr.get("_admin", {})
                if admin.get("nsState") not in ns_allowed_states:
                    continue
                # Check if VNFR is in "monitoring-parameter" VNFDs list
                vnfd_id = vnfr["vnfd-id"]
                vnfd = next(
                    (item for item in vnfd_list if item["_id"] == vnfd_id), None
                )
                if not vnfd:
                    continue
                ns_id = vnfr["nsr-id-ref"]
                vnf_index = vnfr["member-vnf-index-ref"]
                logger.info(
                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
                )
                project_list = admin.get("projects_read", [])
                project_id = project_list[0] if project_list else "None"
                for vdur in vnfr.get("vdur", []):
                    vim_info = vdur.get("vim_info")
                    if not vim_info:
                        logger.error("Error: vim_info not available in vdur")
                        continue
                    if len(vim_info) != 1:
                        logger.error("Error: more than one vim_info in vdur")
                        continue
                    vm_id = vdur.get("vim-id")
                    if not vm_id:
                        logger.error("Error: vim-id not available in vdur")
                        continue
                    vdu_name = vdur.get("name", "UNKNOWN")
                    # Look up the descriptor of this VDU. A record that does not
                    # match any descriptor VDU is skipped instead of aborting the
                    # whole task with StopIteration.
                    vdu_id_ref = vdur.get("vdu-id-ref")
                    vdu = next(
                        (
                            item
                            for item in vnfd.get("vdu", [])
                            if item["id"] == vdu_id_ref
                        ),
                        None,
                    )
                    if vdu is None:
                        logger.error(
                            f"Error: vdu {vdu_id_ref} not found in descriptor"
                        )
                        continue
                    if "monitoring-parameter" not in vdu:
                        logger.error("Error: no monitoring-parameter in descriptor")
                        continue
                    metric_list.extend(
                        {
                            "metric": param["performance-metric"],
                            "metric_id": param["id"],
                            "vm_id": vm_id,
                            "ns_id": ns_id,
                            "project_id": project_id,
                            "vdu_name": vdu_name,
                            "vnf_member_index": vnf_index,
                            "vdu_id": vdu["id"],
                        }
                        for param in vdu["monitoring-parameter"]
                    )

            logger.info(f"Metrics to collect: {len(metric_list)}")
            return metric_list

        @task(task_id="split_metrics")
        def split_metrics(metric_list: List[Dict]):
            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK

            Returns a list of chunks. A list that fits in one task (including an
            empty one) is returned as a single chunk; a longer list is split
            into the minimum number of chunks, of roughly equal size.
            """
            n_metrics = len(metric_list)
            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
                return [metric_list]
            n_chunks = ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
            metrics_per_chunk = ceil(n_metrics / n_chunks)
            logger.info(
                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
            )
            return [
                metric_list[start : start + metrics_per_chunk]
                for start in range(0, n_metrics, metrics_per_chunk)
            ]

        @task(task_id="collect_metrics")
        def collect_metrics(vim_id: str, metric_list: List[Dict]):
            """Collect servers metrics

            Returns the result of the VIM collector for the given chunk, or an
            empty list when the chunk is empty or the VIM type is not supported.
            """
            if not metric_list:
                return []

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # The VIM type declared in the account config, when present, takes
            # precedence over the top-level one
            vim_type = vim_account["vim_type"]
            vim_config = vim_account.get("config") or {}
            if "vim_type" in vim_config:
                vim_type = vim_config["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in vim_config:
                    vim_type = "openstack"
            # Create VIM metrics collector
            if vim_type == "openstack":
                collector = OpenStackCollector(vim_account)
            elif vim_type == "azure":
                collector = AzureCollector(vim_account)
            else:
                logger.error(f"VIM type '{vim_type}' not supported")
                # Same shape as the empty-chunk case: the consumer iterates
                # every result as a list
                return []
            # Get metrics
            results = collector.collect_metrics(metric_list)
            logger.info(results)
            return results

        @task(task_id="send_prometheus")
        def send_prometheus(metric_lists: List[List[Dict]]):
            """Send servers metrics to Prometheus Push Gateway

            metric_lists holds one list of collected metrics per collector task.
            Metrics whose name is not a key of PROMETHEUS_METRICS are logged but
            not published.
            """
            logger.info(metric_lists)

            # Define Prometheus metrics
            registry = CollectorRegistry()
            prom_metrics = {
                key: Gauge(
                    definition["metric_name"],
                    definition["metric_descr"],
                    labelnames=[
                        "metric_id",
                        "ns_id",
                        "project_id",
                        "vnf_member_index",
                        "vm_id",
                        "vim_id",
                        "vdu_name",
                        "vdu_id",
                    ],
                    registry=registry,
                )
                for key, definition in PROMETHEUS_METRICS.items()
            }

            for metric_list in metric_lists:
                # A collector task may contribute nothing
                if not metric_list:
                    continue
                for metric in metric_list:
                    metric_name = metric["metric"]
                    metric_id = metric["metric_id"]
                    value = metric["value"]
                    vm_id = metric["vm_id"]
                    vm_name = metric.get("vdu_name", "")
                    ns_id = metric["ns_id"]
                    project_id = metric["project_id"]
                    vnf_index = metric["vnf_member_index"]
                    vdu_id = metric["vdu_id"]
                    logger.info(
                        f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
                    )
                    if metric_name in prom_metrics:
                        # Positional label values follow the labelnames order;
                        # vim_id is the argument of create_dag
                        prom_metrics[metric_name].labels(
                            metric_id,
                            ns_id,
                            project_id,
                            vnf_index,
                            vm_id,
                            vim_id,
                            vm_name,
                            vdu_id,
                        ).set(value)

            # Push to Prometheus
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        # One collect_metrics task instance is created per chunk
        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
        send_prometheus(
            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
        )

    return dag
308
309
# Generate one VM metrics DAG per VIM of a supported type and store it in the
# module globals under its dag_id.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type not in SUPPORTED_VIM_TYPES:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")
        continue

    vim_id = vim["_id"]
    vim_name = vim["name"]
    dag_id = f"vm_metrics_vim_{vim_id}"
    dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
    logger.info(f"Creating DAG {dag_id}")
    globals()[dag_id] = create_dag(
        dag_id=dag_id,
        dag_number=index,
        dag_description=dag_description,
        vim_id=vim_id,
    )