Feature 10981: add Openstack metrics collector and scale-out/in DAGs for autoscaling
[osm/NG-SA.git] / src / osm_ngsa / osm_mon / vim_connectors / openstack.py
index d37973d..1eb33af 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+from enum import Enum
 import logging
+import time
 from typing import Dict, List
 
+from ceilometerclient import client as ceilometer_client
+from ceilometerclient.exc import HTTPException
+import gnocchiclient.exceptions
+from gnocchiclient.v1 import client as gnocchi_client
 from keystoneauth1 import session
+from keystoneauth1.exceptions.catalog import EndpointNotFound
 from keystoneauth1.identity import v3
 from novaclient import client as nova_client
 from osm_mon.vim_connectors.base_vim import VIMConnector
+from prometheus_api_client import PrometheusConnect as prometheus_client
 
 log = logging.getLogger(__name__)
 
+# Factor applied to a collected value, keyed by OpenStack metric name
+# (used by GnocchiBackend._collect_instance_metric).
+# NOTE(review): 1e-7 presumably turns a CPU-time rate in ns/s into a
+# percentage - confirm against the Gnocchi "cpu" metric unit.
+METRIC_MULTIPLIERS = {"cpu": 0.0000001}
+
+# Gnocchi aggregation method requested for a metric, keyed by OpenStack metric
+# name; metrics without an entry are queried with aggregation=None.
+METRIC_AGGREGATORS = {"cpu": "rate:mean"}
+
+# OSM metric names that OpenStackCollector._get_metric_type classifies as
+# MetricType.INTERFACE_ALL.
+INTERFACE_METRICS = [
+    "packets_in_dropped",
+    "packets_out_dropped",
+    "packets_received",
+    "packets_sent",
+]
+
+# OSM metric names that OpenStackCollector._get_metric_type classifies as
+# MetricType.INSTANCEDISK.
+INSTANCE_DISK = [
+    "disk_read_ops",
+    "disk_write_ops",
+    "disk_read_bytes",
+    "disk_write_bytes",
+]
+
+# OSM metric name -> OpenStack meter name. OpenStackCollector.collect_metrics
+# uses it to translate each requested metric before querying the backend.
+METRIC_MAPPINGS = {
+    "average_memory_utilization": "memory.usage",
+    "disk_read_ops": "disk.device.read.requests",
+    "disk_write_ops": "disk.device.write.requests",
+    "disk_read_bytes": "disk.device.read.bytes",
+    "disk_write_bytes": "disk.device.write.bytes",
+    # Direction follows the OSM name: "in" is incoming traffic and "out" is
+    # outgoing traffic, like packets_received/packets_sent below.
+    "packets_in_dropped": "network.incoming.packets.drop",
+    "packets_out_dropped": "network.outgoing.packets.drop",
+    "packets_received": "network.incoming.packets",
+    "packets_sent": "network.outgoing.packets",
+    "cpu_utilization": "cpu",
+}
+
+# Default OSM metric name -> Prometheus metric name map. It is the base map of
+# PrometheusTSBDBackend._build_map, where entries of "prometheus-map" inside
+# the VIM account's "prometheus-config" are applied on top of it.
+# NOTE(review): "TSBD" looks like a typo for "TSDB"; the identifier is kept
+# because other code refers to it.
+METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD = {
+    "cpu_utilization": "cpu",
+    "average_memory_utilization": "memory_usage",
+    "disk_read_ops": "disk_device_read_requests",
+    "disk_write_ops": "disk_device_write_requests",
+    "disk_read_bytes": "disk_device_read_bytes",
+    "disk_write_bytes": "disk_device_write_bytes",
+    "packets_in_dropped": "network_incoming_packets_drop",
+    "packets_out_dropped": "network_outgoing_packets_drop",
+    "packets_received": "network_incoming_packets",
+    "packets_sent": "network_outgoing_packets",
+}
+
+
+class MetricType(Enum):
+    """Kind of resource a metric is measured on; selects the collection path."""
+
+    # Read directly from the instance (server) resource
+    INSTANCE = "instance"
+    # Summed over every network interface resource of an instance
+    INTERFACE_ALL = "interface_all"
+    # NOTE(review): never returned by OpenStackCollector._get_metric_type, and
+    # GnocchiBackend.collect_metric raises "Unknown metric type" for it
+    INTERFACE_ONE = "interface_one"
+    # Read from the instance_disk resources of an instance
+    INSTANCEDISK = "instancedisk"
+
 
 class CertificateNotCreated(Exception):
+    # Caught and logged by an `except CertificateNotCreated` handler inside
+    # OpenStackCollector; the raise site is not visible in this diff.
     pass
@@ -31,13 +90,16 @@ class CertificateNotCreated(Exception):
 
 class OpenStackCollector(VIMConnector):
     def __init__(self, vim_account: Dict):
+        # Builds, in order: the VIM session, the Nova client (which reads
+        # self.vim_session) and the metrics backend.
-        log.info("__init__")
+        log.debug("__init__")
         self.vim_account = vim_account
         self.vim_session = None
         self.vim_session = self._get_session(vim_account)
         self.nova = self._build_nova_client()
+        # self.gnocchi = self._build_gnocchi_client()
+        # backend is None when the account has a "prometheus-config" entry;
+        # collect_metrics then logs "Undefined backend" and returns [].
+        self.backend = self._get_backend(vim_account, self.vim_session)
 
     def _get_session(self, creds: Dict):
+        log.debug("_get_session")
         verify_ssl = True
         project_domain_name = "Default"
         user_domain_name = "Default"
@@ -67,11 +129,35 @@ class OpenStackCollector(VIMConnector):
         except CertificateNotCreated as e:
             log.error(e)
 
+    def _get_backend(self, vim_account: dict, vim_session: object):
+        """Select the telemetry backend used to collect metrics.
+
+        Returns None when the VIM account carries a "prometheus-config" entry,
+        because the Prometheus backend is not enabled here. Otherwise Gnocchi
+        is probed first; if the probe raises HTTPException or EndpointNotFound,
+        Ceilometer is probed and returned instead. Errors raised by the
+        Ceilometer probe propagate to the caller.
+        """
+        prometheus_config = vim_account.get("prometheus-config")
+        if prometheus_config:
+            # TODO: return PrometheusTSBDBackend(vim_account) once supported
+            return None
+
+        try:
+            backend = GnocchiBackend(vim_account, vim_session)
+            # One-item listing, used only to check that Gnocchi answers
+            backend.client.metric.list(limit=1)
+        except (HTTPException, EndpointNotFound):
+            backend = CeilometerBackend(vim_account, vim_session)
+            # Same kind of probe for the fallback backend
+            backend.client.capabilities.get()
+            log.debug("Using ceilometer backend to collect metric")
+        else:
+            log.debug("Using gnocchi backend to collect metric")
+        return backend
+
     def _build_nova_client(self) -> nova_client.Client:
+        # Nova client for API version "2" on the collector's VIM session;
+        # timeout=10 is handed to novaclient as-is.
         return nova_client.Client("2", session=self.vim_session, timeout=10)
 
+    def _build_gnocchi_client(self) -> gnocchi_client.Client:
+        """Create a Gnocchi client bound to this collector's VIM session."""
+        gnocchi = gnocchi_client.Client(session=self.vim_session)
+        return gnocchi
+
     def collect_servers_status(self) -> List[Dict]:
-        log.info("collect_servers_status")
+        log.debug("collect_servers_status")
         servers = []
         for server in self.nova.servers.list(detailed=True):
             vm = {
@@ -83,9 +169,252 @@ class OpenStackCollector(VIMConnector):
         return servers
 
     def is_vim_ok(self) -> bool:
+        # Health probe: the VIM is reported OK when listing Nova servers works.
+        log.debug("is_vim_ok")
         try:
             self.nova.servers.list()
             return True
         except Exception as e:
+            # Any failure is turned into False (with a warning), never raised
             log.warning("VIM status is not OK: %s" % e)
             return False
+
+    def _get_metric_type(self, metric_name: str) -> MetricType:
+        """Classify an OSM metric name by the resource it is measured on."""
+        if metric_name in INTERFACE_METRICS:
+            return MetricType.INTERFACE_ALL
+        if metric_name in INSTANCE_DISK:
+            return MetricType.INSTANCEDISK
+        # Anything else is read directly from the instance resource
+        return MetricType.INSTANCE
+
+    def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+        """Collect the current value of every requested metric.
+
+        Each item of metric_list must provide "vm_id" (the server to read)
+        and "metric" (an OSM metric name, i.e. a key of METRIC_MAPPINGS).
+        Items for which the backend returns a value get a "value" key added
+        in place and are included in the result. Items that are malformed,
+        name an unknown metric, yield no value or make the backend fail are
+        logged and skipped, so one bad item never aborts the batch.
+
+        Returns an empty list when there is no backend or when the backend is
+        the (unsupported) Prometheus one.
+        """
+        log.debug("collect_metrics")
+        if not self.backend:
+            log.error("Undefined backend")
+            return []
+
+        if isinstance(self.backend, PrometheusTSBDBackend):
+            log.info("Using Prometheus as backend (NOT SUPPORTED)")
+            return []
+
+        metric_results = []
+        for metric in metric_list:
+            try:
+                server = metric["vm_id"]
+                metric_name = metric["metric"]
+                openstack_metric_name = METRIC_MAPPINGS[metric_name]
+            except KeyError as e:
+                # Missing field or metric name without an OpenStack mapping
+                log.error("Error in metric collection: unknown key %s in %s", e, metric)
+                continue
+
+            metric_type = self._get_metric_type(metric_name)
+            log.info("Collecting metric %s for %s", openstack_metric_name, server)
+            try:
+                value = self.backend.collect_metric(
+                    metric_type, openstack_metric_name, server
+                )
+            except Exception as e:
+                # Backend failures are isolated per metric on purpose
+                log.error("Error in metric collection: %s", e)
+                continue
+
+            if value is None:
+                log.info("metric value is empty")
+                continue
+            log.info("value: %s", value)
+            metric["value"] = value
+            metric_results.append(metric)
+        return metric_results
+
+
+class OpenstackBackend:
+    """Common interface of the metric backends used by OpenStackCollector."""
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the value of a metric for a resource.
+
+        The base implementation does nothing and returns None; the concrete
+        backends override it.
+        """
+        return None
+
+
+class PrometheusTSBDBackend(OpenstackBackend):
+    """Backend that reads metrics from a Prometheus server.
+
+    Reads vim_account["prometheus-config"]: "prometheus-url" is required,
+    "prometheus-cred" and "prometheus-map" are optional.
+    """
+
+    def __init__(self, vim_account: dict):
+        self.map = self._build_map(vim_account)
+        # Stored only; this class does not pass the credentials to the client
+        self.cred = vim_account["prometheus-config"].get("prometheus-cred")
+        self.client = self._build_prometheus_client(
+            vim_account["prometheus-config"]["prometheus-url"]
+        )
+
+    def _build_prometheus_client(self, url: str) -> prometheus_client:
+        """Create the Prometheus client for the given server URL."""
+        # NOTE(review): disable_ssl=True presumably turns off TLS certificate
+        # verification in prometheus_api_client - confirm this is intended.
+        return prometheus_client(url, disable_ssl=True)
+
+    def _build_map(self, vim_account: dict) -> dict:
+        """Return the metric name map: defaults plus per-account overrides."""
+        # Work on a copy so the overrides of one VIM account never leak into
+        # the module-level defaults shared by every other backend instance.
+        custom_map = dict(METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD)
+        overrides = vim_account["prometheus-config"].get("prometheus-map")
+        if overrides:
+            custom_map.update(overrides)
+        return custom_map
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the current value of metric_name for resource_id, or None.
+
+        metric_type is accepted for interface compatibility and not used.
+        """
+        metric = self.query_metric(metric_name, resource_id)
+        # metric["value"] is indexed at 1 for the sample value
+        return metric["value"][1] if metric else None
+
+    def map_metric(self, metric_name: str):
+        """Translate an OSM metric name with this backend's map.
+
+        Raises KeyError when the name has no mapping.
+        """
+        return self.map[metric_name]
+
+    def query_metric(self, metric_name, resource_id=None):
+        """Query the current value of a metric.
+
+        Without resource_id, every series returned by Prometheus is handed
+        back. With resource_id, the first series whose "resource_id" label
+        contains it is returned, or None when no series matches.
+        """
+        metrics = self.client.get_current_metric_value(metric_name=metric_name)
+        if not resource_id:
+            return metrics
+        # Default of None: a missing series is "no value", not StopIteration.
+        # Series lacking the resource_id label are skipped instead of failing.
+        return next(
+            (
+                series
+                for series in metrics
+                if resource_id in series["metric"].get("resource_id", "")
+            ),
+            None,
+        )
+
+
+class GnocchiBackend(OpenstackBackend):
+    """Backend that reads metric measures from Gnocchi."""
+
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_gnocchi_client(vim_account, vim_session)
+
+    def _build_gnocchi_client(
+        self, vim_account: dict, vim_session: object
+    ) -> gnocchi_client.Client:
+        # vim_account is not used: the client is built from the session alone
+        return gnocchi_client.Client(session=vim_session)
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the value of metric_name for the instance resource_id.
+
+        Dispatches on metric_type to the matching collector and returns its
+        result (None when no measure is available). Raises Exception for a
+        metric type without collector.
+        """
+        if metric_type == MetricType.INTERFACE_ALL:
+            return self._collect_interface_all_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCE:
+            return self._collect_instance_metric(metric_name, resource_id)
+
+        elif metric_type == MetricType.INSTANCEDISK:
+            return self._collect_instance_disk_metric(metric_name, resource_id)
+
+        else:
+            raise Exception("Unknown metric type %s" % metric_type.value)
+
+    def _collect_interface_all_metric(self, openstack_metric_name, resource_id):
+        """Sum the last measure of the metric over all interfaces of an instance.
+
+        Returns None when no interface reports a measure.
+        """
+        total_measure = None
+        interfaces = self.client.resource.search(
+            resource_type="instance_network_interface",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for interface in interfaces:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=interface["id"], limit=1
+                )
+                if measures:
+                    # Start the sum only once a real measure shows up, so
+                    # "no data at all" stays None instead of 0.0
+                    if total_measure is None:
+                        total_measure = 0.0
+                    total_measure += measures[-1][2]
+            except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound
+                log.debug(
+                    "No metric %s found for interface %s: %s",
+                    openstack_metric_name,
+                    interface["id"],
+                    e,
+                )
+        return total_measure
+
+    def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
+        """Return the last measure of the metric among the disks of an instance.
+
+        When several disks report a measure, the value of the last disk
+        returned by the search is kept. Returns None without any measure.
+        """
+        value = None
+        instances = self.client.resource.search(
+            resource_type="instance_disk",
+            query={"=": {"instance_id": resource_id}},
+        )
+        for instance in instances:
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=instance["id"], limit=1
+                )
+                if measures:
+                    value = measures[-1][2]
+
+            except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+                # Gnocchi in some Openstack versions raise TypeError instead of
+                # NotFound; handled like in the other collectors of this class
+                log.debug(
+                    "No metric %s found for instance disk %s: %s",
+                    openstack_metric_name,
+                    instance["id"],
+                    e,
+                )
+        return value
+
+    def _collect_instance_metric(self, openstack_metric_name, resource_id):
+        """Return the latest value of an instance-level metric, or None.
+
+        Measures of the last 1200 seconds are requested with the aggregation
+        configured in METRIC_AGGREGATORS (if any). For "cpu", when that query
+        is rejected, the raw measures are fetched instead and the value is the
+        difference between the two most recent ones. Aggregated values are
+        divided by the reporting interval and scaled by METRIC_MULTIPLIERS.
+        """
+        value = None
+        try:
+            aggregation = METRIC_AGGREGATORS.get(openstack_metric_name)
+
+            try:
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name,
+                    aggregation=aggregation,
+                    start=time.time() - 1200,
+                    resource_id=resource_id,
+                )
+                if measures:
+                    value = measures[-1][2]
+            except (
+                gnocchiclient.exceptions.NotFound,
+                gnocchiclient.exceptions.BadRequest,
+                TypeError,
+            ) as e:
+                # CPU metric in previous Openstack versions do not support rate:mean aggregation method
+                # Gnocchi in some Openstack versions raise TypeError instead of NotFound or BadRequest
+                if openstack_metric_name != "cpu":
+                    raise
+                log.debug(
+                    "No metric %s found for instance %s: %s",
+                    openstack_metric_name,
+                    resource_id,
+                    e,
+                )
+                log.info(
+                    "Retrying to get metric %s for instance %s without aggregation",
+                    openstack_metric_name,
+                    resource_id,
+                )
+                # No limit here: the delta below needs the two most recent
+                # measures, which a limit of one measure could never provide.
+                measures = self.client.metric.get_measures(
+                    openstack_metric_name, resource_id=resource_id
+                )
+                # measures[-1] is the last measure
+                # measures[-2] is the previous measure
+                # measures[x][2] is the value of the metric
+                if measures and len(measures) >= 2:
+                    value = measures[-1][2] - measures[-2][2]
+            if value:
+                # measures[-1][0] is the time of the reporting interval
+                # measures[-1][1] is the duration of the reporting interval
+                if aggregation:
+                    # If this is an aggregate, we need to divide the total over the reported time period.
+                    # Even if the aggregation method is not supported by Openstack, the code will execute it
+                    # because aggregation is specified in METRIC_AGGREGATORS
+                    value = value / measures[-1][1]
+                if openstack_metric_name in METRIC_MULTIPLIERS:
+                    value = value * METRIC_MULTIPLIERS[openstack_metric_name]
+        except gnocchiclient.exceptions.NotFound as e:
+            # Covers both the re-raised error and a failing retry
+            log.debug(
+                "No metric %s found for instance %s: %s",
+                openstack_metric_name,
+                resource_id,
+                e,
+            )
+        return value
+
+
+class CeilometerBackend(OpenstackBackend):
+    """Backend that reads instance metrics through the Ceilometer client."""
+
+    def __init__(self, vim_account: dict, vim_session: object):
+        self.client = self._build_ceilometer_client(vim_account, vim_session)
+
+    def _build_ceilometer_client(
+        self, vim_account: dict, vim_session: object
+    ) -> ceilometer_client.Client:
+        # vim_account is not used: version "2" client on the given session
+        client = ceilometer_client.Client("2", session=vim_session)
+        return client
+
+    def collect_metric(
+        self, metric_type: MetricType, metric_name: str, resource_id: str
+    ):
+        """Return the counter volume of one sample of the meter for the resource.
+
+        Returns None when Ceilometer has no sample. Raises NotImplementedError
+        for any metric type other than MetricType.INSTANCE.
+        """
+        if metric_type != MetricType.INSTANCE:
+            raise NotImplementedError(
+                "Ceilometer backend only support instance metrics"
+            )
+        resource_filter = {"field": "resource_id", "op": "eq", "value": resource_id}
+        samples = self.client.samples.list(
+            meter_name=metric_name,
+            limit=1,
+            q=[resource_filter],
+        )
+        if not samples:
+            return None
+        return samples[0].counter_volume