# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import json
import logging
import random
class MonClient:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.kafka_server = "{}:{}".format(
config.get("message", "host"), config.get("message", "port")
)
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def create_alarm(
self,
)
log.debug("Sending create_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for create_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
)
log.debug("Sending delete_alarm_request %s", msg)
producer = AIOKafkaProducer(
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
value_serializer=str.encode,
log.debug("Waiting for delete_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
- loop=self.loop,
bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,