X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=policy_module%2Fosm_policy_module%2Fcommon%2Fmon_client.py;h=0d86046c06fbf4ac8584b718d11438feff85b017;hb=341c33b98b4951c5d617e040cf856d4011a09266;hp=19b440ed95c266283e1e7e8785bec97ed0fd962d;hpb=62781ff00b30790610cf4cc2ef5ed5422c571e10;p=osm%2FMON.git diff --git a/policy_module/osm_policy_module/common/mon_client.py b/policy_module/osm_policy_module/common/mon_client.py index 19b440e..0d86046 100644 --- a/policy_module/osm_policy_module/common/mon_client.py +++ b/policy_module/osm_policy_module/common/mon_client.py @@ -1,3 +1,26 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## import json import logging import random @@ -17,31 +40,31 @@ class MonClient: cfg.get('policy_module', 'kafka_server_port')) self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('utf-8')) + value_serializer=str.encode) def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation): cor_id = random.randint(1, 1000000) msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation) - self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg) - self.producer.flush() - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000) + log.info("Sending create_alarm_request %s", msg) + future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg)) + future.get(timeout=60) + consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode) consumer.subscribe(['alarm_response']) - alarm_uuid = None for message in consumer: if message.key == 'create_alarm_response': - content = json.load(message.value) + content = json.loads(message.value) + log.info("Received create_alarm_response %s", content) if self._is_alarm_response_correlation_id_eq(cor_id, content): alarm_uuid = content['alarm_create_response']['alarm_uuid'] # TODO Handle error response - break - consumer.close() - if not alarm_uuid: - raise ValueError( - 'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?') - return alarm_uuid + return alarm_uuid + + raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation): - create_alarm_request = { + alarm_create_request = { 'correlation_id': cor_id, 'alarm_name': str(uuid.uuid4()), 'metric_name': metric_name, @@ -52,7 +75,7 @@ class MonClient: 'statistic': statistic } msg = { - 'create_alarm_request': create_alarm_request, + 'alarm_create_request': alarm_create_request, 'vim_uuid': vim_uuid } return msg