#######################################################################################
# Copyright ETSI Contributors and Others.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
17 from datetime
import datetime
, timedelta
20 from airflow
import DAG
21 from airflow
.decorators
import task
22 from osm_mon
.core
.common_db
import CommonDbClient
23 from osm_mon
.core
.config
import Config
24 from osm_mon
.vim_connectors
.azure
import AzureCollector
25 from osm_mon
.vim_connectors
.gcp
import GcpCollector
26 from osm_mon
.vim_connectors
.openstack
import OpenStackCollector
27 from prometheus_client
import CollectorRegistry
, Gauge
, push_to_gateway
# VIM types for which a VM-status DAG is generated; any other type is only logged.
SUPPORTED_VIM_TYPES = ["openstack", "vio", "gcp", "azure"]
# Address (host:port) passed as the ``gateway`` of the Prometheus push.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# Prefix of the Prometheus job name; the VIM id is appended to it.
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_status_"
# Metric name and its human-readable description.
PROMETHEUS_METRIC = "vm_status"
PROMETHEUS_METRIC_DESCRIPTION = "VM Status from VIM"

# Module-wide logger, bound to the "airflow.task" logger name.
logger = logging.getLogger("airflow.task")
42 """Get VIMs from MongoDB"""
43 logger
.info("Getting VIM list")
47 common_db
= CommonDbClient(cfg
)
48 vim_accounts
= common_db
.get_vim_accounts()
50 for vim
in vim_accounts
:
51 logger
.info(f
'Read VIM {vim["_id"]} ({vim["name"]})')
53 {"_id": vim
["_id"], "name": vim
["name"], "vim_type": vim
["vim_type"]}
57 logger
.info("Getting VIM list OK")
61 def create_dag(dag_id
, dag_number
, dag_description
, vim_id
):
66 "depends_on_past": False,
68 # "retry_delay": timedelta(minutes=1),
69 "retry_delay": timedelta(seconds
=10),
71 description
=dag_description
,
72 is_paused_upon_creation
=False,
73 # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
74 schedule_interval
=f
"*/{SCHEDULE_INTERVAL} * * * *",
75 start_date
=datetime(2022, 1, 1),
def get_vim_collector(vim_account):
    """Return a VIM collector for the vim_account.

    The VIM type is taken from ``vim_account["vim_type"]``. When the account
    has a ``"config"`` entry that itself contains ``"vim_type"``, that value
    (lower-cased) is used instead; in that case a "vio" account whose config
    has no ``"vrops_site"`` entry is treated as "openstack".

    Args:
        vim_account: mapping describing the VIM account. Must contain
            ``"vim_type"``; may contain a ``"config"`` mapping.

    Returns:
        An ``OpenStackCollector``, ``GcpCollector`` or ``AzureCollector``
        built from ``vim_account``, or ``None`` (after logging) when the
        resolved type matches none of them.
    """
    vim_type = vim_account["vim_type"]
    if "config" in vim_account and "vim_type" in vim_account["config"]:
        # A type declared inside the config overrides the top-level one.
        vim_type = vim_account["config"]["vim_type"].lower()
        # Kept under the guard above because it indexes vim_account["config"],
        # which is only known to exist here.
        if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
            vim_type = "openstack"
    if vim_type == "openstack":
        return OpenStackCollector(vim_account)
    if vim_type == "gcp":
        return GcpCollector(vim_account)
    if vim_type == "azure":
        return AzureCollector(vim_account)
    logger.info(f"VIM type '{vim_type}' not supported")
    # Explicit so callers can see that "no collector" is a possible outcome.
    return None
def get_all_vm_status(vim_account):
    """Get VM status from the VIM.

    Builds the collector for ``vim_account``, logs the result of its
    ``is_vim_ok()`` check and returns whatever ``collect_servers_status()``
    yields.

    Args:
        vim_account: VIM account mapping, passed unchanged to
            ``get_vim_collector``.

    Returns:
        The value returned by the collector's ``collect_servers_status()``,
        or an empty list when no collector exists for this VIM type.
    """
    collector = get_vim_collector(vim_account)
    if collector is None:
        # get_vim_collector has already logged the unsupported type; an empty
        # list lets callers use len() and iterate without a special case.
        return []
    status = collector.is_vim_ok()
    logger.info(f"VIM status: {status}")
    return collector.collect_servers_status()
108 @task(task_id
="get_all_vm_status_and_send_to_prometheus")
109 def get_all_vm_status_and_send_to_prometheus(vim_id
: str):
110 """Authenticate against VIM, collect servers status and send to prometheus"""
112 # Get VIM account info from MongoDB
113 logger
.info(f
"Reading VIM info, id: {vim_id}")
115 common_db
= CommonDbClient(cfg
)
116 vim_account
= common_db
.get_vim_account(vim_account_id
=vim_id
)
117 logger
.info(vim_account
)
119 # Define Prometheus Metric for NS topology
120 registry
= CollectorRegistry()
123 PROMETHEUS_METRIC_DESCRIPTION
,
131 # Get status of all VM from VIM
132 all_vm_status
= get_all_vm_status(vim_account
)
133 logger
.info(f
"Got {len(all_vm_status)} VMs with their status:")
135 for vm
in all_vm_status
:
137 vm_status
= vm
["status"]
138 vm_name
= vm
.get("name", "")
139 logger
.info(f
" {vm_name} ({vm_id}) {vm_status}")
140 metric
.labels(vm_id
, vim_id
).set(vm_status
)
141 # Push to Prometheus only if there are VM
143 gateway
=PROMETHEUS_PUSHGW
,
144 job
=f
"{PROMETHEUS_JOB_PREFIX}{vim_id}",
149 get_all_vm_status_and_send_to_prometheus(vim_id
)
# Build one DAG per VIM account of a supported type. Each DAG is bound to a
# module-level name (its dag_id) through globals(), which is how dynamically
# generated DAGs are exposed to the Airflow DAG loader.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type in SUPPORTED_VIM_TYPES:
        vim_id = vim["_id"]
        vim_name = vim["name"]
        dag_description = f"Dag for vim {vim_name}"
        dag_id = f"vm_status_vim_{vim_id}"
        logger.info(f"Creating DAG {dag_id}")
        # Keyword arguments mirror create_dag's signature one-to-one.
        globals()[dag_id] = create_dag(
            dag_id=dag_id,
            dag_number=index,
            dag_description=dag_description,
            vim_id=vim_id,
        )
    else:
        # Unsupported accounts are reported but do not stop DAG generation.
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM status")