blob: 7d2af070ae947ac25d194022121253cd8f31f64e [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
from datetime import datetime, timedelta
import logging
from airflow.decorators import dag, task
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
PROMETHEUS_JOB = "airflow_osm_ns_topology"
PROMETHEUS_METRIC = "ns_topology"
PROMETHEUS_METRIC_DESCRIPTION = "Network services topology"
SCHEDULE_INTERVAL = 2
# Logging
logger = logging.getLogger("airflow.task")
@dag(
catchup=False,
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(seconds=10),
},
description="NS topology",
is_paused_upon_creation=False,
max_active_runs=1,
# schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
start_date=datetime(2022, 1, 1),
tags=["osm", "topology"],
)
def ns_topology():
@task(task_id="get_topology")
def get_topology():
"""
Get NS topology from MongoDB and exports it as a metric
to Prometheus
"""
# Define Prometheus Metric for NS topology
registry = CollectorRegistry()
metric = Gauge(
PROMETHEUS_METRIC,
PROMETHEUS_METRIC_DESCRIPTION,
labelnames=[
"ns_id",
"project_id",
"vnf_id",
"vdu_id",
"vm_id",
"vim_id",
"vdu_name",
"vnf_member_index",
],
registry=registry,
)
# Getting VNFR list from MongoDB
logger.info("Getting VNFR list from MongoDB")
cfg = Config()
# logger.debug(cfg.conf)
common_db = CommonDbClient(cfg)
vnfr_list = common_db.get_vnfrs()
# Only send topology if ns state is one of the nsAllowedStatesSet
nsAllowedStatesSet = {"INSTANTIATED"}
# For loop to get NS topology.
# For each VDU, a metric sample is created with the appropriate labels
for vnfr in vnfr_list:
# Label ns_id
vnf_id = vnfr["_id"]
# Label ns_id
ns_id = vnfr["nsr-id-ref"]
# Label vnfd_id
vnfd_id = vnfr["vnfd-ref"]
# Label project_id
project_list = vnfr.get("_admin", {}).get("projects_read", [])
project_id = "None"
if project_list:
project_id = project_list[0]
# Other info
ns_state = vnfr["_admin"]["nsState"]
vnf_membex_index = vnfr["member-vnf-index-ref"]
logger.info(
f"Read VNFR: id: {vnf_id}, ns_id: {ns_id}, "
f"state: {ns_state}, vnfd_id: {vnfd_id}, "
f"vnf_membex_index: {vnf_membex_index}, "
f"project_id: {project_id}"
)
# Only send topology if ns State is one of the nsAllowedStatesSet
if ns_state not in nsAllowedStatesSet:
continue
logger.debug("VDU list:")
for vdu in vnfr.get("vdur", []):
# Label vdu_id
vdu_id = vdu["_id"]
# Label vim_id
vim_info = vdu.get("vim_info")
if not vim_info:
logger.info("Error: vim_info not available in vdur")
continue
if len(vim_info) != 1:
logger.info("Error: more than one vim_info in vdur")
continue
vim_id = next(iter(vim_info))[4:]
# TODO: check if it makes sense to use vnfr.vim-account-id as vim_id instead of the vim_info key
# Label vm_id
vm_id = vdu.get("vim-id")
if not vm_id:
logger.info("Error: vim-id not available in vdur")
continue
# Other VDU info
vdu_name = vdu.get("name", "UNKNOWN")
logger.debug(
f" VDU id: {vdu_id}, name: {vdu_name}, "
f"vim_id: {vim_id}, vm_id: {vm_id}"
)
logger.info(
f"METRIC SAMPLE: ns_id: {ns_id}, "
f"project_id: {project_id}, vnf_id: {vnf_id}, "
f"vdu_id: {vdu_id}, vm_id: {vm_id}, vim_id: {vim_id}"
)
metric.labels(
ns_id,
project_id,
vnf_id,
vdu_id,
vm_id,
vim_id,
vdu_name,
vnf_membex_index,
).set(1)
push_to_gateway(
gateway=PROMETHEUS_PUSHGW, job=PROMETHEUS_JOB, registry=registry
)
return
get_topology()
dag = ns_topology()