X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;h=76a6f52c5a052699b35e2da02cce4ba4e23c4d28;hb=refs%2Fchanges%2F78%2F7378%2F1;hp=2f2dc12b293ee466100ded5a916916cc4552bfcb;hpb=312c16596975a42d6294a1a2ca7af98b0ff2ffb5;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 2f2dc12..76a6f52 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -25,8 +25,9 @@ import asyncio import json import logging import random -import uuid +from json import JSONDecodeError +import yaml from aiokafka import AIOKafkaProducer, AIOKafkaConsumer from osm_policy_module.core.config import Config @@ -35,10 +36,9 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self, loop=None): - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) + def __init__(self, config: Config, loop=None): + self.kafka_server = '{}:{}'.format(config.get('message', 'host'), + config.get('message', 'port')) if not loop: loop = asyncio.get_event_loop() self.loop = loop @@ -55,34 +55,37 @@ class MonClient: key_serializer=str.encode, value_serializer=str.encode) await producer.start() + try: + await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg)) + finally: + await producer.stop() + log.info("Waiting for create_alarm_response...") consumer = AIOKafkaConsumer( - "alarm_response", + "alarm_response_" + str(cor_id), loop=self.loop, bootstrap_servers=self.kafka_server, - group_id="pol-consumer-" + str(uuid.uuid4()), - enable_auto_commit=False, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - consumer_timeout_ms=10000) + auto_offset_reset='earliest') await consumer.start() - try: - await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg)) - finally: - await producer.stop() + alarm_uuid = None try: async for message in consumer: - if message.key == 'create_alarm_response': + try: content = json.loads(message.value) - log.info("Received create_alarm_response %s", content) - if content['alarm_create_response']['correlation_id'] == cor_id: - if not content['alarm_create_response']['status']: - raise ValueError("Error creating alarm in MON") - alarm_uuid = content['alarm_create_response']['alarm_uuid'] - await consumer.stop() - return alarm_uuid + except JSONDecodeError: + content = yaml.safe_load(message.value) + log.info("Received create_alarm_response %s", content) + if content['alarm_create_response']['correlation_id'] == cor_id: + if not content['alarm_create_response']['status']: + raise ValueError("Error creating alarm in MON") + alarm_uuid = content['alarm_create_response']['alarm_uuid'] + break finally: await consumer.stop() - raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') + if not alarm_uuid: + raise ValueError('No alarm deletion response from MON. Is MON up?') + return alarm_uuid async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str): cor_id = random.randint(1, 10e7) @@ -93,41 +96,44 @@ class MonClient: key_serializer=str.encode, value_serializer=str.encode) await producer.start() + try: + await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg)) + finally: + await producer.stop() + log.info("Waiting for delete_alarm_response...") consumer = AIOKafkaConsumer( - "alarm_response", + "alarm_response_" + str(cor_id), loop=self.loop, bootstrap_servers=self.kafka_server, - group_id="pol-consumer-" + str(uuid.uuid4()), - enable_auto_commit=False, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - consumer_timeout_ms=10000) + auto_offset_reset='earliest') await consumer.start() - try: - await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg)) - finally: - await producer.stop() + alarm_uuid = None try: async for message in consumer: - if message.key == 'delete_alarm_response': + try: content = json.loads(message.value) + except JSONDecodeError: + content = yaml.safe_load(message.value) + if content['alarm_delete_response']['correlation_id'] == cor_id: log.info("Received delete_alarm_response %s", content) - if content['alarm_delete_response']['correlation_id'] == cor_id: - if not content['alarm_delete_response']['status']: - raise ValueError("Error deleting alarm in MON") - alarm_uuid = content['alarm_delete_response']['alarm_uuid'] - await consumer.stop() - return alarm_uuid + if not content['alarm_delete_response']['status']: + raise ValueError("Error deleting alarm in MON. Response status is False.") + alarm_uuid = content['alarm_delete_response']['alarm_uuid'] + break finally: await consumer.stop() - raise ValueError('Timeout: No alarm deletion response from MON. Is MON up?') + if not alarm_uuid: + raise ValueError('No alarm deletion response from MON. Is MON up?') + return alarm_uuid def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int, statistic: str, operation: str): alarm_create_request = { 'correlation_id': cor_id, - 'alarm_name': str(uuid.uuid4()), + 'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name), 'metric_name': metric_name, 'ns_id': ns_id, 'vdu_name': vdu_name,