# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import json
import logging
import random
class MonClient:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.kafka_server = "{}:{}".format(
config.get("message", "host"), config.get("message", "port")
)
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def create_alarm(
self,
threshold: int,
operation: str,
statistic: str = "AVERAGE",
+ action: str = "",
+ vnfr: object = None,
+ vnfd: object = None,
):
cor_id = random.randint(1, 10e7)
msg = self._build_create_alarm_payload(
threshold,
statistic,
operation,
+ action,
+ vnfr,
+ vnfd,
)
log.debug("Sending create_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for create_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
)
log.debug("Sending delete_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for delete_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
threshold: int,
statistic: str,
operation: str,
+ action: str,
+ vnfr=None,
+ vnfd=None,
):
-
+ tags = {
+ "ns_id": ns_id,
+ "vdu_name": vdu_name,
+ "vnf_member_index": vnf_member_index,
+ }
+ if vnfr and vnfd:
+ # TODO: Change for multiple DF support
+ df = vnfd.get("df", [{}])[0]
+ if "exporters-endpoints" in df:
+ metric_port = df["exporters-endpoints"].get("metric-port", 9100)
+ if metric_name.startswith("kpi_"):
+ metric_name = metric_name.replace("kpi_", "")
+ metric_name.strip()
+ for vdu in vnfr["vdur"]:
+ if vdu["name"] == vdu_name:
+ vdu_ip = vdu["ip-address"]
+ tags = {"instance": vdu_ip + ":" + str(metric_port)}
alarm_create_request = {
"correlation_id": cor_id,
"alarm_name": "osm_alarm_{}_{}_{}_{}".format(
"severity": "critical",
"threshold_value": threshold,
"statistic": statistic,
- "tags": {
- "ns_id": ns_id,
- "vdu_name": vdu_name,
- "vnf_member_index": vnf_member_index,
- },
+ "action": action,
+ "tags": tags,
}
msg = {
"alarm_create_request": alarm_create_request,