X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=inline;f=src%2Fosm_ngsa%2Fdags%2Fmultisdnc_sdnc_status.py;fp=src%2Fosm_ngsa%2Fdags%2Fmultisdnc_sdnc_status.py;h=a9b7c13289b2d88a2ee97e6c1a55a27c639c633b;hb=9d57e94671c23a364b009e08b444ec3ee4b31a5f;hp=0000000000000000000000000000000000000000;hpb=9bea7378e9011b0f8835e97fd1a617da9dae3680;p=osm%2FNG-SA.git diff --git a/src/osm_ngsa/dags/multisdnc_sdnc_status.py b/src/osm_ngsa/dags/multisdnc_sdnc_status.py new file mode 100644 index 0000000..a9b7c13 --- /dev/null +++ b/src/osm_ngsa/dags/multisdnc_sdnc_status.py @@ -0,0 +1,147 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +from datetime import datetime, timedelta +import logging + +from airflow import DAG +from airflow.decorators import task +from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config +from osm_mon.sdnc_connectors.onos import OnosInfraCollector +from prometheus_client import CollectorRegistry, Gauge, push_to_gateway + + +SUPPORTED_SDNC_TYPES = ["onos_vpls"] +PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091" +PROMETHEUS_JOB_PREFIX = "airflow_osm_sdnc_status_" +PROMETHEUS_METRIC = "osm_sdnc_status" +PROMETHEUS_METRIC_DESCRIPTION = "SDN Controller status" +SCHEDULE_INTERVAL = 1 + +# Logging +logger = logging.getLogger("airflow.task") + + +def get_all_sdnc(): + """Get SDNCs from MongoDB""" + logger.info("Getting SDNC list") + + cfg = Config() + logger.info(cfg.conf) + common_db = CommonDbClient(cfg) + sdnc_accounts = common_db.get_sdnc_accounts() + sdnc_list = [] + for sdnc in sdnc_accounts: + logger.info(f'Read SDNC {sdnc["_id"]} ({sdnc["name"]})') + sdnc_list.append( + {"_id": sdnc["_id"], "name": sdnc["name"], "type": sdnc["type"]} + ) + + logger.info(sdnc_list) + logger.info("Getting SDNC list OK") + return sdnc_list + + +def create_dag(dag_id, dag_number, dag_description, sdnc_id): + dag = DAG( + dag_id, + catchup=False, + default_args={ + "depends_on_past": False, + "retries": 1, + # "retry_delay": timedelta(minutes=1), + "retry_delay": timedelta(seconds=10), + }, + description=dag_description, + is_paused_upon_creation=False, + # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL), + schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *", + start_date=datetime(2022, 1, 1), + tags=["osm", "sdnc"], + ) + + with dag: + + def get_sdnc_collector(sdnc_account): + """Return a SDNC collector for the sdnc_account""" + sdnc_type = sdnc_account["type"] + if sdnc_type == "onos_vpls": + return OnosInfraCollector(sdnc_account) + logger.info(f"SDNC type '{sdnc_type}' not supported") + return None + + @task(task_id="get_sdnc_status_and_send_to_prometheus") + def get_sdnc_status_and_send_to_prometheus(sdnc_id: str): + """Authenticate against SDN controller and check status""" + + # Get SDNC account info from MongoDB + logger.info(f"Reading SDNC info, id: {sdnc_id}") + cfg = Config() + common_db = CommonDbClient(cfg) + sdnc_account = common_db.get_sdnc_account(sdnc_account_id=sdnc_id) + logger.info(sdnc_account) + + # Define Prometheus Metric for NS topology + registry = CollectorRegistry() + metric = Gauge( + PROMETHEUS_METRIC, + PROMETHEUS_METRIC_DESCRIPTION, + labelnames=[ + "sdnc_id", + ], + registry=registry, + ) + metric.labels(sdnc_id).set(0) + + # Get status of SDNC + collector = get_sdnc_collector(sdnc_account) + if collector: + status = collector.is_sdnc_ok() + logger.info(f"SDNC status: {status}") + metric.labels(sdnc_id).set(1) + else: + logger.info("Error creating SDNC collector") + # Push to Prometheus + push_to_gateway( + gateway=PROMETHEUS_PUSHGW, + job=f"{PROMETHEUS_JOB_PREFIX}{sdnc_id}", + registry=registry, + ) + return + + get_sdnc_status_and_send_to_prometheus(sdnc_id) + + return dag + + +sdnc_list = get_all_sdnc() +for index, sdnc in enumerate(sdnc_list): + sdnc_type = sdnc["type"] + if sdnc_type in SUPPORTED_SDNC_TYPES: + sdnc_id = sdnc["_id"] + sdnc_name = sdnc["name"] + dag_description = f"Dag for SDNC {sdnc_name} status" + dag_id = f"sdnc_status_{sdnc_id}" + logger.info(f"Creating DAG {dag_id}") + globals()[dag_id] = create_dag( + dag_id=dag_id, + dag_number=index, + dag_description=dag_description, + sdnc_id=sdnc_id, + ) + else: + logger.info(f"SDNC type '{sdnc_type}' not supported for monitoring SDNC status")