19b440ed95c266283e1e7e8785bec97ed0fd962d
[osm/MON.git] / policy_module / osm_policy_module / common / mon_client.py
1 import json
2 import logging
3 import random
4 import uuid
5
6 from kafka import KafkaProducer, KafkaConsumer
7
8 from osm_policy_module.core.config import Config
9
10 log = logging.getLogger(__name__)
11
12
class MonClient:
    """Client that asks MON to create alarms over Kafka.

    Requests are published on the ``alarm_request`` topic and the reply is
    read from the ``alarm_response`` topic; request and reply are paired by
    a correlation id.
    """

    def __init__(self):
        """Create the Kafka producer from the ``policy_module`` config section."""
        cfg = Config.instance()
        self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                           cfg.get('policy_module', 'kafka_server_port'))
        # Keys are plain strings, values are JSON-serialisable objects; both
        # are sent as encoded bytes.
        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
                                      key_serializer=str.encode,
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
        """Publish a create-alarm request and wait for the matching response.

        :param metric_name: value sent as ``metric_name`` in the request.
        :param resource_uuid: value sent as ``resource_uuid`` in the request.
        :param vim_uuid: value sent as ``vim_uuid`` alongside the request.
        :param threshold: value sent as ``threshold_value`` in the request.
        :param statistic: value sent as ``statistic`` in the request.
        :param operation: value sent as ``operation`` in the request.
        :return: the ``alarm_uuid`` found in the matching response.
        :raises ValueError: if the consumer stops iterating (it is created
            with ``consumer_timeout_ms=10000``) before a response with the
            same correlation id and a non-empty ``alarm_uuid`` is seen.
        """
        cor_id = random.randint(1, 1000000)
        msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
        self.producer.flush()
        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000)
        alarm_uuid = None
        # try/finally so the consumer is closed even when a response is
        # malformed and parsing or key lookup raises.
        try:
            consumer.subscribe(['alarm_response'])
            for message in consumer:
                content = self._parse_create_alarm_response(message)
                if content is None:
                    # Not a create-alarm response; keep waiting.
                    continue
                if self._is_alarm_response_correlation_id_eq(cor_id, content):
                    alarm_uuid = content['alarm_create_response']['alarm_uuid']
                    # TODO Handle error response
                    break
        finally:
            consumer.close()
        if not alarm_uuid:
            raise ValueError(
                'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?')
        return alarm_uuid

    @staticmethod
    def _parse_create_alarm_response(message):
        """Return the decoded JSON body of a create-alarm response, else None.

        The consumer is created without deserializers, so key and value
        arrive as raw bytes; they are decoded here before the key is compared
        and the value is parsed. ``str`` inputs are accepted unchanged.

        :param message: object exposing ``key`` and ``value`` attributes.
        :return: the parsed JSON value when the key is
            ``create_alarm_response``; ``None`` for any other key
            (including a missing key).
        """
        key = message.key
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        if key != 'create_alarm_response':
            return None
        value = message.value
        if isinstance(value, bytes):
            value = value.decode('utf-8')
        # json.loads (not json.load): the payload is a string, not a file object.
        return json.loads(value)

    def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
        """Build the message body for a create-alarm request.

        The alarm name is a freshly generated UUID4 string and the severity
        is always ``critical``.

        :return: dict with keys ``create_alarm_request`` and ``vim_uuid``.
        """
        create_alarm_request = {
            'correlation_id': cor_id,
            'alarm_name': str(uuid.uuid4()),
            'metric_name': metric_name,
            'resource_uuid': resource_uuid,
            'operation': operation,
            'severity': 'critical',
            'threshold_value': threshold,
            'statistic': statistic
        }
        msg = {
            'create_alarm_request': create_alarm_request,
            'vim_uuid': vim_uuid
        }
        return msg

    def _is_alarm_response_correlation_id_eq(self, cor_id, message_content):
        """Return True when the response carries the given correlation id.

        :raises KeyError: if ``message_content`` lacks
            ``alarm_create_response`` or its ``correlation_id``.
        """
        return message_content['alarm_create_response']['correlation_id'] == cor_id