Add DAG for collecting VM metrics from GCP 42/13942/1
author aguilard <e.dah.tid@telefonica.com>
Tue, 22 Aug 2023 06:59:07 +0000 (06:59 +0000)
committer aguilard <e.dah.tid@telefonica.com>
Tue, 10 Oct 2023 06:05:29 +0000 (08:05 +0200)
Change-Id: I4b91d6ac1b5beaedea24c251175bd6994b8cc368
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
(cherry picked from commit 5184a3d91baf1a2ae5e4f4a462790e1c83e6e5f5)

requirements-dev.txt
requirements.in
requirements.txt
src/osm_ngsa/dags/multivim_vm_metrics.py
src/osm_ngsa/osm_mon/vim_connectors/gcp.py

index a1a2678..2cda8b8 100644 (file)
@@ -141,7 +141,7 @@ flask-babel==2.0.0
     # via flask-appbuilder
 flask-caching==2.0.2
     # via apache-airflow
-flask-jwt-extended==4.5.2
+flask-jwt-extended==4.5.3
     # via flask-appbuilder
 flask-login==0.6.2
     # via
@@ -329,7 +329,7 @@ rpds-py==0.10.3
     # via
     #   jsonschema
     #   referencing
-setproctitle==1.3.2
+setproctitle==1.3.3
     # via apache-airflow
 six==1.16.0
     # via
index 0901183..a9a0c0e 100644 (file)
@@ -22,6 +22,7 @@ azure-mgmt-monitor
 gnocchiclient
 google-api-python-client
 google-auth
+google-cloud-monitoring
 packaging==23.1
 prometheus-api-client
 prometheus-client
index e33f43a..36b28f2 100644 (file)
@@ -72,9 +72,11 @@ futurist==2.4.1
     # via gnocchiclient
 gnocchiclient==7.0.8
     # via -r requirements.in
-google-api-core==2.12.0
-    # via google-api-python-client
-google-api-python-client==2.101.0
+google-api-core[grpc]==2.12.0
+    # via
+    #   google-api-python-client
+    #   google-cloud-monitoring
+google-api-python-client==2.102.0
     # via -r requirements.in
 google-auth==2.23.2
     # via
@@ -84,7 +86,17 @@ google-auth==2.23.2
     #   google-auth-httplib2
 google-auth-httplib2==0.1.1
     # via google-api-python-client
+google-cloud-monitoring==2.15.1
+    # via -r requirements.in
 googleapis-common-protos==1.60.0
+    # via
+    #   google-api-core
+    #   grpcio-status
+grpcio==1.59.0
+    # via
+    #   google-api-core
+    #   grpcio-status
+grpcio-status==1.59.0
     # via google-api-core
 httmock==1.4.0
     # via prometheus-api-client
@@ -190,10 +202,15 @@ prometheus-api-client==0.5.4
     # via -r requirements.in
 prometheus-client==0.17.1
     # via -r requirements.in
+proto-plus==1.22.3
+    # via google-cloud-monitoring
 protobuf==4.24.3
     # via
     #   google-api-core
+    #   google-cloud-monitoring
     #   googleapis-common-protos
+    #   grpcio-status
+    #   proto-plus
 pyasn1==0.5.0
     # via
     #   pyasn1-modules
@@ -234,7 +251,7 @@ pyyaml==6.0.1
     #   -r requirements.in
     #   cliff
     #   oslo-config
-regex==2023.8.8
+regex==2023.10.3
     # via dateparser
 requests==2.31.0
     # via
index caca9ea..2e67ce1 100644 (file)
@@ -24,13 +24,14 @@ from airflow.decorators import task
 from osm_mon.core.common_db import CommonDbClient
 from osm_mon.core.config import Config
 from osm_mon.vim_connectors.azure import AzureCollector
+from osm_mon.vim_connectors.gcp import GcpCollector
 from osm_mon.vim_connectors.openstack import OpenStackCollector
 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
 
 
 SCHEDULE_INTERVAL = 5
 COLLECTOR_MAX_METRICS_PER_TASK = 100
-SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure"]
+SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure", "gcp"]
 PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
 PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
 PROMETHEUS_METRICS = {
@@ -229,6 +230,8 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
                 collector = OpenStackCollector(vim_account)
             elif vim_type == "azure":
                 collector = AzureCollector(vim_account)
+            elif vim_type == "gcp":
+                collector = GcpCollector(vim_account)
             else:
                 logger.error(f"VIM type '{vim_type}' not supported")
                 return None
index 73d09af..bfb3e10 100644 (file)
 # pylint: disable=E1101
 
 import logging
+import time
 from typing import Dict, List
 
+from google.cloud import monitoring_v3
 from google.oauth2 import service_account
 import googleapiclient.discovery
 from osm_mon.vim_connectors.base_vim import VIMConnector
@@ -26,6 +28,39 @@ from osm_mon.vim_connectors.base_vim import VIMConnector
 log = logging.getLogger(__name__)
 
 
+METRIC_MAPPINGS = {  # requested metric name -> GCP metric type, optional scale
+    "cpu_utilization": {
+        "metrictype": "compute.googleapis.com/instance/cpu/utilization",
+        "multiplier": 100,  # presumably fraction -> percent; TODO confirm
+    },
+    "average_memory_utilization": {
+        # metric only available in e2 family
+        "metrictype": "compute.googleapis.com/instance/memory/balloon/ram_used",
+        "multiplier": 0.000001,  # presumably bytes -> MB; TODO confirm
+    },
+    "disk_read_ops": {
+        "metrictype": "compute.googleapis.com/instance/disk/read_ops_count",
+    },
+    "disk_write_ops": {
+        "metrictype": "compute.googleapis.com/instance/disk/write_ops_count",
+    },
+    "disk_read_bytes": {
+        "metrictype": "compute.googleapis.com/instance/disk/read_bytes_count",
+    },
+    "disk_write_bytes": {
+        "metrictype": "compute.googleapis.com/instance/disk/write_bytes_count",
+    },
+    "packets_received": {
+        "metrictype": "compute.googleapis.com/instance/network/received_packets_count",
+    },
+    "packets_sent": {
+        "metrictype": "compute.googleapis.com/instance/network/sent_packets_count",
+    },
+    # No GCP metric type is mapped yet for "packets_in_dropped" and
+    # "packets_out_dropped"; collect_metrics skips names missing from this dict.
+}
+
+
 class GcpCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
         self.vim_account = vim_account
@@ -67,6 +102,13 @@ class GcpCollector(VIMConnector):
                 )
             except Exception as e:
                 log.error(e)
+            # Construct a client for interacting with metrics API.
+            try:
+                self.metric_client = monitoring_v3.MetricServiceClient(
+                    credentials=creds
+                )
+            except Exception as e:
+                log.error(e)
         else:
             log.error("It is not possible to init GCP with no credentials")
 
@@ -101,3 +143,85 @@ class GcpCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return status
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        """Collect a 10-minute mean of each requested VM metric from GCP.
+
+        Every entry of ``metric_list`` must provide "vm_id" and "metric".
+        Entries whose metric name is missing from METRIC_MAPPINGS, whose
+        query fails, or whose query returns no points are skipped.
+
+        :param metric_list: metric descriptors to resolve
+        :return: the entries that produced data, each updated in place
+            with a "value" key (sample scaled by the mapping multiplier)
+        """
+        log.debug("collect_metrics")
+        log.info(metric_list)
+
+        metric_client = getattr(self, "metric_client", None)
+        if metric_client is None:
+            # __init__ only logs when the client cannot be created, so the
+            # attribute may be missing; report it instead of raising.
+            log.error("GCP metrics client is not initialized")
+            return []
+
+        # Identical for every query: one aligned mean over the whole window.
+        aggregation = monitoring_v3.Aggregation(
+            {
+                "alignment_period": {"seconds": 600},
+                "per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
+            }
+        )
+
+        metric_results = []
+        for metric in metric_list:
+            server = metric["vm_id"]
+            metric_name = metric["metric"]
+            metric_mapping = METRIC_MAPPINGS.get(metric_name)
+            if not metric_mapping:
+                log.debug(f"Metric {metric_name} not available in GCP")
+                continue
+            gcp_metric_type = metric_mapping["metrictype"]
+            metric_multiplier = metric_mapping.get("multiplier", 1)
+            log.info(f"server: {server}, gcp_metric_type: {gcp_metric_type}")
+
+            end = int(time.time())
+            interval = monitoring_v3.TimeInterval(
+                {
+                    "end_time": {"seconds": end},
+                    "start_time": {"seconds": end - 600},
+                }
+            )
+            # Named to avoid shadowing the built-in filter().
+            series_filter = f'metric.type = "{gcp_metric_type}" AND metric.labels.instance_name = "{server}"'
+            log.info(f"filter: {series_filter}")
+
+            value = None
+            try:
+                results = metric_client.list_time_series(
+                    request={
+                        "name": f"projects/{self.project}",
+                        "filter": series_filter,
+                        "interval": interval,
+                        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+                        "aggregation": aggregation,
+                    }
+                )
+                # Iterate inside the try: the returned pager may issue
+                # further requests while it is being consumed.
+                # NOTE(review): the last point iterated wins, across all
+                # matching series; confirm that is the intended sample.
+                for result in results:
+                    for point in result.points:
+                        value = point.value.double_value
+            except Exception as e:
+                # One failing query must not discard the other metrics.
+                log.error(f"Error collecting {gcp_metric_type} for {server}: {e}")
+                continue
+
+            if value is not None:
+                metric["value"] = value * metric_multiplier
+                log.info(f'value: {metric["value"]}')
+                metric_results.append(metric)
+
+        return metric_results