| ####################################################################################### |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ####################################################################################### |
| from datetime import datetime, timedelta |
| |
| from airflow import DAG |
| from airflow.decorators import task |
| from osm_mon.core.common_db import CommonDbClient |
| from osm_mon.core.config import Config |
| from osm_mon.vim_connectors.azure import AzureCollector |
| from osm_mon.vim_connectors.gcp import GcpCollector |
| from osm_mon.vim_connectors.openstack import OpenStackCollector |
| from prometheus_client import CollectorRegistry, Gauge, push_to_gateway |
| |
| |
# VIM types for which a status DAG is generated.
SUPPORTED_VIM_TYPES = [
    "openstack",
    "vio",
    "gcp",
    "azure",
]
# Address (host:port) passed to push_to_gateway as the gateway.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# The Pushgateway job name is this prefix followed by the VIM id.
PROMETHEUS_JOB_PREFIX = "airflow_osm_vim_status_"
# Name and help text of the gauge that carries the VIM status.
PROMETHEUS_METRIC = "vim_status"
PROMETHEUS_METRIC_DESCRIPTION = "VIM status"
# Minutes between two runs; interpolated into the DAG cron expression.
SCHEDULE_INTERVAL = 1
| |
| |
def get_all_vim():
    """Return the id, name and type of every VIM account in the common DB.

    The accounts are read through ``CommonDbClient.get_vim_accounts()``; each
    one is reduced to a dict holding only its ``_id``, ``name`` and
    ``vim_type`` entries. Progress is reported with ``print``.

    Returns:
        A list of dicts, one per VIM account, in the order the DB client
        yields them.
    """
    print("Getting VIM list")

    config = Config()
    print(config.conf)
    db_client = CommonDbClient(config)
    accounts = db_client.get_vim_accounts()

    kept_keys = ("_id", "name", "vim_type")
    summaries = []
    for account in accounts:
        print(f'Read VIM {account["_id"]} ({account["name"]})')
        # Keep only the fields needed to generate a DAG for this VIM.
        summaries.append({key: account[key] for key in kept_keys})

    print(summaries)
    print("Getting VIM list OK")
    return summaries
| |
| |
def create_dag(dag_id, dag_number, dag_description, vim_id):
    """Build the DAG that periodically reports the status of one VIM.

    The DAG contains a single task. On every run the task reads the VIM
    account from the common DB, asks the matching collector whether the VIM
    is OK and pushes a ``vim_status`` gauge to the Prometheus Pushgateway:
    1 when the collector reports the VIM as OK, 0 otherwise (including when
    no collector exists for the VIM type).

    Args:
        dag_id: Identifier given to the DAG.
        dag_number: Not used by this function; kept so existing callers that
            pass it keep working.
        dag_description: Description attached to the DAG.
        vim_id: Identifier of the VIM account to monitor.

    Returns:
        The configured ``DAG`` object.
    """
    dag = DAG(
        dag_id,
        catchup=False,
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(seconds=10),
        },
        description=dag_description,
        is_paused_upon_creation=False,
        # Cron expression: run every SCHEDULE_INTERVAL minutes.
        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
        start_date=datetime(2022, 1, 1),
        tags=["osm", "vim"],
    )

    with dag:

        def get_vim_collector(vim_account):
            """Return a VIM collector for the vim_account, or None.

            The type stored under ``config["vim_type"]`` (lower-cased), when
            present, takes precedence over the top-level ``vim_type``. A
            "vio" VIM without a ``vrops_site`` entry in its config is handled
            as plain "openstack". None is returned for any type that has no
            collector.
            """
            vim_type = vim_account["vim_type"]
            # A missing (or None) config is treated like an empty one.
            config = vim_account.get("config") or {}
            if "vim_type" in config:
                vim_type = config["vim_type"].lower()
                if vim_type == "vio" and "vrops_site" not in config:
                    vim_type = "openstack"
            if vim_type == "openstack":
                return OpenStackCollector(vim_account)
            if vim_type == "gcp":
                return GcpCollector(vim_account)
            if vim_type == "azure":
                return AzureCollector(vim_account)
            print(f"VIM type '{vim_type}' not supported")
            return None

        @task(task_id="get_vim_status_and_send_to_prometheus")
        def get_vim_status_and_send_to_prometheus(vim_id: str):
            """Check the status of the VIM and push it to the Pushgateway."""

            # Get VIM account info from the common DB
            print(f"Reading VIM info, id: {vim_id}")
            cfg = Config()
            common_db = CommonDbClient(cfg)
            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
            # NOTE(review): this dumps the whole account record to the task
            # log -- confirm it cannot contain credentials.
            print(vim_account)

            # Define the Prometheus metric for the VIM status
            registry = CollectorRegistry()
            metric = Gauge(
                PROMETHEUS_METRIC,
                PROMETHEUS_METRIC_DESCRIPTION,
                labelnames=[
                    "vim_id",
                ],
                registry=registry,
            )
            # Start from "not OK" so a sample is pushed whatever happens below.
            metric.labels(vim_id).set(0)

            # Get status of VIM
            collector = get_vim_collector(vim_account)
            if collector:
                status = collector.is_vim_ok()
                print(f"VIM status: {status}")
                # Only report the VIM as up when the collector says so.
                if status:
                    metric.labels(vim_id).set(1)
            else:
                print("Error creating VIM collector")

            # Push to Prometheus
            push_to_gateway(
                gateway=PROMETHEUS_PUSHGW,
                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
                registry=registry,
            )

        get_vim_status_and_send_to_prometheus(vim_id)

    return dag
| |
| |
# Runs when this module is imported: create one status DAG per supported VIM
# and expose it as a module-level global named after its dag_id.
vim_list = get_all_vim()
for index, vim in enumerate(vim_list):
    vim_type = vim["vim_type"]
    if vim_type not in SUPPORTED_VIM_TYPES:
        print(f"VIM type '{vim_type}' not supported for monitoring VIM status")
        continue

    vim_id = vim["_id"]
    vim_name = vim["name"]
    dag_description = f"Dag for VIM {vim_name} status"
    dag_id = f"vim_status_{vim_id}"
    print(f"Creating DAG {dag_id}")
    globals()[dag_id] = create_dag(
        dag_id=dag_id,
        dag_number=index,
        dag_description=dag_description,
        vim_id=vim_id,
    )