Airflow DAG and connectors to get SDNC status
[osm/NG-SA.git] / src / osm_ngsa / dags / multisdnc_sdnc_status.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime, timedelta
18 import logging
19
20 from airflow import DAG
21 from airflow.decorators import task
22 from osm_mon.core.common_db import CommonDbClient
23 from osm_mon.core.config import Config
24 from osm_mon.sdnc_connectors.onos import OnosInfraCollector
25 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
26
27
28 SUPPORTED_SDNC_TYPES = ["onos_vpls"]
29 PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
30 PROMETHEUS_JOB_PREFIX = "airflow_osm_sdnc_status_"
31 PROMETHEUS_METRIC = "osm_sdnc_status"
32 PROMETHEUS_METRIC_DESCRIPTION = "SDN Controller status"
33 SCHEDULE_INTERVAL = 1
34
35 # Logging
36 logger = logging.getLogger("airflow.task")
37
38
39 def get_all_sdnc():
40 """Get SDNCs from MongoDB"""
41 logger.info("Getting SDNC list")
42
43 cfg = Config()
44 logger.info(cfg.conf)
45 common_db = CommonDbClient(cfg)
46 sdnc_accounts = common_db.get_sdnc_accounts()
47 sdnc_list = []
48 for sdnc in sdnc_accounts:
49 logger.info(f'Read SDNC {sdnc["_id"]} ({sdnc["name"]})')
50 sdnc_list.append(
51 {"_id": sdnc["_id"], "name": sdnc["name"], "type": sdnc["type"]}
52 )
53
54 logger.info(sdnc_list)
55 logger.info("Getting SDNC list OK")
56 return sdnc_list
57
58
59 def create_dag(dag_id, dag_number, dag_description, sdnc_id):
60 dag = DAG(
61 dag_id,
62 catchup=False,
63 default_args={
64 "depends_on_past": False,
65 "retries": 1,
66 # "retry_delay": timedelta(minutes=1),
67 "retry_delay": timedelta(seconds=10),
68 },
69 description=dag_description,
70 is_paused_upon_creation=False,
71 # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
72 schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
73 start_date=datetime(2022, 1, 1),
74 tags=["osm", "sdnc"],
75 )
76
77 with dag:
78
79 def get_sdnc_collector(sdnc_account):
80 """Return a SDNC collector for the sdnc_account"""
81 sdnc_type = sdnc_account["type"]
82 if sdnc_type == "onos_vpls":
83 return OnosInfraCollector(sdnc_account)
84 logger.info(f"SDNC type '{sdnc_type}' not supported")
85 return None
86
87 @task(task_id="get_sdnc_status_and_send_to_prometheus")
88 def get_sdnc_status_and_send_to_prometheus(sdnc_id: str):
89 """Authenticate against SDN controller and check status"""
90
91 # Get SDNC account info from MongoDB
92 logger.info(f"Reading SDNC info, id: {sdnc_id}")
93 cfg = Config()
94 common_db = CommonDbClient(cfg)
95 sdnc_account = common_db.get_sdnc_account(sdnc_account_id=sdnc_id)
96 logger.info(sdnc_account)
97
98 # Define Prometheus Metric for NS topology
99 registry = CollectorRegistry()
100 metric = Gauge(
101 PROMETHEUS_METRIC,
102 PROMETHEUS_METRIC_DESCRIPTION,
103 labelnames=[
104 "sdnc_id",
105 ],
106 registry=registry,
107 )
108 metric.labels(sdnc_id).set(0)
109
110 # Get status of SDNC
111 collector = get_sdnc_collector(sdnc_account)
112 if collector:
113 status = collector.is_sdnc_ok()
114 logger.info(f"SDNC status: {status}")
115 metric.labels(sdnc_id).set(1)
116 else:
117 logger.info("Error creating SDNC collector")
118 # Push to Prometheus
119 push_to_gateway(
120 gateway=PROMETHEUS_PUSHGW,
121 job=f"{PROMETHEUS_JOB_PREFIX}{sdnc_id}",
122 registry=registry,
123 )
124 return
125
126 get_sdnc_status_and_send_to_prometheus(sdnc_id)
127
128 return dag
129
130
131 sdnc_list = get_all_sdnc()
132 for index, sdnc in enumerate(sdnc_list):
133 sdnc_type = sdnc["type"]
134 if sdnc_type in SUPPORTED_SDNC_TYPES:
135 sdnc_id = sdnc["_id"]
136 sdnc_name = sdnc["name"]
137 dag_description = f"Dag for SDNC {sdnc_name} status"
138 dag_id = f"sdnc_status_{sdnc_id}"
139 logger.info(f"Creating DAG {dag_id}")
140 globals()[dag_id] = create_dag(
141 dag_id=dag_id,
142 dag_number=index,
143 dag_description=dag_description,
144 sdnc_id=sdnc_id,
145 )
146 else:
147 logger.info(f"SDNC type '{sdnc_type}' not supported for monitoring SDNC status")