# Copyright 2017 Intel Research and Development Ireland Limited
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Intel Corporation

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
"""This is a common kafka producer app.

It interacts with the SO and the plugins of the datacenters, such as
OpenStack and VMWare.
"""
import logging
import os

from kafka import KafkaProducer as kaf
from kafka.errors import KafkaError
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Absolute path of this module file, with symbolic links resolved.
current_path = os.path.realpath(__file__)

# "models" directory next to this module's package directory: the first '..'
# strips the file name from current_path, the second climbs one directory.
json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models'))

# TODO(): validate all of the request and response messages against the
# JSON models (presumably the schemas stored under json_path).
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Wraps the kafka client producer (imported in this module as ``kaf``) and
    exposes one convenience method per kind of message. Every convenience
    method delegates to :meth:`publish` with a fixed topic name.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        Args:
            topic: Default topic for this producer. ``publish`` falls back
                to it when it is called without an explicit topic.
        """
        self._topic = topic

        # If BROKER_URI is not set in the environment, the broker on
        # localhost:9092 is used (the original note expects an instance to
        # already be running there).
        broker = os.getenv("BROKER_URI", "localhost:9092")

        # Keys and values are handed over as ``str`` and encoded to bytes by
        # the client through ``str.encode``.
        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        Args:
            key: Message key (a ``str``; encoded by the key serializer).
            value: Message payload (a ``str``; encoded by the value
                serializer).
            topic: Topic to publish on. When ``None``, the topic given to
                the constructor is used.

        Raises:
            Exception: Whatever the client raises while sending or while
                waiting (up to 10 seconds) for the result. The error is
                logged with its traceback and then re-raised.
        """
        if topic is None:
            topic = self._topic

        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            # Block until the send completes so failures surface here.
            record_metadata = future.get(timeout=10)
        except Exception:
            logging.exception("Error publishing to {} topic." .format(topic))
            raise

        # The values are passed as lazy %-arguments; the format string needs
        # a placeholder or logging fails to render the record.
        logging.debug("TOPIC: %s", record_metadata.topic)
        logging.debug("PARTITION: %s", record_metadata.partition)
        logging.debug("OFFSET: %s", record_metadata.offset)

    def publish_alarm_request(self, key, message):
        """Publish an alarm request on the 'alarm_request' topic."""
        self.publish(key, value=message, topic='alarm_request')

    def publish_alarm_response(self, key, message):
        """Publish an alarm response on the 'alarm_response' topic."""
        self.publish(key, value=message, topic='alarm_response')

    def publish_metrics_request(self, key, message):
        """Create metrics request from SO to MON ('metric_request' topic)."""
        self.publish(key, value=message, topic='metric_request')

    def publish_metrics_response(self, key, message):
        """Response for a create metric request from MON to SO."""
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message):
        """Read metric data request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def read_metric_data_response(self, key, message):
        """Response from MON to SO for read metric data request."""
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message):
        """List metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def list_metric_response(self, key, message):
        """Response from MON to SO for list metrics request."""
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message):
        """Delete metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def delete_metric_response(self, key, message):
        """Response from MON to SO for delete metric request."""
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message):
        """Metric update request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def update_metric_response(self, key, message):
        """Response from MON to SO for metric update."""
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message):
        """Send access credentials to MON from SO."""
        self.publish(key, value=message, topic='access_credentials')