19b440ed95c266283e1e7e8785bec97ed0fd962d
6 from kafka
import KafkaProducer
, KafkaConsumer
8 from osm_policy_module
.core
.config
import Config
# Module-level logger, named after this module through __name__.
10 log
= logging
.getLogger(__name__
)
# NOTE(review): the enclosing `def` line is not visible in this view; these
# statements use `self`, so they presumably form a constructor body - confirm.
# Fetch the shared configuration object.
15 cfg
= Config
.instance()
# Build the "host:port" bootstrap address from the 'policy_module' section
# (keys 'kafka_server_host' and 'kafka_server_port').
16 self
.kafka_server
= '{}:{}'.format(cfg
.get('policy_module', 'kafka_server_host'),
17 cfg
.get('policy_module', 'kafka_server_port'))
# Producer bound to that address: keys are encoded with str.encode, values
# are dumped to JSON text and then encoded as UTF-8 bytes.
18 self
.producer
= KafkaProducer(bootstrap_servers
=self
.kafka_server
,
19 key_serializer
=str.encode
,
20 value_serializer
=lambda v
: json
.dumps(v
).encode('utf-8'))
def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
    """Ask MON to create an alarm and wait for the matching response.

    Publishes a ``create_alarm_request`` message on the ``alarm_request``
    topic, then reads the ``alarm_response`` topic until a
    ``create_alarm_response`` carrying the same correlation id shows up, or
    until the consumer has gone 10 seconds without receiving a message.

    Args:
        metric_name: forwarded to ``_create_alarm_payload``.
        resource_uuid: forwarded to ``_create_alarm_payload``.
        vim_uuid: forwarded to ``_create_alarm_payload``.
        threshold: forwarded to ``_create_alarm_payload``.
        statistic: forwarded to ``_create_alarm_payload``.
        operation: forwarded to ``_create_alarm_payload``.

    Returns:
        The ``alarm_uuid`` field of the matching response.

    Raises:
        ValueError: no matching response was received before the timeout.

    NOTE(review): in the reviewed excerpt the statements between the
    response loop and the timeout message were not visible. Raising
    ``ValueError`` and returning ``alarm_uuid`` are reconstructed from the
    surrounding code - confirm against callers.
    """
    cor_id = random.randint(1, 1000000)
    msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid,
                                     threshold, statistic, operation)
    self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)

    consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000)
    alarm_uuid = None
    try:
        consumer.subscribe(['alarm_response'])
        for message in consumer:
            # No key deserializer is configured, so keys arrive as raw bytes;
            # decode before comparing or the test below never matches.
            key = message.key
            if isinstance(key, bytes):
                key = key.decode('utf-8')
            if key != 'create_alarm_response':
                continue
            # message.value is a payload, not a file object: json.loads, not
            # json.load. Decode first so older interpreters accept it too.
            payload = message.value
            if isinstance(payload, bytes):
                payload = payload.decode('utf-8')
            content = json.loads(payload)
            if self._is_alarm_response_correlation_id_eq(cor_id, content):
                alarm_uuid = content['alarm_create_response']['alarm_uuid']
                # TODO Handle error response
                break
    finally:
        # Always release the consumer's connections, including on errors.
        consumer.close()

    if not alarm_uuid:
        raise ValueError(
            'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?')
    return alarm_uuid
# Builds the message asking MON to create an alarm.
# NOTE(review): this view skips original lines 53-54 and 56-59, so the end of
# the inner dict, the start of the outer dict, any further outer keys and the
# return statement are not shown here.
43 def _create_alarm_payload(self
, cor_id
, metric_name
, resource_uuid
, vim_uuid
, threshold
, statistic
, operation
):
# Inner request body. 'alarm_name' is a freshly generated uuid4 string and
# 'severity' is hard-coded to 'critical'; the other values are passed through
# from the arguments.
44 create_alarm_request
= {
45 'correlation_id': cor_id
,
46 'alarm_name': str(uuid
.uuid4()),
47 'metric_name': metric_name
,
48 'resource_uuid': resource_uuid
,
49 'operation': operation
,
50 'severity': 'critical',
51 'threshold_value': threshold
,
52 'statistic': statistic
# The request body is nested under the 'create_alarm_request' key.
# vim_uuid is not used in any visible line - presumably it is added to the
# outer message in the lines not shown; confirm.
55 'create_alarm_request': create_alarm_request
,
60 def _is_alarm_response_correlation_id_eq(self
, cor_id
, message_content
):
61 return message_content
['alarm_create_response']['correlation_id'] == cor_id