--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.sdnc_connectors.onos import OnosInfraCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SUPPORTED_SDNC_TYPES = ["onos_vpls"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_sdnc_status_"
+PROMETHEUS_METRIC = "osm_sdnc_status"
+PROMETHEUS_METRIC_DESCRIPTION = "SDN Controller status"
+SCHEDULE_INTERVAL = 1
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+def get_all_sdnc():
+ """Get SDNCs from MongoDB"""
+ logger.info("Getting SDNC list")
+
+ cfg = Config()
+ logger.info(cfg.conf)
+ common_db = CommonDbClient(cfg)
+ sdnc_accounts = common_db.get_sdnc_accounts()
+ sdnc_list = []
+ for sdnc in sdnc_accounts:
+ logger.info(f'Read SDNC {sdnc["_id"]} ({sdnc["name"]})')
+ sdnc_list.append(
+ {"_id": sdnc["_id"], "name": sdnc["name"], "type": sdnc["type"]}
+ )
+
+ logger.info(sdnc_list)
+ logger.info("Getting SDNC list OK")
+ return sdnc_list
+
+
+def create_dag(dag_id, dag_number, dag_description, sdnc_id):
+ dag = DAG(
+ dag_id,
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ # "retry_delay": timedelta(minutes=1),
+ "retry_delay": timedelta(seconds=10),
+ },
+ description=dag_description,
+ is_paused_upon_creation=False,
+ # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+ schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "sdnc"],
+ )
+
+ with dag:
+
+ def get_sdnc_collector(sdnc_account):
+ """Return a SDNC collector for the sdnc_account"""
+ sdnc_type = sdnc_account["type"]
+ if sdnc_type == "onos_vpls":
+ return OnosInfraCollector(sdnc_account)
+ logger.info(f"SDNC type '{sdnc_type}' not supported")
+ return None
+
+ @task(task_id="get_sdnc_status_and_send_to_prometheus")
+ def get_sdnc_status_and_send_to_prometheus(sdnc_id: str):
+ """Authenticate against SDN controller and check status"""
+
+ # Get SDNC account info from MongoDB
+ logger.info(f"Reading SDNC info, id: {sdnc_id}")
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ sdnc_account = common_db.get_sdnc_account(sdnc_account_id=sdnc_id)
+ logger.info(sdnc_account)
+
+ # Define Prometheus Metric for NS topology
+ registry = CollectorRegistry()
+ metric = Gauge(
+ PROMETHEUS_METRIC,
+ PROMETHEUS_METRIC_DESCRIPTION,
+ labelnames=[
+ "sdnc_id",
+ ],
+ registry=registry,
+ )
+ metric.labels(sdnc_id).set(0)
+
+ # Get status of SDNC
+ collector = get_sdnc_collector(sdnc_account)
+ if collector:
+ status = collector.is_sdnc_ok()
+ logger.info(f"SDNC status: {status}")
+ metric.labels(sdnc_id).set(1)
+ else:
+ logger.info("Error creating SDNC collector")
+ # Push to Prometheus
+ push_to_gateway(
+ gateway=PROMETHEUS_PUSHGW,
+ job=f"{PROMETHEUS_JOB_PREFIX}{sdnc_id}",
+ registry=registry,
+ )
+ return
+
+ get_sdnc_status_and_send_to_prometheus(sdnc_id)
+
+ return dag
+
+
+sdnc_list = get_all_sdnc()
+for index, sdnc in enumerate(sdnc_list):
+ sdnc_type = sdnc["type"]
+ if sdnc_type in SUPPORTED_SDNC_TYPES:
+ sdnc_id = sdnc["_id"]
+ sdnc_name = sdnc["name"]
+ dag_description = f"Dag for SDNC {sdnc_name} status"
+ dag_id = f"sdnc_status_{sdnc_id}"
+ logger.info(f"Creating DAG {dag_id}")
+ globals()[dag_id] = create_dag(
+ dag_id=dag_id,
+ dag_number=index,
+ dag_description=dag_description,
+ sdnc_id=sdnc_id,
+ )
+ else:
+ logger.info(f"SDNC type '{sdnc_type}' not supported for monitoring SDNC status")
def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
return self.common_db.decrypt(vim_password, schema_version, vim_id)
+ def decrypt_sdnc_password(
+ self, sdnc_password: str, schema_version: str, sdnc_id: str
+ ):
+ return self.common_db.decrypt(sdnc_password, schema_version, sdnc_id)
+
def get_vim_accounts(self):
return self.common_db.get_list("vim_accounts")
)
return vim_account
+ def get_sdnc_accounts(self):
+ return self.common_db.get_list("sdns")
+
+ def get_sdnc_account(self, sdnc_account_id: str) -> dict:
+ sdnc_account = self.common_db.get_one("sdns", {"_id": sdnc_account_id})
+ sdnc_account["password"] = self.decrypt_vim_password(
+ sdnc_account["password"], sdnc_account["schema_version"], sdnc_account_id
+ )
+ return sdnc_account
+
def get_alert(
self,
nsr_id: str,
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+from typing import Dict
+
+from osm_mon.sdnc_connectors.base_sdnc import SDNCConnector
+import requests
+from requests.auth import HTTPBasicAuth
+
+log = logging.getLogger(__name__)
+
+
+class OnosInfraCollector(SDNCConnector):
+ def __init__(self, sdnc_account: Dict):
+ self.sdnc_account = sdnc_account
+
+ def _obtain_url(self):
+ url = self.sdnc_account.get("url")
+ if url:
+ return url
+ else:
+ if not self.sdnc_account.get("ip") or not self.sdnc_account.get("port"):
+ raise Exception("You must provide a URL to contact the SDN Controller")
+ else:
+ return "http://{}:{}/onos/v1/devices".format(
+ self.sdnc_account["ip"], self.sdnc_account["port"]
+ )
+
+ def is_sdnc_ok(self) -> bool:
+ try:
+ url = self._obtain_url()
+ user = self.sdnc_account["user"]
+ password = self.sdnc_account["password"]
+
+ requests.get(url, auth=HTTPBasicAuth(user, password))
+ return True
+ except Exception:
+ log.exception("SDNC status is not OK!")
+ return False