X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmon_client.py;h=ff8339fe68da92f8f9eaa3fe735ebea5e13749a6;hb=4584f8e86a492d67d120bfea1195eff1475c0a65;hp=c83f7944decdce812303e1fbf01bf1f695beee1b;hpb=e8ee171063d81a02033729273793f5ba2c2b71cc;p=osm%2FPOL.git diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index c83f794..ff8339f 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -37,31 +37,46 @@ log = logging.getLogger(__name__) class MonClient: def __init__(self, config: Config, loop=None): - self.kafka_server = '{}:{}'.format(config.get('message', 'host'), - config.get('message', 'port')) + self.kafka_server = "{}:{}".format( + config.get("message", "host"), config.get("message", "port") + ) if not loop: loop = asyncio.get_event_loop() self.loop = loop - async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: str, threshold: int, - operation: str, statistic: str = 'AVERAGE'): + async def create_alarm( + self, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + operation: str, + statistic: str = "AVERAGE", + ): cor_id = random.randint(1, 10e7) - msg = self._build_create_alarm_payload(cor_id, - metric_name, - ns_id, - vdu_name, - vnf_member_index, - threshold, - statistic, - operation) + msg = self._build_create_alarm_payload( + cor_id, + metric_name, + ns_id, + vdu_name, + vnf_member_index, + threshold, + statistic, + operation, + ) log.debug("Sending create_alarm_request %s", msg) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() try: - await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg)) + await producer.send_and_wait( + "alarm_request", key="create_alarm_request", value=json.dumps(msg) + ) finally: await producer.stop() log.debug("Waiting for create_alarm_response...") @@ -71,7 +86,8 @@ class MonClient: bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - auto_offset_reset='earliest') + auto_offset_reset="earliest", + ) await consumer.start() alarm_uuid = None try: @@ -81,28 +97,36 @@ class MonClient: except JSONDecodeError: content = yaml.safe_load(message.value) log.debug("Received create_alarm_response %s", content) - if content['alarm_create_response']['correlation_id'] == cor_id: - if not content['alarm_create_response']['status']: + if content["alarm_create_response"]["correlation_id"] == cor_id: + if not content["alarm_create_response"]["status"]: raise ValueError("Error creating alarm in MON") - alarm_uuid = content['alarm_create_response']['alarm_uuid'] + alarm_uuid = content["alarm_create_response"]["alarm_uuid"] break finally: await consumer.stop() if not alarm_uuid: - raise ValueError('No alarm deletion response from MON. Is MON up?') + raise ValueError("No alarm deletion response from MON. Is MON up?") return alarm_uuid - async def delete_alarm(self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str): + async def delete_alarm( + self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str + ): cor_id = random.randint(1, 10e7) - msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid) + msg = self._build_delete_alarm_payload( + cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid + ) log.debug("Sending delete_alarm_request %s", msg) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() try: - await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg)) + await producer.send_and_wait( + "alarm_request", key="delete_alarm_request", value=json.dumps(msg) + ) finally: await producer.stop() log.debug("Waiting for delete_alarm_response...") @@ -112,7 +136,8 @@ class MonClient: bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - auto_offset_reset='earliest') + auto_offset_reset="earliest", + ) await consumer.start() alarm_uuid = None try: @@ -121,58 +146,71 @@ class MonClient: content = json.loads(message.value) except JSONDecodeError: content = yaml.safe_load(message.value) - if content['alarm_delete_response']['correlation_id'] == cor_id: + if content["alarm_delete_response"]["correlation_id"] == cor_id: log.debug("Received delete_alarm_response %s", content) - if not content['alarm_delete_response']['status']: - raise ValueError("Error deleting alarm in MON. Response status is False.") - alarm_uuid = content['alarm_delete_response']['alarm_uuid'] + if not content["alarm_delete_response"]["status"]: + raise ValueError( + "Error deleting alarm in MON. Response status is False." + ) + alarm_uuid = content["alarm_delete_response"]["alarm_uuid"] break finally: await consumer.stop() if not alarm_uuid: - raise ValueError('No alarm deletion response from MON. Is MON up?') + raise ValueError("No alarm deletion response from MON. Is MON up?") return alarm_uuid - def _build_create_alarm_payload(self, cor_id: int, - metric_name: str, - ns_id: str, - vdu_name: str, - vnf_member_index: str, - threshold: int, - statistic: str, - operation: str): + def _build_create_alarm_payload( + self, + cor_id: int, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + statistic: str, + operation: str, + ): alarm_create_request = { - 'correlation_id': cor_id, - 'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name), - 'metric_name': metric_name, - 'operation': operation, - 'severity': 'critical', - 'threshold_value': threshold, - 'statistic': statistic, - 'tags': { - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index, - } + "correlation_id": cor_id, + "alarm_name": "osm_alarm_{}_{}_{}_{}".format( + ns_id, vnf_member_index, vdu_name, metric_name + ), + "metric_name": metric_name, + "operation": operation, + "severity": "critical", + "threshold_value": threshold, + "statistic": statistic, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, } msg = { - 'alarm_create_request': alarm_create_request, + "alarm_create_request": alarm_create_request, } return msg - def _build_delete_alarm_payload(self, cor_id: int, ns_id: str, vdu_name: str, - vnf_member_index: str, alarm_uuid: str): + def _build_delete_alarm_payload( + self, + cor_id: int, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + alarm_uuid: str, + ): alarm_delete_request = { - 'correlation_id': cor_id, - 'alarm_uuid': alarm_uuid, - 'tags': { - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index - } + "correlation_id": cor_id, + "alarm_uuid": alarm_uuid, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, } msg = { - 'alarm_delete_request': alarm_delete_request, + "alarm_delete_request": alarm_delete_request, } return msg