X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;h=e4f75336aaf02ace3a5463e36b876bfc965b25a4;hb=bfe6988e8ec5ad9283200f46134529cac10e006c;hp=f9a51cd6fac69ac31312b1a0a8f3e6f612653134;hpb=1087dcb824d02e6b1f83d6c34375f90701a724c6;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index f9a51cd..e4f7533 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -21,10 +21,9 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio import json import logging -import random +from random import SystemRandom from json import JSONDecodeError import yaml @@ -36,13 +35,10 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.kafka_server = "{}:{}".format( config.get("message", "host"), config.get("message", "port") ) - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop async def create_alarm( self, @@ -54,8 +50,10 @@ class MonClient: operation: str, statistic: str = "AVERAGE", action: str = "", + vnfr: object = None, + vnfd: object = None, ): - cor_id = random.randint(1, 10e7) + cor_id = SystemRandom().randint(1, 10e7) msg = self._build_create_alarm_payload( cor_id, metric_name, @@ -66,10 +64,11 @@ class MonClient: statistic, operation, action, + vnfr, + vnfd, ) log.debug("Sending create_alarm_request %s", msg) producer = AIOKafkaProducer( - loop=self.loop, bootstrap_servers=self.kafka_server, key_serializer=str.encode, value_serializer=str.encode, @@ -84,7 +83,6 @@ class MonClient: log.debug("Waiting for create_alarm_response...") consumer = AIOKafkaConsumer( "alarm_response_" + str(cor_id), - loop=self.loop, bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, @@ -113,13 +111,12 @@ class MonClient: async def delete_alarm( self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str ): - cor_id = random.randint(1, 10e7) + cor_id = SystemRandom().randint(1, 10e7) msg = self._build_delete_alarm_payload( cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid ) log.debug("Sending delete_alarm_request %s", msg) producer = AIOKafkaProducer( - loop=self.loop, bootstrap_servers=self.kafka_server, key_serializer=str.encode, value_serializer=str.encode, @@ -134,7 +131,6 @@ class MonClient: log.debug("Waiting for delete_alarm_response...") consumer = AIOKafkaConsumer( "alarm_response_" + str(cor_id), - loop=self.loop, bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, @@ -173,7 +169,27 @@ class MonClient: statistic: str, operation: str, action: str, + vnfr=None, + vnfd=None, ): + tags = { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + } + if vnfr and vnfd: + # TODO: Change for multiple DF support + df = vnfd.get("df", [{}])[0] + metric_port = 9100 + if "exporters-endpoints" in df: + metric_port = df["exporters-endpoints"].get("metric-port", 9100) + if metric_name.startswith("kpi_"): + metric_name = metric_name.replace("kpi_", "") + metric_name.strip() + for vdu in vnfr["vdur"]: + if vdu["name"] == vdu_name: + vdu_ip = vdu["ip-address"] + tags = {"instance": vdu_ip + ":" + str(metric_port)} alarm_create_request = { "correlation_id": cor_id, "alarm_name": "osm_alarm_{}_{}_{}_{}".format( @@ -185,11 +201,7 @@ class MonClient: "threshold_value": threshold, "statistic": statistic, "action": action, - "tags": { - "ns_id": ns_id, - "vdu_name": vdu_name, - "vnf_member_index": vnf_member_index, - }, + "tags": tags, } msg = { "alarm_create_request": alarm_create_request,