Airflow DAG and connectors to get SDNC status
[osm/NG-SA.git] / src / osm_ngsa / osm_mon / vim_connectors / azure.py
index 161e596..ede0152 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+import datetime
 import logging
 from typing import Dict, List
 
 from azure.identity import ClientSecretCredential
 from azure.mgmt.compute import ComputeManagementClient
+from azure.mgmt.monitor import MonitorManagementClient
 from azure.profiles import ProfileDefinition
 from osm_mon.vim_connectors.base_vim import VIMConnector
 
@@ -26,6 +28,35 @@ from osm_mon.vim_connectors.base_vim import VIMConnector
 log = logging.getLogger(__name__)
 
 
+METRIC_MAPPINGS = {
+    "cpu_utilization": {
+        "metricname": "Percentage CPU",
+        "aggregation": "Average",
+    },
+    "disk_read_ops": {
+        "metricname": "Disk Read Operations/Sec",
+        "aggregation": "Average",
+    },
+    "disk_write_ops": {
+        "metricname": "Disk Write Operations/Sec",
+        "aggregation": "Average",
+    },
+    "disk_read_bytes": {
+        "metricname": "Disk Read Bytes",
+        "aggregation": "Total",
+    },
+    "disk_write_bytes": {
+        "metricname": "Disk Write Bytes",
+        "aggregation": "Total",
+    },
+    # "average_memory_utilization": {},
+    # "packets_in_dropped": {},
+    # "packets_out_dropped": {},
+    # "packets_received": {},
+    # "packets_sent": {},
+}
+
+
 class AzureCollector(VIMConnector):
     # Translate azure provisioning state to OSM provision state.
     # The first three ones are the transitional status once a user initiated
@@ -102,8 +133,7 @@ class AzureCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
         self.vim_account = vim_account
         self.reload_client = True
-        logger = logging.getLogger("azure")
-        logger.setLevel(logging.ERROR)
+
         # Store config to create azure subscription later
         self._config = {
             "user": vim_account["vim_user"],
@@ -141,6 +171,11 @@ class AzureCollector(VIMConnector):
                     self._config["subscription_id"],
                     profile=self.AZURE_COMPUTE_MGMT_PROFILE,
                 )
+                # create client
+                self.conn_monitor = MonitorManagementClient(
+                    self.credentials,
+                    self._config["subscription_id"],
+                )
                 # Set to client created
                 self.reload_client = False
             except Exception as e:
@@ -190,3 +225,57 @@ class AzureCollector(VIMConnector):
         except Exception as e:
             log.error(e)
         return status
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        log.debug("collect_metrics")
+        self._reload_connection()
+
+        metric_results = []
+        log.info(metric_list)
+        for metric in metric_list:
+            server = metric["vm_id"]
+            metric_name = metric["metric"]
+            metric_mapping = METRIC_MAPPINGS.get(metric_name)
+            if not metric_mapping:
+                # log.info(f"Metric {metric_name} not available in Azure")
+                continue
+            azure_metric_name = metric_mapping["metricname"]
+            azure_aggregation = metric_mapping["aggregation"]
+            end = datetime.datetime.now()
+            init = end - datetime.timedelta(minutes=5)
+            try:
+                metrics_data = self.conn_monitor.metrics.list(
+                    server,
+                    timespan="{}/{}".format(init, end),
+                    interval="PT1M",
+                    metricnames=azure_metric_name,
+                    aggregation=azure_aggregation,
+                )
+            except Exception as e:
+                log.error(e)
+                continue
+            total = 0
+            n_metrics = 0
+            for item in metrics_data.value:
+                log.info("{} ({})".format(item.name.localized_value, item.unit))
+                for timeserie in item.timeseries:
+                    for data in timeserie.data:
+                        if azure_aggregation == "Average":
+                            val = data.average
+                        elif azure_aggregation == "Total":
+                            val = data.total
+                        else:
+                            val = None
+                        log.info("{}: {}".format(data.time_stamp, val))
+                        if val is not None:
+                            total += val
+                            n_metrics += 1
+            if n_metrics > 0:
+                value = total / n_metrics
+                log.info(f"value = {value}")
+                metric["value"] = value
+                metric_results.append(metric)
+            else:
+                log.info("No metric available")
+
+        return metric_results