Add DAG for collecting VM metrics from GCP
[osm/NG-SA.git] / src / osm_ngsa / osm_mon / vim_connectors / gcp.py
index 73d09af..bfb3e10 100644 (file)
 # pylint: disable=E1101
 
 import logging
+import time
 from typing import Dict, List
 
+from google.cloud import monitoring_v3
 from google.oauth2 import service_account
 import googleapiclient.discovery
 from osm_mon.vim_connectors.base_vim import VIMConnector
@@ -26,6 +28,39 @@ from osm_mon.vim_connectors.base_vim import VIMConnector
 log = logging.getLogger(__name__)
 
 
+METRIC_MAPPINGS = {
+    "cpu_utilization": {
+        "metrictype": "compute.googleapis.com/instance/cpu/utilization",
+        "multiplier": 100,
+    },
+    "average_memory_utilization": {
+        # metric only available in e2 family
+        "metrictype": "compute.googleapis.com/instance/memory/balloon/ram_used",
+        "multiplier": 0.000001,
+    },
+    "disk_read_ops": {
+        "metrictype": "compute.googleapis.com/instance/disk/read_ops_count",
+    },
+    "disk_write_ops": {
+        "metrictype": "compute.googleapis.com/instance/disk/write_ops_count",
+    },
+    "disk_read_bytes": {
+        "metrictype": "compute.googleapis.com/instance/disk/read_bytes_count",
+    },
+    "disk_write_bytes": {
+        "metrictype": "compute.googleapis.com/instance/disk/write_bytes_count",
+    },
+    "packets_received": {
+        "metrictype": "compute.googleapis.com/instance/network/received_packets_count",
+    },
+    "packets_sent": {
+        "metrictype": "compute.googleapis.com/instance/network/sent_packets_count",
+    },
+    # "packets_in_dropped": {},
+    # "packets_out_dropped": {},
+}
+
+
 class GcpCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
         self.vim_account = vim_account
@@ -67,6 +102,13 @@ class GcpCollector(VIMConnector):
                 )
             except Exception as e:
                 log.error(e)
+            # Construct a client for interacting with metrics API.
+            try:
+                self.metric_client = monitoring_v3.MetricServiceClient(
+                    credentials=creds
+                )
+            except Exception as e:
+                log.error(e)
         else:
             log.error("It is not possible to init GCP with no credentials")
 
@@ -101,3 +143,56 @@ class GcpCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return status
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        log.debug("collect_metrics")
+
+        metric_results = []
+        log.info(metric_list)
+        for metric in metric_list:
+            server = metric["vm_id"]
+            metric_name = metric["metric"]
+            metric_mapping = METRIC_MAPPINGS.get(metric_name)
+            if not metric_mapping:
+                # log.info(f"Metric {metric_name} not available in GCP")
+                continue
+            gcp_metric_type = metric_mapping["metrictype"]
+            metric_multiplier = metric_mapping.get("multiplier", 1)
+            log.info(f"server: {server}, gcp_metric_type: {gcp_metric_type}")
+
+            end = int(time.time())
+            start = end - 600
+            interval = monitoring_v3.TimeInterval(
+                {
+                    "end_time": {"seconds": end},
+                    "start_time": {"seconds": start},
+                }
+            )
+            aggregation = monitoring_v3.Aggregation(
+                {
+                    "alignment_period": {"seconds": 600},
+                    "per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
+                }
+            )
+            filter = f'metric.type = "{gcp_metric_type}" AND metric.labels.instance_name = "{server}"'
+            project = f"projects/{self.project}"
+            log.info(f"filter: {filter}")
+            results = self.metric_client.list_time_series(
+                request={
+                    "name": project,
+                    "filter": filter,
+                    "interval": interval,
+                    "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+                    "aggregation": aggregation,
+                }
+            )
+            value = None
+            for result in results:
+                for point in result.points:
+                    value = point.value.double_value
+            if value is not None:
+                metric["value"] = value * metric_multiplier
+                log.info(f'value: {metric["value"]}')
+                metric_results.append(metric)
+
+        return metric_results