#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
from datetime import datetime, timedelta
import logging
from math import ceil
from typing import Dict, List

from airflow import DAG
from airflow.decorators import task
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from osm_mon.vim_connectors.azure import AzureCollector
from osm_mon.vim_connectors.openstack import OpenStackCollector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
# Minutes between DAG runs; interpolated into the cron expression built in create_dag.
# NOTE(review): this name is used by create_dag but its definition is not visible in
# this file excerpt -- the value 5 is an assumption, confirm before merging.
SCHEDULE_INTERVAL = 5
# Upper bound on the number of metrics handed to a single collect_metrics task.
COLLECTOR_MAX_METRICS_PER_TASK = 100
# VIM types for which a metrics DAG is generated.
SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure"]
# host:port of the Prometheus Push Gateway.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# Prefix of the Push Gateway job name; the VIM id is appended to it.
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
# Maps a collected metric name to the Prometheus gauge name and help text.
# NOTE(review): only the keys "average_memory_utilization", "packets_in_dropped" and
# "packets_out_dropped" are visible in the original text; the other keys were rebuilt
# as the gauge name without its "osm_" prefix, matching those three -- confirm.
PROMETHEUS_METRICS = {
    "cpu_utilization": {
        "metric_name": "osm_cpu_utilization",
        "metric_descr": "CPU usage percentage",
    },
    "average_memory_utilization": {
        "metric_name": "osm_average_memory_utilization",
        "metric_descr": "Volume of RAM in MB used by the VM",
    },
    "disk_read_ops": {
        "metric_name": "osm_disk_read_ops",
        "metric_descr": "Number of read requests",
    },
    "disk_write_ops": {
        "metric_name": "osm_disk_write_ops",
        "metric_descr": "Number of write requests",
    },
    "disk_read_bytes": {
        "metric_name": "osm_disk_read_bytes",
        "metric_descr": "Volume of reads in bytes",
    },
    "disk_write_bytes": {
        "metric_name": "osm_disk_write_bytes",
        "metric_descr": "Volume of writes in bytes",
    },
    "packets_received": {
        "metric_name": "osm_packets_received",
        "metric_descr": "Number of incoming packets",
    },
    "packets_sent": {
        "metric_name": "osm_packets_sent",
        "metric_descr": "Number of outgoing packets",
    },
    "packets_in_dropped": {
        "metric_name": "osm_packets_in_dropped",
        "metric_descr": "Number of incoming dropped packets",
    },
    "packets_out_dropped": {
        "metric_name": "osm_packets_out_dropped",
        "metric_descr": "Number of outgoing dropped packets",
    },
}

# Logging
logger = logging.getLogger("airflow.task")
def get_all_vim():
    """Get VIMs from MongoDB

    Returns:
        A list with one dict per VIM account, each holding the "_id", "name"
        and "vim_type" values copied from the account record.
    """
    logger.info("Getting VIM list")

    # The original statement that builds `cfg` is not visible; Config() is the only
    # configuration class this module imports.
    cfg = Config()
    common_db = CommonDbClient(cfg)
    vim_accounts = common_db.get_vim_accounts()

    vim_list = []
    for vim in vim_accounts:
        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
        # Keep only the fields the DAG-generation loop reads.
        vim_list.append(
            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
        )

    logger.info("Getting VIM list OK")
    return vim_list
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the DAG that collects VM metrics from one VIM and pushes them to Prometheus.

    The DAG chains four tasks: extract the metrics to collect from the VNFRs,
    split them into chunks, collect each chunk from the VIM (one mapped task
    per chunk) and push all results to the Prometheus Push Gateway.

    Args:
        dag_id: Identifier given to the DAG.
        dag_number: Accepted for the caller's benefit; not used by any statement
            visible in this function.
        dag_description: Description attached to the DAG.
        vim_id: VIM account id passed to the tasks and used in the push job name.

    Returns:
        The configured DAG object.
    """
    dag = DAG(
        dag_id,
        # start_date lies in the past; without this, Airflow versions that default to
        # catch-up would schedule one run per missed interval since then.
        catchup=False,
        default_args={
            "depends_on_past": False,
            # NOTE(review): retry_delay only takes effect when retries > 0; no
            # "retries" entry is visible in the original text -- confirm.
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
    )

    with dag:

        @task(task_id="extract_metrics_from_vnfrs")
        def extract_metrics_from_vnfrs(vim_id: str):
            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""

            # Get VNFDs that include "monitoring-parameter" from MongoDB
            cfg = Config()
            common_db = CommonDbClient(cfg)
            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
            vnfd_list = common_db.get_monitoring_vnfds()
            # Get VNFR list from MongoDB
            logger.info("Getting VNFR list from MongoDB")
            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
            # Only read metrics if ns state is one of the nsAllowedStatesSet
            nsAllowedStatesSet = {"INSTANTIATED"}

            metric_list = []
            for vnfr in vnfr_list:
                if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
                    continue
                # Check if VNFR is in "monitoring-parameter" VNFDs list
                vnfd_id = vnfr["vnfd-id"]
                vnfd = next(
                    (item for item in vnfd_list if item["_id"] == vnfd_id), None
                )
                if not vnfd:
                    continue
                ns_id = vnfr["nsr-id-ref"]
                vnf_index = vnfr["member-vnf-index-ref"]
                logger.info(
                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
                )
                project_list = vnfr.get("_admin", {}).get("projects_read", [])
                # NOTE(review): the fallback used when no project is listed is not
                # visible in the original text; "None" is an assumption -- confirm.
                project_id = project_list[0] if project_list else "None"
                for vdur in vnfr.get("vdur", []):
                    vim_info = vdur.get("vim_info")
                    if not vim_info:
                        logger.error("Error: vim_info not available in vdur")
                        continue
                    if len(vim_info) != 1:
                        logger.error("Error: more than one vim_info in vdur")
                        continue
                    vm_id = vdur.get("vim-id")
                    if not vm_id:
                        logger.error("Error: vim-id not available in vdur")
                        continue
                    vdu_name = vdur.get("name", "UNKNOWN")
                    # A default keeps a VDU missing from the descriptor from raising
                    # StopIteration and failing the whole task.
                    vdu = next(
                        (
                            candidate
                            for candidate in vnfd["vdu"]
                            if candidate["id"] == vdur["vdu-id-ref"]
                        ),
                        None,
                    )
                    if vdu is None or "monitoring-parameter" not in vdu:
                        logger.error("Error: no monitoring-parameter in descriptor")
                        continue
                    for param in vdu["monitoring-parameter"]:
                        metric_name = param["performance-metric"]
                        metric_id = param["id"]
                        # "vm_id", "ns_id" and "vdu_id" are not visible in the
                        # original dict literal; they are the remaining keys that
                        # send_prometheus reads from each metric.
                        metric = {
                            "metric": metric_name,
                            "metric_id": metric_id,
                            "vm_id": vm_id,
                            "ns_id": ns_id,
                            "project_id": project_id,
                            "vdu_name": vdu_name,
                            "vnf_member_index": vnf_index,
                            "vdu_id": vdu["id"],
                        }
                        metric_list.append(metric)

            logger.info(f"Metrics to collect: {len(metric_list)}")
            return metric_list

        @task(task_id="split_metrics")
        def split_metrics(metric_list: List[Dict]):
            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
            n_metrics = len(metric_list)
            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
                # Small enough for one collector task: a single chunk.
                return [metric_list]
            # Spread the metrics evenly over the minimum number of chunks instead of
            # filling every chunk to the maximum and leaving a small remainder.
            metrics_per_chunk = ceil(
                n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
            )
            logger.info(
                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
            )
            return [
                metric_list[i : i + metrics_per_chunk]
                for i in range(0, n_metrics, metrics_per_chunk)
            ]

        @task(task_id="collect_metrics")
        def collect_metrics(vim_id: str, metric_list: List[Dict]):
            """Collect servers metrics

            Returns the collector's results for `metric_list`, or an empty list
            when there is nothing to collect or the VIM type has no collector.
            """
            if not metric_list:
                # Nothing to collect: skip the database lookup and the VIM connection.
                return []

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # Create VIM metrics collector
            vim_type = vim_account["vim_type"]
            vim_config = vim_account.get("config") or {}
            if "vim_type" in vim_config:
                vim_type = vim_config["vim_type"].lower()
            if vim_type == "vio" and "vrops_site" not in vim_config:
                vim_type = "openstack"
            if vim_type == "openstack":
                collector = OpenStackCollector(vim_account)
            elif vim_type == "azure":
                collector = AzureCollector(vim_account)
            else:
                logger.error(f"VIM type '{vim_type}' not supported")
                # An empty list keeps the downstream loop over result lists valid.
                return []

            results = collector.collect_metrics(metric_list)
            logger.info(results)
            return results

        @task(task_id="send_prometheus")
        def send_prometheus(metric_lists: List[List[Dict]]):
            """Send servers metrics to Prometheus Push Gateway"""
            logger.info(metric_lists)

            # Define Prometheus metrics
            registry = CollectorRegistry()
            # NOTE(review): the original label list is not visible; these names were
            # rebuilt from the metric fields read in the loop below plus the VIM id
            # used for the job name -- confirm against existing dashboards and alerts.
            label_names = [
                "metric_id",
                "ns_id",
                "project_id",
                "vnf_member_index",
                "vm_id",
                "vim_id",
                "vdu_name",
                "vdu_id",
            ]
            prom_metrics = {
                key: Gauge(
                    spec["metric_name"],
                    spec["metric_descr"],
                    labelnames=label_names,
                    # Register on the private registry so only these gauges are pushed.
                    registry=registry,
                )
                for key, spec in PROMETHEUS_METRICS.items()
            }

            for metric_list in metric_lists:
                for metric in metric_list:
                    metric_name = metric["metric"]
                    metric_id = metric["metric_id"]
                    value = metric["value"]
                    vm_id = metric["vm_id"]
                    vm_name = metric.get("vdu_name", "")
                    ns_id = metric["ns_id"]
                    project_id = metric["project_id"]
                    vnf_index = metric["vnf_member_index"]
                    vdu_id = metric["vdu_id"]
                    logger.info(
                        f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
                    )
                    # Metrics without a gauge definition are logged but not exported.
                    if metric_name in prom_metrics:
                        prom_metrics[metric_name].labels(
                            metric_id=metric_id,
                            ns_id=ns_id,
                            project_id=project_id,
                            vnf_member_index=vnf_index,
                            vm_id=vm_id,
                            vim_id=vim_id,
                            vdu_name=vm_name,
                            vdu_id=vdu_id,
                        ).set(value)

            # Push to Prometheus
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
        # One collect_metrics task instance per chunk; all results go to a single push.
        send_prometheus(
            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
        )

    return dag
# Generate one DAG per supported VIM and expose it as a module-level global, which is
# where Airflow looks for DAG objects when it parses this file.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type in SUPPORTED_VIM_TYPES:
        vim_id = vim["_id"]
        vim_name = vim["name"]
        dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
        dag_id = f"vm_metrics_vim_{vim_id}"
        logger.info(f"Creating DAG {dag_id}")
        globals()[dag_id] = create_dag(
            dag_id=dag_id,
            dag_number=index,
            dag_description=dag_description,
            vim_id=vim_id,
        )
    else:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")