f03b3c31f8d17c4bbe0db46a3988b656f83d68d2
[osm/MON.git] / policy_module / osm_policy_module / common / mon_client.py
1 import json
2 import logging
3 import random
4 import uuid
5
6 from kafka import KafkaProducer, KafkaConsumer
7
8 from osm_policy_module.core.config import Config
9
10 log = logging.getLogger(__name__)
11
12
13 class MonClient:
14 def __init__(self):
15 cfg = Config.instance()
16 self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
17 cfg.get('policy_module', 'kafka_server_port'))
18 self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
19 key_serializer=str.encode,
20 value_serializer=str.encode)
21
22 def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
23 cor_id = random.randint(1, 1000000)
24 msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
25 log.info("Sending create_alarm_request %s", msg)
26 future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
27 future.get(timeout=60)
28 consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
29 key_deserializer=bytes.decode,
30 value_deserializer=bytes.decode)
31 consumer.subscribe(['alarm_response'])
32 for message in consumer:
33 if message.key == 'create_alarm_response':
34 content = json.loads(message.value)
35 log.info("Received create_alarm_response %s", content)
36 if self._is_alarm_response_correlation_id_eq(cor_id, content):
37 alarm_uuid = content['alarm_create_response']['alarm_uuid']
38 # TODO Handle error response
39 return alarm_uuid
40
41 raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
42
43 def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
44 alarm_create_request = {
45 'correlation_id': cor_id,
46 'alarm_name': str(uuid.uuid4()),
47 'metric_name': metric_name,
48 'resource_uuid': resource_uuid,
49 'operation': operation,
50 'severity': 'critical',
51 'threshold_value': threshold,
52 'statistic': statistic
53 }
54 msg = {
55 'alarm_create_request': alarm_create_request,
56 'vim_uuid': vim_uuid
57 }
58 return msg
59
60 def _is_alarm_response_correlation_id_eq(self, cor_id, message_content):
61 return message_content['alarm_create_response']['correlation_id'] == cor_id