X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;h=b8c3779114f77da530e25fa6ba09da29269fe98e;hb=cb5642a2af495af4319beb1dba2d417b78f3200a;hp=5d2416e7f3845847e589f3c1cbf7a02ee35ad4ca;hpb=d8e91c539f8a1657a93c6ed59dd5f9ad489684ef;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 5d2416e..b8c3779 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -21,12 +21,14 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import asyncio import json import logging import random -import uuid +from json import JSONDecodeError -from kafka import KafkaProducer, KafkaConsumer +import yaml +from aiokafka import AIOKafkaProducer, AIOKafkaConsumer from osm_policy_module.core.config import Config @@ -34,57 +36,185 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self): - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) - self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - - def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int, - statistic: str, operation: str): - cor_id = random.randint(1, 1000000) - msg = self._create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold, statistic, - operation) - log.info("Sending create_alarm_request %s", msg) - self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg)) - self.producer.flush() - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - consumer_timeout_ms=10000, - group_id='mon-client-' + str(uuid.uuid4())) - consumer.subscribe(['alarm_response']) - for message in consumer: - if message.key == 'create_alarm_response': - content = json.loads(message.value) - log.info("Received create_alarm_response %s", content) - if self._is_alarm_response_correlation_id_eq(cor_id, content): - alarm_uuid = content['alarm_create_response']['alarm_uuid'] - # TODO Handle error response - return alarm_uuid - - raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') - - def _create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, - threshold: int, statistic: str, operation: str): + def __init__(self, config: Config, loop=None): + self.kafka_server = "{}:{}".format( + config.get("message", "host"), config.get("message", "port") + ) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + + async def create_alarm( + self, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + operation: str, + statistic: str = "AVERAGE", + action: str = '', + ): + cor_id = random.randint(1, 10e7) + msg = self._build_create_alarm_payload( + cor_id, + metric_name, + ns_id, + vdu_name, + vnf_member_index, + threshold, + statistic, + operation, + action, + ) + log.debug("Sending create_alarm_request %s", msg) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) + await producer.start() + try: + await producer.send_and_wait( + "alarm_request", key="create_alarm_request", value=json.dumps(msg) + ) + finally: + await producer.stop() + log.debug("Waiting for create_alarm_response...") + consumer = AIOKafkaConsumer( + "alarm_response_" + str(cor_id), + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + auto_offset_reset="earliest", + ) + await consumer.start() + alarm_uuid = None + try: + async for message in consumer: + try: + content = json.loads(message.value) + except JSONDecodeError: + content = yaml.safe_load(message.value) + log.debug("Received create_alarm_response %s", content) + if content["alarm_create_response"]["correlation_id"] == cor_id: + if not content["alarm_create_response"]["status"]: + raise ValueError("Error creating alarm in MON") + alarm_uuid = content["alarm_create_response"]["alarm_uuid"] + break + finally: + await consumer.stop() + if not alarm_uuid: + raise ValueError("No alarm deletion response from MON. Is MON up?") + return alarm_uuid + + async def delete_alarm( + self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str + ): + cor_id = random.randint(1, 10e7) + msg = self._build_delete_alarm_payload( + cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid + ) + log.debug("Sending delete_alarm_request %s", msg) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) + await producer.start() + try: + await producer.send_and_wait( + "alarm_request", key="delete_alarm_request", value=json.dumps(msg) + ) + finally: + await producer.stop() + log.debug("Waiting for delete_alarm_response...") + consumer = AIOKafkaConsumer( + "alarm_response_" + str(cor_id), + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + auto_offset_reset="earliest", + ) + await consumer.start() + alarm_uuid = None + try: + async for message in consumer: + try: + content = json.loads(message.value) + except JSONDecodeError: + content = yaml.safe_load(message.value) + if content["alarm_delete_response"]["correlation_id"] == cor_id: + log.debug("Received delete_alarm_response %s", content) + if not content["alarm_delete_response"]["status"]: + raise ValueError( + "Error deleting alarm in MON. Response status is False." + ) + alarm_uuid = content["alarm_delete_response"]["alarm_uuid"] + break + finally: + await consumer.stop() + if not alarm_uuid: + raise ValueError("No alarm deletion response from MON. Is MON up?") + return alarm_uuid + + def _build_create_alarm_payload( + self, + cor_id: int, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + statistic: str, + operation: str, + action: str, + ): + alarm_create_request = { - 'correlation_id': cor_id, - 'alarm_name': str(uuid.uuid4()), - 'metric_name': metric_name, - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index, - 'operation': operation, - 'severity': 'critical', - 'threshold_value': threshold, - 'statistic': statistic + "correlation_id": cor_id, + "alarm_name": "osm_alarm_{}_{}_{}_{}".format( + ns_id, vnf_member_index, vdu_name, metric_name + ), + "metric_name": metric_name, + "operation": operation, + "severity": "critical", + "threshold_value": threshold, + "statistic": statistic, + "action": action, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, } msg = { - 'alarm_create_request': alarm_create_request, + "alarm_create_request": alarm_create_request, } return msg - def _is_alarm_response_correlation_id_eq(self, cor_id, message_content): - return message_content['alarm_create_response']['correlation_id'] == cor_id + def _build_delete_alarm_payload( + self, + cor_id: int, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + alarm_uuid: str, + ): + alarm_delete_request = { + "correlation_id": cor_id, + "alarm_uuid": alarm_uuid, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, + } + msg = { + "alarm_delete_request": alarm_delete_request, + } + return msg