18a02e1d278348dc59b8b4f1180b49be119fa560
[osm/NG-SA.git] / src / osm_ngsa / dags / multivim_vm_status.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime, timedelta
18 import logging
19
20 from airflow import DAG
21 from airflow.decorators import task
22 from osm_mon.core.common_db import CommonDbClient
23 from osm_mon.core.config import Config
24 from osm_mon.vim_connectors.azure import AzureCollector
25 from osm_mon.vim_connectors.gcp import GcpCollector
26 from osm_mon.vim_connectors.openstack import OpenStackCollector
27 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
28
29
# Top-level "vim_type" values for which a VM-status DAG is generated by the
# loop at the bottom of this module.
SUPPORTED_VIM_TYPES = ["openstack", "vio", "gcp", "azure"]
# Address (host:port) handed to push_to_gateway() as the gateway.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# The VIM id is appended to this prefix to form the Pushgateway job name.
PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_status_"
# Name and help text of the Gauge that carries the VM status.
PROMETHEUS_METRIC = "vm_status"
PROMETHEUS_METRIC_DESCRIPTION = "VM Status from VIM"
# Interpolated into the minutes field of the DAG cron expression
# ("*/<SCHEDULE_INTERVAL> * * * *"), i.e. the period in minutes.
SCHEDULE_INTERVAL = 1

# Logging
# NOTE(review): "airflow.task" is presumably the logger Airflow attaches to
# task logs -- confirm against the Airflow logging configuration in use.
logger = logging.getLogger("airflow.task")
39
40
def get_all_vim():
    """Read the VIM accounts from the database and summarise them.

    Returns:
        A list with one dict per VIM account, each holding only the
        "_id", "name" and "vim_type" entries of the account.
    """
    logger.info("Getting VIM list")

    cfg = Config()
    logger.info(cfg.conf)
    db_client = CommonDbClient(cfg)

    summaries = []
    for account in db_client.get_vim_accounts():
        account_id = account["_id"]
        account_name = account["name"]
        logger.info(f"Read VIM {account_id} ({account_name})")
        # Keep only the fields needed to decide on and name the DAG.
        summary = {
            "_id": account_id,
            "name": account_name,
            "vim_type": account["vim_type"],
        }
        summaries.append(summary)

    logger.info(summaries)
    logger.info("Getting VIM list OK")
    return summaries
59
60
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the DAG that publishes the VM status of one VIM to Prometheus.

    The DAG contains a single task. On every run the task reads the VIM
    account from the database, asks the collector matching the VIM type for
    the status of its VMs and pushes one gauge sample per VM to the
    Prometheus Pushgateway.

    Args:
        dag_id: Identifier given to the DAG.
        dag_number: Accepted for backward compatibility with existing
            callers; it is not used when building the DAG.
        dag_description: Description shown for the DAG.
        vim_id: Identifier of the VIM account whose VMs are monitored.

    Returns:
        The configured DAG object.
    """
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        max_active_runs=1,
        # Cron expression: run every SCHEDULE_INTERVAL minutes.
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
        tags=["osm", "vim"],
    )

    with dag:

        def get_vim_collector(vim_account):
            """Return a collector for the VIM account, or None if unsupported."""
            # "config" may be missing or explicitly null in the record;
            # treat both cases as an empty configuration.
            config = vim_account.get("config") or {}
            vim_type = vim_account["vim_type"]
            if "vim_type" in config:
                # A type given in the config overrides the top-level one.
                vim_type = config["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in config:
                    # Without a "vrops_site" entry a "vio" VIM is handled
                    # with the OpenStack collector.
                    vim_type = "openstack"

            collector_classes = {
                "openstack": OpenStackCollector,
                "gcp": GcpCollector,
                "azure": AzureCollector,
            }
            collector_class = collector_classes.get(vim_type)
            if collector_class is None:
                logger.info(f"VIM type '{vim_type}' not supported")
                return None
            return collector_class(vim_account)

        def get_all_vm_status(vim_account):
            """Return the VM status list of the VIM, or [] without collector."""
            collector = get_vim_collector(vim_account)
            if not collector:
                logger.error("No collector for VIM")
                return []
            # The VIM health is only reported in the log; the VM status is
            # collected regardless of its value.
            status = collector.is_vim_ok()
            logger.info(f"VIM status: {status}")
            return collector.collect_servers_status()

        @task(task_id="get_all_vm_status_and_send_to_prometheus")
        def get_all_vm_status_and_send_to_prometheus(vim_id: str):
            """Authenticate against VIM, collect servers status and send to prometheus"""

            # Get VIM account info from MongoDB
            logger.info(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # Log identifying fields only. NOTE(review): the account record
            # is expected to carry the VIM credentials, so it must not be
            # dumped into the task log -- confirm against CommonDbClient.
            logger.info(
                f'Read VIM {vim_account.get("_id")} ({vim_account.get("name")}),'
                f' type {vim_account.get("vim_type")}'
            )

            # Define Prometheus metric for the VM status
            registry = CollectorRegistry()
            metric = Gauge(
                PROMETHEUS_METRIC,
                PROMETHEUS_METRIC_DESCRIPTION,
                labelnames=[
                    "vm_id",
                    "vim_id",
                ],
                registry=registry,
            )

            # Get status of all VM from VIM
            all_vm_status = get_all_vm_status(vim_account)
            logger.info(f"Got {len(all_vm_status)} VMs with their status:")
            for vm in all_vm_status:
                vm_id = vm["id"]
                vm_status = vm["status"]
                vm_name = vm.get("name", "")
                logger.info(f"    {vm_name} ({vm_id}) {vm_status}")
                # One sample per VM, labelled with the VM and VIM ids.
                metric.labels(vm_id, vim_id).set(vm_status)

            # Push to Prometheus; one Pushgateway job per VIM.
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        get_all_vm_status_and_send_to_prometheus(vim_id)

    return dag
153
154
# Generate one DAG per VIM of a supported type. Each DAG is stored in the
# module namespace under its own id.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type not in SUPPORTED_VIM_TYPES:
        logger.info(f"VIM type '{vim_type}' not supported for collecting VM status")
        continue

    vim_id = vim["_id"]
    vim_name = vim["name"]
    dag_description = f"Dag for vim {vim_name}"
    dag_id = f"vm_status_vim_{vim_id}"
    logger.info(f"Creating DAG {dag_id}")
    new_dag_kwargs = {
        "dag_id": dag_id,
        "dag_number": index,
        "dag_description": dag_description,
        "vim_id": vim_id,
    }
    globals()[dag_id] = create_dag(**new_dag_kwargs)
170 else:
171 logger.info(f"VIM type '{vim_type}' not supported for collecting VM status")