bf0839cb643ba55ed7e5f6c86cadbddce2765a98
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 """This is a common kafka producer app.
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
30 from kafka
import KafkaProducer
as kaf
31 from kafka
.errors
import KafkaError
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Absolute, symlink-resolved path of this module file.
current_path = os.path.realpath(__file__)

# `current_path` names the file itself, so the first '..' strips the file
# name and the second steps up one directory: the result is the "models"
# directory that sits next to this module's parent directory.
json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models'))
39 # TODO(): validate all of the request and response messages against the
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Wraps a kafka-python producer (imported in this module as ``kaf``) and
    exposes one convenience method per message type. Each convenience method
    publishes its ``message`` under ``key`` on a fixed topic.

    NOTE(review): the convenience methods are assumed to forward to
    ``publish()``; only the trailing ``topic=...`` argument of each call was
    visible when this block was reviewed -- confirm against the full file.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        Args:
            topic: kept on the instance as ``_topic``; the publishing methods
                choose their own topics and do not read it.
        """
        self._topic = topic

        # When BROKER_URI is not set in the environment, fall back to a
        # broker on localhost (presumably one is expected to be running
        # there already -- TODO confirm).
        broker = os.getenv("BROKER_URI", "localhost:9092")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker,
            api_version=(0, 10),
        )

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        Args:
            key: message key; encoded by the producer's key serializer.
            value: message payload; encoded by the value serializer.
            topic: name of the topic to publish on.

        Raises:
            Exception: whatever ``producer.send`` raised, re-raised after
                being logged.
        """
        try:
            future = self.producer.send(topic=topic, key=key, value=value)
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            # Without a future there is nothing to wait on, so propagate.
            raise

        try:
            # Block for up to 10 seconds waiting for the send to complete.
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # NOTE(review): delivery failures are logged but not raised;
            # confirm that this best-effort behaviour matches the full file.
            logging.exception(
                "Error retrieving record metadata for %s topic.", topic)
        else:
            # The values need a %s placeholder: extra arguments without one
            # make the log record fail to format.
            logging.debug("TOPIC: %s", record_metadata.topic)
            logging.debug("PARTITION: %s", record_metadata.partition)
            logging.debug("OFFSET: %s", record_metadata.offset)

    def create_alarm_request(self, key, message):
        """Create alarm request from SO to MON."""
        self.publish(key, value=message, topic='alarm_request')

    def create_alarm_response(self, key, message):
        """Response to a create alarm request from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def acknowledge_alarm(self, key, message):
        """Alarm acknowledgement request from SO to MON."""
        self.publish(key, value=message, topic='alarm_request')

    def list_alarm_request(self, key, message):
        """List alarms request from SO to MON."""
        self.publish(key, value=message, topic='alarm_request')

    def notify_alarm(self, key, message):
        """Notify of triggered alarm from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def list_alarm_response(self, key, message):
        """Response for list alarms request from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def update_alarm_request(self, key, message):
        """Update alarm request from SO to MON."""
        self.publish(key, value=message, topic='alarm_request')

    def update_alarm_response(self, key, message):
        """Response from update alarm request from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def delete_alarm_request(self, key, message):
        """Delete alarm request from SO to MON."""
        self.publish(key, value=message, topic='alarm_request')

    def delete_alarm_response(self, key, message):
        """Response for a delete alarm request from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def create_metrics_request(self, key, message):
        """Create metrics request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def create_metrics_resp(self, key, message):
        """Response for a create metric request from MON to SO."""
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message):
        """Read metric data request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def read_metric_data_response(self, key, message):
        """Response from MON to SO for read metric data request."""
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message):
        """List metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def list_metric_response(self, key, message):
        """Response from MON to SO for list metrics request."""
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message):
        """Delete metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def delete_metric_response(self, key, message):
        """Response from MON to SO for delete metric request."""
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message):
        """Metric update request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def update_metric_response(self, key, message):
        """Response from MON to SO for metric update."""
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message):
        """Send access credentials to MON from SO."""
        self.publish(key, value=message, topic='access_credentials')