X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;fp=osm_policy_module%2Fcommon%2Fmon_client.py;h=911e655455b416d48162e3fc9fb0684cf76103e6;hb=16256cbbf6cdfde8debc3254bf55ce0b8fa51b08;hp=630902559d10f691b5101c9f789cc0ef33ab14be;hpb=a4e615d9bab28eefa0ae1e713ad75289ed2f65cc;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 6309025..911e655 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -21,12 +21,13 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import asyncio import json import logging import random import uuid -from kafka import KafkaProducer, KafkaConsumer +from aiokafka import AIOKafkaProducer, AIOKafkaConsumer from osm_policy_module.core.config import Config @@ -34,63 +35,92 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self): + def __init__(self, loop=None): cfg = Config.instance() self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, cfg.OSMPOL_MESSAGE_PORT) - self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop - def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int, - statistic: str, operation: str): - cor_id = random.randint(1, 1000000) + async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int, + statistic: str, operation: str): + cor_id = random.randint(1, 10e7) msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold, statistic, operation) log.info("Sending create_alarm_request %s", msg) - self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg)) - self.producer.flush() - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - consumer_timeout_ms=10000, - group_id='mon-client-' + str(uuid.uuid4())) - consumer.subscribe(['alarm_response']) - for message in consumer: - if message.key == 'create_alarm_response': - content = json.loads(message.value) - log.info("Received create_alarm_response %s", content) - if self._is_alarm_response_correlation_id_eq(cor_id, content): - if not content['alarm_create_response']['status']: - raise ValueError("Error creating alarm in MON") - alarm_uuid = content['alarm_create_response']['alarm_uuid'] - return alarm_uuid - + producer = AIOKafkaProducer(loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + await producer.start() + consumer = AIOKafkaConsumer( + "alarm_response", + loop=self.loop, + bootstrap_servers=self.kafka_server, + group_id="pol-consumer-" + str(uuid.uuid4()), + enable_auto_commit=False, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + consumer_timeout_ms=10000) + await consumer.start() + try: + await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg)) + finally: + await producer.stop() + try: + async for message in consumer: + if message.key == 'create_alarm_response': + content = json.loads(message.value) + log.info("Received create_alarm_response %s", content) + if self._is_alarm_response_correlation_id_eq(cor_id, content): + if not content['alarm_create_response']['status']: + raise ValueError("Error creating alarm in MON") + alarm_uuid = content['alarm_create_response']['alarm_uuid'] + await consumer.stop() + return alarm_uuid + finally: + await consumer.stop() raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') - def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str): - cor_id = random.randint(1, 1000000) + async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str): + cor_id = random.randint(1, 10e7) msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid) log.info("Sending delete_alarm_request %s", msg) - self.producer.send(topic='alarm_request', key='delete_alarm_request', value=json.dumps(msg)) - self.producer.flush() - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - consumer_timeout_ms=10000) - consumer.subscribe(['alarm_response']) - for message in consumer: - if message.key == 'delete_alarm_response': - content = json.loads(message.value) - log.info("Received delete_alarm_response %s", content) - if self._is_alarm_response_correlation_id_eq(cor_id, content): - if not content['alarm_delete_response']['status']: - raise ValueError("Error deleting alarm in MON") - alarm_uuid = content['alarm_delete_response']['alarm_uuid'] - return alarm_uuid - - raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') + producer = AIOKafkaProducer(loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + await producer.start() + consumer = AIOKafkaConsumer( + "alarm_response", + loop=self.loop, + bootstrap_servers=self.kafka_server, + group_id="pol-consumer-" + str(uuid.uuid4()), + enable_auto_commit=False, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + consumer_timeout_ms=10000) + await consumer.start() + try: + await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg)) + finally: + await producer.stop() + try: + async for message in consumer: + if message.key == 'delete_alarm_response': + content = json.loads(message.value) + log.info("Received delete_alarm_response %s", content) + if self._is_alarm_response_correlation_id_eq(cor_id, content): + if not content['alarm_delete_response']['status']: + raise ValueError("Error deleting alarm in MON") + alarm_uuid = content['alarm_delete_response']['alarm_uuid'] + await consumer.stop() + return alarm_uuid + finally: + await consumer.stop() + raise ValueError('Timeout: No alarm deletion response from MON. Is MON up?') def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int,