Collect consumption metrics from Azure in DAG 02/13402/4
authoraguilard <e.dah.tid@telefonica.com>
Wed, 17 May 2023 08:05:29 +0000 (08:05 +0000)
committeraguilard <e.dah.tid@telefonica.com>
Mon, 22 May 2023 13:51:36 +0000 (13:51 +0000)
Change-Id: I499c8c99060a2ab3fcb51174f319a92ef82aa338
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
requirements.in
requirements.txt
src/osm_ngsa/dags/multivim_vm_metrics.py
src/osm_ngsa/osm_mon/vim_connectors/azure.py

index e333d35..349b0d3 100644 (file)
@@ -18,6 +18,7 @@
 azure-common
 azure-identity
 azure-mgmt-compute
+azure-mgmt-monitor
 gnocchiclient
 google-api-python-client
 google-auth
index a82d60c..3818f76 100644 (file)
@@ -22,17 +22,22 @@ azure-common==1.1.28
     # via
     #   -r requirements.in
     #   azure-mgmt-compute
+    #   azure-mgmt-monitor
 azure-core==1.26.4
     # via
     #   azure-identity
     #   azure-mgmt-core
     #   msrest
-azure-identity==1.12.0
+azure-identity==1.13.0
     # via -r requirements.in
 azure-mgmt-compute==29.1.0
     # via -r requirements.in
 azure-mgmt-core==1.4.0
-    # via azure-mgmt-compute
+    # via
+    #   azure-mgmt-compute
+    #   azure-mgmt-monitor
+azure-mgmt-monitor==6.0.1
+    # via -r requirements.in
 cachetools==5.3.0
     # via google-auth
 certifi==2023.5.7
@@ -64,7 +69,7 @@ debtcollector==2.5.0
     #   oslo-config
     #   oslo-utils
     #   python-keystoneclient
-fonttools==4.39.3
+fonttools==4.39.4
     # via matplotlib
 futurist==2.4.1
     # via gnocchiclient
@@ -74,7 +79,7 @@ google-api-core==2.11.0
     # via google-api-python-client
 google-api-python-client==2.86.0
     # via -r requirements.in
-google-auth==2.17.3
+google-auth==2.18.1
     # via
     #   -r requirements.in
     #   google-api-core
@@ -102,8 +107,10 @@ iso8601==1.1.0
     #   python-ceilometerclient
     #   python-novaclient
 isodate==0.6.1
-    # via msrest
-keystoneauth1==5.1.2
+    # via
+    #   azure-mgmt-monitor
+    #   msrest
+keystoneauth1==5.2.0
     # via
     #   gnocchiclient
     #   python-ceilometerclient
@@ -189,7 +196,7 @@ prometheus-api-client==0.5.3
     # via -r requirements.in
 prometheus-client==0.16.0
     # via -r requirements.in
-protobuf==4.23.0
+protobuf==4.23.1
     # via
     #   google-api-core
     #   googleapis-common-protos
@@ -228,8 +235,6 @@ pytz==2023.3
     #   oslo-serialization
     #   oslo-utils
     #   pandas
-pytz-deprecation-shim==0.1.0.post0
-    # via tzlocal
 pyyaml==5.4.1
     # via
     #   -r requirements.in
@@ -264,11 +269,10 @@ six==1.16.0
     #   google-auth
     #   google-auth-httplib2
     #   isodate
-    #   keystoneauth1
     #   python-ceilometerclient
     #   python-dateutil
     #   python-keystoneclient
-stevedore==5.0.0
+stevedore==5.1.0
     # via
     #   cliff
     #   keystoneauth1
@@ -279,17 +283,17 @@ stevedore==5.0.0
 typing-extensions==4.5.0
     # via azure-core
 tzdata==2023.3
-    # via
-    #   pandas
-    #   pytz-deprecation-shim
-tzlocal==4.3
+    # via pandas
+tzlocal==5.0.1
     # via dateparser
 ujson==5.7.0
     # via gnocchiclient
 uritemplate==4.1.1
     # via google-api-python-client
-urllib3==2.0.2
-    # via requests
+urllib3==1.26.15
+    # via
+    #   google-auth
+    #   requests
 wcwidth==0.2.6
     # via cmd2
 wrapt==1.15.0
index 72ceca4..caca9ea 100644 (file)
@@ -23,13 +23,14 @@ from airflow import DAG
 from airflow.decorators import task
 from osm_mon.core.common_db import CommonDbClient
 from osm_mon.core.config import Config
+from osm_mon.vim_connectors.azure import AzureCollector
 from osm_mon.vim_connectors.openstack import OpenStackCollector
 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
 
 
 SCHEDULE_INTERVAL = 5
 COLLECTOR_MAX_METRICS_PER_TASK = 100
-SUPPORTED_VIM_TYPES = ["openstack", "vio"]
+SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure"]
 PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
 PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
 PROMETHEUS_METRICS = {
@@ -111,7 +112,6 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
         },
         description=dag_description,
         is_paused_upon_creation=False,
-        # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
         schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
         start_date=datetime(2022, 1, 1),
         tags=["osm", "vdu"],
@@ -227,6 +227,8 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
                     vim_type = "openstack"
             if vim_type == "openstack":
                 collector = OpenStackCollector(vim_account)
+            elif vim_type == "azure":
+                collector = AzureCollector(vim_account)
             else:
                 logger.error(f"VIM type '{vim_type}' not supported")
                 return None
index 161e596..16de6c2 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+import datetime
 import logging
 from typing import Dict, List
 
 from azure.identity import ClientSecretCredential
 from azure.mgmt.compute import ComputeManagementClient
+from azure.mgmt.monitor import MonitorManagementClient
 from azure.profiles import ProfileDefinition
 from osm_mon.vim_connectors.base_vim import VIMConnector
 
@@ -26,6 +28,35 @@ from osm_mon.vim_connectors.base_vim import VIMConnector
 log = logging.getLogger(__name__)
 
 
+METRIC_MAPPINGS = {
+    "cpu_utilization": {
+        "metricname": "Percentage CPU",
+        "aggregation": "Average",
+    },
+    "disk_read_ops": {
+        "metricname": "Disk Read Operations/Sec",
+        "aggregation": "Average",
+    },
+    "disk_write_ops": {
+        "metricname": "Disk Write Operations/Sec",
+        "aggregation": "Average",
+    },
+    "disk_read_bytes": {
+        "metricname": "Disk Read Bytes",
+        "aggregation": "Total",
+    },
+    "disk_write_bytes": {
+        "metricname": "Disk Write Bytes",
+        "aggregation": "Total",
+    },
+    # "average_memory_utilization": {},
+    # "packets_in_dropped": {},
+    # "packets_out_dropped": {},
+    # "packets_received": {},
+    # "packets_sent": {},
+}
+
+
 class AzureCollector(VIMConnector):
     # Translate azure provisioning state to OSM provision state.
     # The first three ones are the transitional status once a user initiated
@@ -141,6 +172,11 @@ class AzureCollector(VIMConnector):
                     self._config["subscription_id"],
                     profile=self.AZURE_COMPUTE_MGMT_PROFILE,
                 )
+                # create client
+                self.conn_monitor = MonitorManagementClient(
+                    self.credentials,
+                    self._config["subscription_id"],
+                )
                 # Set to client created
                 self.reload_client = False
             except Exception as e:
@@ -190,3 +226,57 @@ class AzureCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return status
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        log.debug("collect_metrics")
+        self._reload_connection()
+
+        metric_results = []
+        log.info(metric_list)
+        for metric in metric_list:
+            server = metric["vm_id"]
+            metric_name = metric["metric"]
+            metric_mapping = METRIC_MAPPINGS.get(metric_name)
+            if not metric_mapping:
+                # log.info(f"Metric {metric_name} not available in Azure")
+                continue
+            azure_metric_name = metric_mapping["metricname"]
+            azure_aggregation = metric_mapping["aggregation"]
+            end = datetime.datetime.now()
+            init = end - datetime.timedelta(minutes=5)
+            try:
+                metrics_data = self.conn_monitor.metrics.list(
+                    server,
+                    timespan="{}/{}".format(init, end),
+                    interval="PT1M",
+                    metricnames=azure_metric_name,
+                    aggregation=azure_aggregation,
+                )
+            except Exception as e:
+                log.error(e)
+                continue
+            total = 0
+            n_metrics = 0
+            for item in metrics_data.value:
+                log.info("{} ({})".format(item.name.localized_value, item.unit))
+                for timeserie in item.timeseries:
+                    for data in timeserie.data:
+                        if azure_aggregation == "Average":
+                            val = data.average
+                        elif azure_aggregation == "Total":
+                            val = data.total
+                        else:
+                            val = None
+                        log.info("{}: {}".format(data.time_stamp, val))
+                        if val is not None:
+                            total += val
+                            n_metrics += 1
+            if n_metrics > 0:
+                value = total / n_metrics
+                log.info(f"value = {value}")
+                metric["value"] = value
+                metric_results.append(metric)
+            else:
+                log.info("No metric available")
+
+        return metric_results