#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
from datetime import datetime, timedelta
import logging
from math import ceil
from typing import Dict, List

from airflow import DAG
from airflow.decorators import task
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from osm_mon.vim_connectors.azure import AzureCollector
from osm_mon.vim_connectors.gcp import GcpCollector
from osm_mon.vim_connectors.openstack import OpenStackCollector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
# Upper bound on the number of metrics handed to a single collection task.
COLLECTOR_MAX_METRICS_PER_TASK = 100
# VIM types for which a metrics-collection DAG is generated.
SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure", "gcp"]
# Address (host:port) of the Prometheus Push Gateway.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# Prefix of the Push Gateway job name.
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
# Maps a performance-metric name to the Prometheus gauge name and description
# used when publishing it.
# NOTE(review): several entry keys were not legible in the reviewed excerpt;
# they are restored following the pattern of the legible entries (the gauge
# name without its "osm_" prefix) - confirm against the descriptors'
# "performance-metric" values.
PROMETHEUS_METRICS = {
    "cpu_utilization": {
        "metric_name": "osm_cpu_utilization",
        "metric_descr": "CPU usage percentage",
    },
    "average_memory_utilization": {
        "metric_name": "osm_average_memory_utilization",
        "metric_descr": "Volume of RAM in MB used by the VM",
    },
    "disk_read_ops": {
        "metric_name": "osm_disk_read_ops",
        "metric_descr": "Number of read requests",
    },
    "disk_write_ops": {
        "metric_name": "osm_disk_write_ops",
        "metric_descr": "Number of write requests",
    },
    "disk_read_bytes": {
        "metric_name": "osm_disk_read_bytes",
        "metric_descr": "Volume of reads in bytes",
    },
    "disk_write_bytes": {
        "metric_name": "osm_disk_write_bytes",
        "metric_descr": "Volume of writes in bytes",
    },
    "packets_received": {
        "metric_name": "osm_packets_received",
        "metric_descr": "Number of incoming packets",
    },
    "packets_sent": {
        "metric_name": "osm_packets_sent",
        "metric_descr": "Number of outgoing packets",
    },
    "packets_in_dropped": {
        "metric_name": "osm_packets_in_dropped",
        "metric_descr": "Number of incoming dropped packets",
    },
    "packets_out_dropped": {
        "metric_name": "osm_packets_out_dropped",
        "metric_descr": "Number of outgoing dropped packets",
    },
}

# Logging: every function in this module logs through the "airflow.task" logger.
logger = logging.getLogger("airflow.task")
def get_all_vim():
    """Get VIMs from MongoDB

    Returns:
        A list with one dict per VIM account, each holding the account's
        "_id", "name" and "vim_type".
    """
    logger.info("Getting VIM list")

    cfg = Config()
    common_db = CommonDbClient(cfg)
    vim_accounts = common_db.get_vim_accounts()

    vim_list = []
    for vim in vim_accounts:
        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
        # Keep only the fields needed to decide on and name the per-VIM DAG.
        vim_list.append(
            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
        )

    logger.info("Getting VIM list OK")
    return vim_list
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the DAG that collects VM metrics for one VIM account.

    The DAG chains four tasks: extract the metrics to collect from the VNFRs,
    split them into chunks, collect each chunk from the VIM (one mapped task
    per chunk) and push the collected values to the Prometheus Push Gateway.

    Args:
        dag_id: Identifier given to the DAG.
        dag_number: Index of the DAG among the generated ones; accepted for
            interface compatibility, not used when building the DAG.
        dag_description: Human-readable description of the DAG.
        vim_id: Id of the VIM account whose VMs are monitored.

    Returns:
        The configured DAG object.
    """
    # NOTE(review): SCHEDULE_INTERVAL is expected to be a module-level constant
    # (cron minute step); its definition is not visible in the reviewed excerpt.
    # NOTE(review): `catchup=False` and `"retries": 1` were not legible in the
    # reviewed excerpt. They are set here because the start date lies in the
    # past (a catch-up would backfill every missed interval) and because
    # `retry_delay` only has an effect when retries are enabled - confirm.
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            # "retry_delay": timedelta(minutes=1),
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
    )

    with dag:

        @task(task_id="extract_metrics_from_vnfrs")
        def extract_metrics_from_vnfrs(vim_id: str):
            """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""

            # Get VNFDs that include "monitoring-parameter" from MongoDB
            cfg = Config()
            common_db = CommonDbClient(cfg)
            logger.info("Getting VNFDs with monitoring parameters from MongoDB")
            vnfd_list = common_db.get_monitoring_vnfds()
            # Index the VNFDs once so each VNFR lookup is a dict access; the
            # first VNFD seen for an id wins, as with a linear first-match scan.
            vnfds_by_id = {}
            for vnfd_item in vnfd_list:
                vnfds_by_id.setdefault(vnfd_item["_id"], vnfd_item)

            # Get VNFR list from MongoDB
            logger.info("Getting VNFR list from MongoDB")
            vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
            # Only read metrics if ns state is one of the nsAllowedStatesSet
            nsAllowedStatesSet = {"INSTANTIATED"}

            metric_list = []
            for vnfr in vnfr_list:
                if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
                    continue
                # Check if VNFR is in "monitoring-parameter" VNFDs list
                vnfd = vnfds_by_id.get(vnfr["vnfd-id"])
                if not vnfd:
                    continue
                ns_id = vnfr["nsr-id-ref"]
                vnf_index = vnfr["member-vnf-index-ref"]
                logger.info(
                    f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
                )
                project_list = vnfr.get("_admin", {}).get("projects_read", [])
                # NOTE(review): the fallback used when the VNFR lists no
                # project was not legible in the reviewed excerpt; the string
                # "None" is assumed - confirm.
                project_id = project_list[0] if project_list else "None"

                for vdur in vnfr.get("vdur", []):
                    vim_info = vdur.get("vim_info")
                    if not vim_info:
                        logger.error("Error: vim_info not available in vdur")
                        continue
                    if len(vim_info) != 1:
                        logger.error("Error: more than one vim_info in vdur")
                        continue
                    vm_id = vdur.get("vim-id")
                    if not vm_id:
                        logger.error("Error: vim-id not available in vdur")
                        continue
                    vdu_name = vdur.get("name", "UNKNOWN")
                    # Locate the descriptor VDU this record was created from;
                    # a record without a matching VDU is skipped like the
                    # other malformed records instead of aborting the task.
                    vdu_ref = vdur.get("vdu-id-ref")
                    vdu = next(
                        (d for d in vnfd.get("vdu", []) if d["id"] == vdu_ref),
                        None,
                    )
                    if vdu is None:
                        logger.error("Error: vdu not found in descriptor")
                        continue
                    if "monitoring-parameter" not in vdu:
                        logger.error("Error: no monitoring-parameter in descriptor")
                        continue
                    for param in vdu["monitoring-parameter"]:
                        metric_name = param["performance-metric"]
                        metric_id = param["id"]
                        metric = {
                            "metric": metric_name,
                            "metric_id": metric_id,
                            "vm_id": vm_id,
                            "ns_id": ns_id,
                            "project_id": project_id,
                            "vdu_name": vdu_name,
                            "vnf_member_index": vnf_index,
                            "vdu_id": vdu["id"],
                        }
                        metric_list.append(metric)

            logger.info(f"Metrics to collect: {len(metric_list)}")
            return metric_list

        @task(task_id="split_metrics")
        def split_metrics(metric_list: List[Dict]):
            """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
            n_metrics = len(metric_list)
            if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
                return [metric_list]
            # Use the smallest number of chunks that respects the limit and
            # spread the metrics evenly among them.
            n_chunks = ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
            metrics_per_chunk = ceil(n_metrics / n_chunks)
            logger.info(
                f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
            )
            return [
                metric_list[i : i + metrics_per_chunk]
                for i in range(0, n_metrics, metrics_per_chunk)
            ]

        @task(task_id="collect_metrics")
        def collect_metrics(vim_id: str, metric_list: List[Dict]):
            """Collect servers metrics

            Returns the list produced by the VIM collector, or an empty list
            when there is nothing to collect or the VIM type is unsupported.
            """
            if not metric_list:
                return []

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # Create VIM metrics collector
            vim_type = vim_account["vim_type"]
            # NOTE(review): the "vio" check is kept inside the config branch;
            # the excerpt has no indentation, so this nesting needs confirming.
            if "config" in vim_account and "vim_type" in vim_account["config"]:
                vim_type = vim_account["config"]["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
                    vim_type = "openstack"
            if vim_type == "openstack":
                collector = OpenStackCollector(vim_account)
            elif vim_type == "azure":
                collector = AzureCollector(vim_account)
            elif vim_type == "gcp":
                collector = GcpCollector(vim_account)
            else:
                logger.error(f"VIM type '{vim_type}' not supported")
                # An empty list keeps the downstream task able to iterate.
                return []
            # Get metrics
            results = collector.collect_metrics(metric_list)
            return results

        @task(task_id="send_prometheus")
        def send_prometheus(metric_lists: List[List[Dict]]):
            """Send servers metrics to Prometheus Push Gateway"""
            logger.info(metric_lists)

            # Define Prometheus metrics
            # NOTE(review): the label names were not legible in the reviewed
            # excerpt; they are derived from the fields read from each metric
            # plus the VIM id - confirm against the consumers of these series.
            label_names = [
                "metric_id",
                "ns_id",
                "project_id",
                "vnf_member_index",
                "vm_id",
                "vim_id",
                "vdu_name",
                "vdu_id",
            ]
            registry = CollectorRegistry()
            prom_metrics = {}
            for key, definition in PROMETHEUS_METRICS.items():
                prom_metrics[key] = Gauge(
                    definition["metric_name"],
                    definition["metric_descr"],
                    labelnames=label_names,
                    registry=registry,
                )

            for metric_list in metric_lists:
                for metric in metric_list:
                    metric_name = metric["metric"]
                    metric_id = metric["metric_id"]
                    value = metric["value"]
                    vm_id = metric["vm_id"]
                    vm_name = metric.get("vdu_name", "")
                    ns_id = metric["ns_id"]
                    project_id = metric["project_id"]
                    vnf_index = metric["vnf_member_index"]
                    vdu_id = metric["vdu_id"]
                    logger.info(
                        f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
                    )
                    # Metrics without a gauge definition are logged only.
                    if metric_name in prom_metrics:
                        prom_metrics[metric_name].labels(
                            metric_id=metric_id,
                            ns_id=ns_id,
                            project_id=project_id,
                            vnf_member_index=vnf_index,
                            vm_id=vm_id,
                            vim_id=vim_id,
                            vdu_name=vm_name,
                            vdu_id=vdu_id,
                        ).set(value)

            # One push per VIM: the job name is the prefix plus the VIM id.
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        # Wire the tasks: one mapped collect_metrics instance per chunk, whose
        # results are gathered by send_prometheus.
        chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
        send_prometheus(
            collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
        )

    return dag
# Generate one DAG per VIM account of a supported type. Each DAG is published
# in the module globals under its dag_id.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type in SUPPORTED_VIM_TYPES:
        vim_id = vim["_id"]
        vim_name = vim["name"]
        dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
        dag_id = f"vm_metrics_vim_{vim_id}"
        logger.info(f"Creating DAG {dag_id}")
        globals()[dag_id] = create_dag(
            dag_id=dag_id,
            dag_number=index,
            dag_description=dag_description,
            vim_id=vim_id,
        )
    else:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")