Feature 10965 Airflow monitoring pipeline for VIM status 76/12676/14
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 10 Nov 2022 13:19:44 +0000 (14:19 +0100)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 24 Nov 2022 14:23:23 +0000 (15:23 +0100)
Change-Id: I05e95543690977db2fd081dd53255308f9b828b5
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
src/dags/multivim_vim_status.py [new file with mode: 0644]
src/dags/multivim_vm_status.py
src/osm_mon/vim_connectors/azure.py
src/osm_mon/vim_connectors/base_vim.py
src/osm_mon/vim_connectors/gcp.py
src/osm_mon/vim_connectors/openstack.py
tox.ini

diff --git a/src/dags/multivim_vim_status.py b/src/dags/multivim_vim_status.py
new file mode 100644 (file)
index 0000000..93894b1
--- /dev/null
@@ -0,0 +1,153 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.vim_connectors.azure import AzureCollector
+from osm_mon.vim_connectors.gcp import GcpCollector
+from osm_mon.vim_connectors.openstack import OpenStackCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SUPPORTED_VIM_TYPES = ["openstack", "vio", "gcp", "azure"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_vim_status_"
+PROMETHEUS_METRIC = "vim_status"
+PROMETHEUS_METRIC_DESCRIPTION = "VIM status"
+SCHEDULE_INTERVAL = 1
+
+
+def get_all_vim():
+    """Get VIMs from MongoDB"""
+    print("Getting VIM list")
+
+    cfg = Config()
+    print(cfg.conf)
+    common_db = CommonDbClient(cfg)
+    vim_accounts = common_db.get_vim_accounts()
+    vim_list = []
+    for vim in vim_accounts:
+        print(f'Read VIM {vim["_id"]} ({vim["name"]})')
+        vim_list.append(
+            {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
+        )
+
+    print(vim_list)
+    print("Getting VIM list OK")
+    return vim_list
+
+
+def create_dag(dag_id, dag_number, dag_description, vim_id):
+    dag = DAG(
+        dag_id,
+        catchup=False,
+        default_args={
+            "depends_on_past": False,
+            "retries": 1,
+            # "retry_delay": timedelta(minutes=1),
+            "retry_delay": timedelta(seconds=10),
+        },
+        description=dag_description,
+        is_paused_upon_creation=False,
+        # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+        schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+        start_date=datetime(2022, 1, 1),
+        tags=["osm", "vim"],
+    )
+
+    with dag:
+
+        def get_vim_collector(vim_account):
+            """Return a VIM collector for the vim_account"""
+            vim_type = vim_account["vim_type"]
+            if "config" in vim_account and "vim_type" in vim_account["config"]:
+                vim_type = vim_account["config"]["vim_type"].lower()
+                if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
+                    vim_type = "openstack"
+            if vim_type == "openstack":
+                return OpenStackCollector(vim_account)
+            if vim_type == "gcp":
+                return GcpCollector(vim_account)
+            if vim_type == "azure":
+                return AzureCollector(vim_account)
+            print(f"VIM type '{vim_type}' not supported")
+            return None
+
+        @task(task_id="get_vim_status_and_send_to_prometheus")
+        def get_vim_status_and_send_to_prometheus(vim_id: str):
+            """Authenticate against VIM and check status"""
+
+            # Get VIM account info from MongoDB
+            print(f"Reading VIM info, id: {vim_id}")
+            cfg = Config()
+            common_db = CommonDbClient(cfg)
+            vim_account = common_db.get_vim_account(vim_account_id=vim_id)
+            print(vim_account)
+
+            # Define Prometheus Metric for NS topology
+            registry = CollectorRegistry()
+            metric = Gauge(
+                PROMETHEUS_METRIC,
+                PROMETHEUS_METRIC_DESCRIPTION,
+                labelnames=[
+                    "vim_id",
+                ],
+                registry=registry,
+            )
+            metric.labels(vim_id).set(0)
+
+            # Get status of VIM
+            collector = get_vim_collector(vim_account)
+            if collector:
+                status = collector.is_vim_ok()
+                print(f"VIM status: {status}")
+                metric.labels(vim_id).set(1)
+            else:
+                print("Error creating VIM collector")
+            # Push to Prometheus
+            push_to_gateway(
+                gateway=PROMETHEUS_PUSHGW,
+                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+                registry=registry,
+            )
+            return
+
+        get_vim_status_and_send_to_prometheus(vim_id)
+
+    return dag
+
+
+vim_list = get_all_vim()
+for index, vim in enumerate(vim_list):
+    vim_type = vim["vim_type"]
+    if vim_type in SUPPORTED_VIM_TYPES:
+        vim_id = vim["_id"]
+        vim_name = vim["name"]
+        dag_description = f"Dag for VIM {vim_name} status"
+        dag_id = f"vim_status_{vim_id}"
+        print(f"Creating DAG {dag_id}")
+        globals()[dag_id] = create_dag(
+            dag_id=dag_id,
+            dag_number=index,
+            dag_description=dag_description,
+            vim_id=vim_id,
+        )
+    else:
+        print(f"VIM type '{vim_type}' not supported for monitoring VIM status")
index a189112..dbdbbc0 100644 (file)
@@ -94,8 +94,8 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
             """Get VM status from the VIM"""
             collector = get_vim_collector(vim_account)
             if collector:
-                status = collector.is_vim_ok()
-                print(f"VIM status: {status}")
+                status = collector.is_vim_ok()
+                print(f"VIM status: {status}")
                 vm_status_list = collector.collect_servers_status()
                 return vm_status_list
             else:
index a401f75..23086c4 100644 (file)
@@ -181,3 +181,13 @@ class AzureCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return servers
+
+    def is_vim_ok(self) -> bool:
+        status = False
+        self.reload_client = True
+        try:
+            self._reload_connection()
+            status = True
+        except Exception as e:
+            log.error(e)
+        return status
index 9ec2cde..5eb146b 100644 (file)
@@ -24,3 +24,6 @@ class VIMConnector:
     # def collect_servers_status(self) -> List[Metric]:
     def collect_servers_status(self) -> List:
         pass
+
+    def is_vim_ok(self) -> bool:
+        pass
index 6c7b557..396e136 100644 (file)
@@ -90,3 +90,14 @@ class GcpCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return servers
+
+    def is_vim_ok(self) -> bool:
+        status = False
+        try:
+            self.conn_compute.zones().get(
+                project=self.project, zone=self.zone
+            ).execute()
+            status = True
+        except Exception as e:
+            log.error(e)
+        return status
index 67ca4c5..d37973d 100644 (file)
@@ -81,3 +81,11 @@ class OpenStackCollector(VIMConnector):
             }
             servers.append(vm)
         return servers
+
+    def is_vim_ok(self) -> bool:
+        try:
+            self.nova.servers.list()
+            return True
+        except Exception as e:
+            log.warning("VIM status is not OK: %s" % e)
+            return False
diff --git a/tox.ini b/tox.ini
index b29a414..7247cf3 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -57,7 +57,7 @@ commands =
 
 #######################################################################################
 [testenv:flake8]
-deps =  flake8
+deps =  flake8==5.0.4
         flake8-import-order
 skip_install = true
 commands =