#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
from datetime import datetime, timedelta
import logging
from math import ceil
from typing import Dict, List

from airflow import DAG
from airflow.decorators import task
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from osm_mon.vim_connectors.openstack import OpenStackCollector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
# NOTE(review): SCHEDULE_INTERVAL is interpolated into the cron expression built by
# create_dag ("*/{SCHEDULE_INTERVAL} * * * *"), but its definition was not visible in
# the reviewed extract. A value of 5 (minutes) is assumed here -- confirm.
SCHEDULE_INTERVAL = 5
# Metric lists longer than this are split into several chunks by split_metrics
COLLECTOR_MAX_METRICS_PER_TASK = 100
# Only VIMs whose "vim_type" is listed here get a metrics DAG
SUPPORTED_VIM_TYPES = ["openstack", "vio"]
# Address of the Prometheus Push Gateway the metrics are pushed to
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# The VIM id is appended to this prefix to form the Push Gateway job name
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
# Gauges exported to Prometheus. Each entry is keyed by its own "metric_name";
# metrics whose name is not a key of this dict are not exported.
PROMETHEUS_METRICS = {
    "cpu_utilization": {
        "metric_name": "cpu_utilization",
        "metric_descr": "CPU usage percentage",
    },
    "average_memory_utilization": {
        "metric_name": "average_memory_utilization",
        "metric_descr": "Volume of RAM in MB used by the VM",
    },
    "disk_read_ops": {
        "metric_name": "disk_read_ops",
        "metric_descr": "Number of read requests",
    },
    "disk_write_ops": {
        "metric_name": "disk_write_ops",
        "metric_descr": "Number of write requests",
    },
    "disk_read_bytes": {
        "metric_name": "disk_read_bytes",
        "metric_descr": "Volume of reads in bytes",
    },
    "disk_write_bytes": {
        "metric_name": "disk_write_bytes",
        "metric_descr": "Volume of writes in bytes",
    },
    "packets_received": {
        "metric_name": "packets_received",
        "metric_descr": "Number of incoming packets",
    },
    "packets_sent": {
        "metric_name": "packets_sent",
        "metric_descr": "Number of outgoing packets",
    },
    "packets_in_dropped": {
        "metric_name": "packets_in_dropped",
        "metric_descr": "Number of incoming dropped packets",
    },
    "packets_out_dropped": {
        "metric_name": "packets_out_dropped",
        "metric_descr": "Number of outgoing dropped packets",
    },
}

# Logging
logger = logging.getLogger("airflow.task")
def get_all_vim():
    """Get VIMs from MongoDB.

    Returns:
        A list with one dict per VIM account returned by
        ``CommonDbClient.get_vim_accounts()``. Each dict carries only the
        "_id", "name" and "vim_type" fields of the account.
    """
    logger.info("Getting VIM list")

    # NOTE(review): the statement that builds cfg was not visible in the reviewed
    # extract; a default-constructed Config is assumed -- confirm.
    cfg = Config()
    common_db = CommonDbClient(cfg)
    vim_accounts = common_db.get_vim_accounts()

    vim_list = []
    for vim in vim_accounts:
        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
        # Keep only the fields the DAG registration loop needs
        vim_list.append(
            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
        )

    logger.info("Getting VIM list OK")
    return vim_list
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the Airflow DAG that collects VM metrics from one VIM.

    The DAG chains four tasks: extract the metrics to collect from the VNFRs,
    split them into chunks, collect each chunk from the VIM (one mapped task
    per chunk) and push all collected values to the Prometheus Push Gateway.

    Args:
        dag_id: Identifier given to the DAG.
        dag_number: Index of the DAG; not used inside this function.
        dag_description: Human-readable description given to the DAG.
        vim_id: Id of the VIM account whose VMs are monitored.

    Returns:
        The configured DAG.
    """
    # NOTE(review): "catchup" and "retries" were not visible in the reviewed extract.
    # catchup=False avoids back-filling every interval since start_date, and a single
    # retry gives "retry_delay" an effect -- confirm both values.
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
    )

    with dag:

        @task(task_id="extract_metrics_from_vnfrs")
        def extract_metrics_from_vnfrs(vim_id: str):
            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""

            # Get VNFDs that include "monitoring-parameter" from MongoDB
            cfg = Config()
            common_db = CommonDbClient(cfg)
            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
            vnfd_list = common_db.get_monitoring_vnfds()
            # Get VNFR list from MongoDB
            logger.info("Getting VNFR list from MongoDB")
            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
            # Only read metrics if ns state is one of the allowed states
            ns_allowed_states = {"INSTANTIATED"}

            metric_list = []
            for vnfr in vnfr_list:
                if vnfr["_admin"]["nsState"] not in ns_allowed_states:
                    continue
                # Check if VNFR is in "monitoring-parameter" VNFDs list
                vnfd_id = vnfr["vnfd-id"]
                vnfd = next(
                    (item for item in vnfd_list if item["_id"] == vnfd_id), None
                )
                if not vnfd:
                    continue
                ns_id = vnfr["nsr-id-ref"]
                vnf_index = vnfr["member-vnf-index-ref"]
                logger.info(
                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
                )
                project_list = vnfr.get("_admin", {}).get("projects_read", [])
                # NOTE(review): the fallback used when a VNFR lists no project was not
                # visible in the reviewed extract; the string "None" is assumed.
                project_id = project_list[0] if project_list else "None"
                for vdur in vnfr.get("vdur", []):
                    vim_info = vdur.get("vim_info")
                    if not vim_info:
                        logger.error("Error: vim_info not available in vdur")
                        continue
                    if len(vim_info) != 1:
                        logger.error("Error: more than one vim_info in vdur")
                        continue
                    vm_id = vdur.get("vim-id")
                    if not vm_id:
                        logger.error("Error: vim-id not available in vdur")
                        continue
                    vdu_name = vdur.get("name", "UNKNOWN")
                    # A default is passed to next() so that a vdur without a matching
                    # descriptor VDU is skipped instead of raising StopIteration and
                    # failing the whole task.
                    vdu = next(
                        (
                            item
                            for item in vnfd["vdu"]
                            if item["id"] == vdur["vdu-id-ref"]
                        ),
                        None,
                    )
                    if vdu is None:
                        logger.error("Error: vdu not found in descriptor")
                        continue
                    if "monitoring-parameter" not in vdu:
                        logger.error("Error: no monitoring-parameter in descriptor")
                        continue
                    for param in vdu["monitoring-parameter"]:
                        metric_name = param["performance-metric"]
                        metric_id = param["id"]
                        # "vm_id", "ns_id" and "vdu_id" are read back by send_prometheus
                        metric = {
                            "metric": metric_name,
                            "metric_id": metric_id,
                            "vm_id": vm_id,
                            "ns_id": ns_id,
                            "project_id": project_id,
                            "vdu_name": vdu_name,
                            "vnf_member_index": vnf_index,
                            "vdu_id": vdu["id"],
                        }
                        metric_list.append(metric)

            logger.info(f"Metrics to collect: {len(metric_list)}")
            return metric_list

        @task(task_id="split_metrics")
        def split_metrics(metric_list: List[Dict]):
            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
            n_metrics = len(metric_list)
            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
                return [metric_list]
            # Spread the metrics evenly over the minimum number of chunks needed
            metrics_per_chunk = ceil(
                n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
            )
            logger.info(
                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
            )
            return [
                metric_list[start : start + metrics_per_chunk]
                for start in range(0, n_metrics, metrics_per_chunk)
            ]

        @task(task_id="collect_metrics")
        def collect_metrics(vim_id: str, metric_list: List[Dict]):
            """Collect servers metrics"""
            if not metric_list:
                return []

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # Create VIM metrics collector
            vim_type = vim_account["vim_type"]
            # NOTE(review): indentation was lost in the reviewed extract; the "vio"
            # check is assumed to be nested here because it reads vim_account["config"].
            if "config" in vim_account and "vim_type" in vim_account["config"]:
                vim_type = vim_account["config"]["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
                    vim_type = "openstack"
            if vim_type != "openstack":
                logger.error(f"VIM type '{vim_type}' not supported")
                # An empty list keeps send_prometheus able to iterate over the results
                return []
            collector = OpenStackCollector(vim_account)
            # Get metrics
            results = collector.collect_metrics(metric_list)
            logger.info(results)
            return results

        @task(task_id="send_prometheus")
        def send_prometheus(metric_lists: List[List[Dict]]):
            """Send servers metrics to Prometheus Push Gateway"""
            logger.info(metric_lists)

            # Define Prometheus metrics
            # NOTE(review): the label set was not visible in the reviewed extract; it is
            # inferred from the metric fields read below plus the DAG's vim_id.
            label_names = [
                "metric_id",
                "ns_id",
                "project_id",
                "vnf_member_index",
                "vm_id",
                "vim_id",
                "vdu_name",
                "vdu_id",
            ]
            registry = CollectorRegistry()
            prom_metrics = {
                key: Gauge(
                    spec["metric_name"],
                    spec["metric_descr"],
                    labelnames=label_names,
                    registry=registry,
                )
                for key, spec in PROMETHEUS_METRICS.items()
            }

            for metric_list in metric_lists:
                # A mapped collect_metrics task may yield no results
                for metric in metric_list or []:
                    metric_name = metric["metric"]
                    metric_id = metric["metric_id"]
                    value = metric["value"]
                    vm_id = metric["vm_id"]
                    vm_name = metric.get("vdu_name", "")
                    ns_id = metric["ns_id"]
                    project_id = metric["project_id"]
                    vnf_index = metric["vnf_member_index"]
                    vdu_id = metric["vdu_id"]
                    logger.info(
                        f" metric: {metric_name}, metric_id: {metric_id} value: {value}, "
                        f"vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, "
                        f"vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
                    )
                    if metric_name in prom_metrics:
                        # Keyword labels keep each value bound to its label name
                        prom_metrics[metric_name].labels(
                            metric_id=metric_id,
                            ns_id=ns_id,
                            project_id=project_id,
                            vnf_member_index=vnf_index,
                            vm_id=vm_id,
                            vim_id=vim_id,
                            vdu_name=vm_name,
                            vdu_id=vdu_id,
                        ).set(value)

            # Push to Prometheus
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
        # One collect_metrics task instance is created per chunk
        send_prometheus(
            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
        )

    return dag
# Register one DAG per VIM of a supported type. Each DAG is stored in globals()
# under its dag_id.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type in SUPPORTED_VIM_TYPES:
        vim_id = vim["_id"]
        vim_name = vim["name"]
        dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
        dag_id = f"vm_metrics_vim_{vim_id}"
        logger.info(f"Creating DAG {dag_id}")
        globals()[dag_id] = create_dag(
            dag_id=dag_id,
            dag_number=index,
            dag_description=dag_description,
            vim_id=vim_id,
        )
    else:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")