X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;h=e9216aabb108a3941c29b06ec1b009a3901f8a02;hb=bb6bdfab64270428f29dfa0f03d9b7c35c9faa90;hp=f9a51cd6fac69ac31312b1a0a8f3e6f612653134;hpb=1087dcb824d02e6b1f83d6c34375f90701a724c6;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index f9a51cd..e9216aa 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -21,7 +21,6 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio import json import logging import random @@ -36,13 +35,10 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.kafka_server = "{}:{}".format( config.get("message", "host"), config.get("message", "port") ) - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop async def create_alarm( self, @@ -54,6 +50,8 @@ class MonClient: operation: str, statistic: str = "AVERAGE", action: str = "", + vnfr: object = None, + vnfd: object = None, ): cor_id = random.randint(1, 10e7) msg = self._build_create_alarm_payload( @@ -66,10 +64,11 @@ class MonClient: statistic, operation, action, + vnfr, + vnfd, ) log.debug("Sending create_alarm_request %s", msg) producer = AIOKafkaProducer( - loop=self.loop, bootstrap_servers=self.kafka_server, key_serializer=str.encode, value_serializer=str.encode, @@ -84,7 +83,6 @@ class MonClient: log.debug("Waiting for create_alarm_response...") consumer = AIOKafkaConsumer( "alarm_response_" + str(cor_id), - loop=self.loop, bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, @@ -119,7 +117,6 @@ class MonClient: ) log.debug("Sending delete_alarm_request %s", msg) producer = AIOKafkaProducer( - loop=self.loop, bootstrap_servers=self.kafka_server, key_serializer=str.encode, value_serializer=str.encode, @@ -134,7 +131,6 @@ class MonClient: log.debug("Waiting for delete_alarm_response...") consumer = AIOKafkaConsumer( "alarm_response_" + str(cor_id), - loop=self.loop, bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, @@ -173,7 +169,26 @@ class MonClient: statistic: str, operation: str, action: str, + vnfr=None, + vnfd=None, ): + tags = { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + } + if vnfr and vnfd: + # TODO: Change for multiple DF support + df = vnfd.get("df", [{}])[0] + if "exporters-endpoints" in df: + metric_port = df["exporters-endpoints"].get("metric-port", 9100) + if metric_name.startswith("kpi_"): + metric_name = metric_name.replace("kpi_", "") + metric_name.strip() + for vdu in vnfr["vdur"]: + if vdu["name"] == vdu_name: + vdu_ip = vdu["ip-address"] + tags = {"instance": vdu_ip + ":" + str(metric_port)} alarm_create_request = { "correlation_id": cor_id, "alarm_name": "osm_alarm_{}_{}_{}_{}".format( @@ -185,11 +200,7 @@ class MonClient: "threshold_value": threshold, "statistic": statistic, "action": action, - "tags": { - "ns_id": ns_id, - "vdu_name": vdu_name, - "vnf_member_index": vnf_member_index, - }, + "tags": tags, } msg = { "alarm_create_request": alarm_create_request,