1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime
import datetime
, timedelta
20 from airflow
import DAG
21 from airflow
.decorators
import task
22 from osm_mon
.core
.common_db
import CommonDbClient
23 from osm_mon
.core
.config
import Config
24 from osm_mon
.sdnc_connectors
.onos
import OnosInfraCollector
25 from prometheus_client
import CollectorRegistry
, Gauge
, push_to_gateway
# SDNC account types for which the module-level loop creates a status DAG;
# accounts of any other type are only logged as unsupported.
SUPPORTED_SDNC_TYPES = ["onos_vpls"]
# Address of the Prometheus Pushgateway, passed as ``gateway=`` when the
# status metric is pushed.
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
# Prefix of the Pushgateway job name; the SDNC id is appended to it.
PROMETHEUS_JOB_PREFIX = "airflow_osm_sdnc_status_"
# Presumably the name of the metric that carries the SDNC status; the call
# that consumes it is not visible here -- TODO confirm.
PROMETHEUS_METRIC = "osm_sdnc_status"
# Human-readable description passed along when the status metric is defined.
PROMETHEUS_METRIC_DESCRIPTION = "SDN Controller status"
36 logger
= logging
.getLogger("airflow.task")
40 """Get SDNCs from MongoDB"""
41 logger
.info("Getting SDNC list")
45 common_db
= CommonDbClient(cfg
)
46 sdnc_accounts
= common_db
.get_sdnc_accounts()
48 for sdnc
in sdnc_accounts
:
49 logger
.info(f
'Read SDNC {sdnc["_id"]} ({sdnc["name"]})')
51 {"_id": sdnc
["_id"], "name": sdnc
["name"], "type": sdnc
["type"]}
54 logger
.info(sdnc_list
)
55 logger
.info("Getting SDNC list OK")
59 def create_dag(dag_id
, dag_number
, dag_description
, sdnc_id
):
64 "depends_on_past": False,
66 # "retry_delay": timedelta(minutes=1),
67 "retry_delay": timedelta(seconds
=10),
69 description
=dag_description
,
70 is_paused_upon_creation
=False,
71 # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
72 schedule_interval
=f
"*/{SCHEDULE_INTERVAL} * * * *",
73 start_date
=datetime(2022, 1, 1),
def get_sdnc_collector(sdnc_account):
    """Select the infrastructure collector for an SDNC account.

    The account's ``"type"`` entry decides the result: ``"onos_vpls"``
    yields an ``OnosInfraCollector`` built from the account. Any other
    type is logged as unsupported and ``None`` is returned.
    """
    sdnc_type = sdnc_account["type"]
    # Guard clause: report unsupported controller types and bail out early.
    if sdnc_type != "onos_vpls":
        logger.info(f"SDNC type '{sdnc_type}' not supported")
        return None
    return OnosInfraCollector(sdnc_account)
87 @task(task_id
="get_sdnc_status_and_send_to_prometheus")
88 def get_sdnc_status_and_send_to_prometheus(sdnc_id
: str):
89 """Authenticate against SDN controller and check status"""
91 # Get SDNC account info from MongoDB
92 logger
.info(f
"Reading SDNC info, id: {sdnc_id}")
94 common_db
= CommonDbClient(cfg
)
95 sdnc_account
= common_db
.get_sdnc_account(sdnc_account_id
=sdnc_id
)
96 logger
.info(sdnc_account
)
# Define the Prometheus metric for SDNC status
99 registry
= CollectorRegistry()
102 PROMETHEUS_METRIC_DESCRIPTION
,
108 metric
.labels(sdnc_id
).set(0)
111 collector
= get_sdnc_collector(sdnc_account
)
113 status
= collector
.is_sdnc_ok()
114 logger
.info(f
"SDNC status: {status}")
115 metric
.labels(sdnc_id
).set(1)
117 logger
.info("Error creating SDNC collector")
120 gateway
=PROMETHEUS_PUSHGW
,
121 job
=f
"{PROMETHEUS_JOB_PREFIX}{sdnc_id}",
126 get_sdnc_status_and_send_to_prometheus(sdnc_id
)
131 sdnc_list
= get_all_sdnc()
132 for index
, sdnc
in enumerate(sdnc_list
):
133 sdnc_type
= sdnc
["type"]
134 if sdnc_type
in SUPPORTED_SDNC_TYPES
:
135 sdnc_id
= sdnc
["_id"]
136 sdnc_name
= sdnc
["name"]
137 dag_description
= f
"Dag for SDNC {sdnc_name} status"
138 dag_id
= f
"sdnc_status_{sdnc_id}"
139 logger
.info(f
"Creating DAG {dag_id}")
140 globals()[dag_id
] = create_dag(
143 dag_description
=dag_description
,
147 logger
.info(f
"SDNC type '{sdnc_type}' not supported for monitoring SDNC status")