630902559d10f691b5101c9f789cc0ef33ab14be
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
29 from kafka
import KafkaProducer
, KafkaConsumer
31 from osm_policy_module
.core
.config
import Config
33 log
= logging
.getLogger(__name__
)
38 cfg
= Config
.instance()
39 self
.kafka_server
= '{}:{}'.format(cfg
.OSMPOL_MESSAGE_HOST
,
40 cfg
.OSMPOL_MESSAGE_PORT
)
41 self
.producer
= KafkaProducer(bootstrap_servers
=self
.kafka_server
,
42 key_serializer
=str.encode
,
43 value_serializer
=str.encode
)
45 def create_alarm(self
, metric_name
: str, ns_id
: str, vdu_name
: str, vnf_member_index
: int, threshold
: int,
46 statistic
: str, operation
: str):
47 cor_id
= random
.randint(1, 1000000)
48 msg
= self
._build
_create
_alarm
_payload
(cor_id
, metric_name
, ns_id
, vdu_name
, vnf_member_index
, threshold
,
51 log
.info("Sending create_alarm_request %s", msg
)
52 self
.producer
.send(topic
='alarm_request', key
='create_alarm_request', value
=json
.dumps(msg
))
54 consumer
= KafkaConsumer(bootstrap_servers
=self
.kafka_server
,
55 key_deserializer
=bytes
.decode
,
56 value_deserializer
=bytes
.decode
,
57 consumer_timeout_ms
=10000,
58 group_id
='mon-client-' + str(uuid
.uuid4()))
59 consumer
.subscribe(['alarm_response'])
60 for message
in consumer
:
61 if message
.key
== 'create_alarm_response':
62 content
= json
.loads(message
.value
)
63 log
.info("Received create_alarm_response %s", content
)
64 if self
._is
_alarm
_response
_correlation
_id
_eq
(cor_id
, content
):
65 if not content
['alarm_create_response']['status']:
66 raise ValueError("Error creating alarm in MON")
67 alarm_uuid
= content
['alarm_create_response']['alarm_uuid']
70 raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
72 def delete_alarm(self
, ns_id
: str, vnf_member_index
: int, vdu_name
: str, alarm_uuid
: str):
73 cor_id
= random
.randint(1, 1000000)
74 msg
= self
._build
_delete
_alarm
_payload
(cor_id
, ns_id
, vdu_name
, vnf_member_index
, alarm_uuid
)
75 log
.info("Sending delete_alarm_request %s", msg
)
76 self
.producer
.send(topic
='alarm_request', key
='delete_alarm_request', value
=json
.dumps(msg
))
78 consumer
= KafkaConsumer(bootstrap_servers
=self
.kafka_server
,
79 key_deserializer
=bytes
.decode
,
80 value_deserializer
=bytes
.decode
,
81 consumer_timeout_ms
=10000)
82 consumer
.subscribe(['alarm_response'])
83 for message
in consumer
:
84 if message
.key
== 'delete_alarm_response':
85 content
= json
.loads(message
.value
)
86 log
.info("Received delete_alarm_response %s", content
)
87 if self
._is
_alarm
_response
_correlation
_id
_eq
(cor_id
, content
):
88 if not content
['alarm_delete_response']['status']:
89 raise ValueError("Error deleting alarm in MON")
90 alarm_uuid
= content
['alarm_delete_response']['alarm_uuid']
93 raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
95 def _build_create_alarm_payload(self
, cor_id
: int, metric_name
: str, ns_id
: str, vdu_name
: str,
96 vnf_member_index
: int,
97 threshold
: int, statistic
: str, operation
: str):
98 alarm_create_request
= {
99 'correlation_id': cor_id
,
100 'alarm_name': str(uuid
.uuid4()),
101 'metric_name': metric_name
,
103 'vdu_name': vdu_name
,
104 'vnf_member_index': vnf_member_index
,
105 'operation': operation
,
106 'severity': 'critical',
107 'threshold_value': threshold
,
108 'statistic': statistic
111 'alarm_create_request': alarm_create_request
,
115 def _build_delete_alarm_payload(self
, cor_id
: int, ns_id
: str, vdu_name
: str,
116 vnf_member_index
: int, alarm_uuid
: str):
117 alarm_delete_request
= {
118 'correlation_id': cor_id
,
119 'alarm_uuid': alarm_uuid
,
121 'vdu_name': vdu_name
,
122 'vnf_member_index': vnf_member_index
125 'alarm_delete_request': alarm_delete_request
,
129 def _is_alarm_response_correlation_id_eq(self
, cor_id
, message_content
):
130 return message_content
['alarm_create_response']['correlation_id'] == cor_id