- def __init__(self, loop=None):
- cfg = Config.instance()
- self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
-
- async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
- statistic: str, operation: str):
- cor_id = random.randint(1, 10e7)
- msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
- statistic,
- operation)
- log.info("Sending create_alarm_request %s", msg)
- producer = AIOKafkaProducer(loop=self.loop,
- bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
+ def __init__(self, config: Config):
+ self.kafka_server = "{}:{}".format(
+ config.get("message", "host"), config.get("message", "port")
+ )
+
+ async def create_alarm(
+ self,
+ metric_name: str,
+ ns_id: str,
+ vdu_name: str,
+ vnf_member_index: str,
+ threshold: int,
+ operation: str,
+ statistic: str = "AVERAGE",
+ action: str = "",
+ vnfr: object = None,
+ vnfd: object = None,
+ ):
+ cor_id = SystemRandom().randint(1, 10e7)
+ msg = self._build_create_alarm_payload(
+ cor_id,
+ metric_name,
+ ns_id,
+ vdu_name,
+ vnf_member_index,
+ threshold,
+ statistic,
+ operation,
+ action,
+ vnfr,
+ vnfd,
+ )
+ log.debug("Sending create_alarm_request %s", msg)
+ producer = AIOKafkaProducer(
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode,
+ )