f6feba16352ffd22f2df43301a39ad20cb50d106
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
"""This is a common kafka producer app.

It interacts with the SO and the plugins of the datacenters, e.g. OpenStack
and VMware.
"""
import logging
import os

from kafka import KafkaProducer as kaf
from kafka.errors import KafkaError
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Absolute, symlink-resolved path of this source file.
current_path = os.path.realpath(__file__)
# 'models' directory reached by joining two '..' components onto this file's
# path; the first '..' cancels the file name itself.
# NOTE(review): presumably where the JSON schemas live - confirm.
json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models'))
# TODO(): validate all of the request and response messages against the
# JSON schemas (presumably the files under json_path - confirm).
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Wraps the producer class imported in this module as ``kaf`` and exposes
    one convenience method per message type; each of them forwards to
    :meth:`publish` with a fixed topic name.

    NOTE(review): this class was restored from a damaged listing in which
    several statement heads were missing. The ``self.publish(...)`` call
    heads, the ``try``/``except`` frame in :meth:`publish` and the handling
    of ``topic`` in ``__init__`` are reconstructions - confirm them against
    the upstream file.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        Args:
            topic: topic name supplied by the caller.
        """
        # NOTE(review): the statement that consumed ``topic`` is not visible
        # in the listing; storing it on the instance is an assumption.
        self._topic = topic

        # If the broker URI is not set in the environment, the localhost
        # broker is used as the default host.
        broker = os.environ.get("BROKER_URI", "localhost:9092")

        # Keys and values are passed as ``str`` and encoded with str.encode.
        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker,
            api_version=(0, 10, 1))

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        Args:
            key: message key (a ``str``, see the serializers in ``__init__``).
            value: message payload (a ``str``).
            topic: name of the topic to publish to.

        Raises:
            Exception: whatever the send or the wait on its result raised;
                the error is logged with its traceback and then re-raised.
        """
        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            # Wait on the send result so failures surface here.
            future.get(timeout=10)
        except Exception:
            # NOTE(review): handler type and re-raise are reconstructed.
            logging.exception("Error publishing to {} topic." .format(topic))
            raise

    def publish_alarm_request(self, key, message):
        """Publish an alarm request."""
        self.publish(key, value=message, topic='alarm_request')

    def publish_alarm_response(self, key, message):
        """Publish an alarm response."""
        self.publish(key, value=message, topic='alarm_response')

    def publish_metrics_request(self, key, message):
        """Create metrics request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def publish_metrics_response(self, key, message):
        """Response for a create metric request from MON to SO."""
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message):
        """Read metric data request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def read_metric_data_response(self, key, message):
        """Response from MON to SO for read metric data request."""
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message):
        """List metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def list_metric_response(self, key, message):
        """Response from MON to SO for list metrics request."""
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message):
        """Delete metric request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def delete_metric_response(self, key, message):
        """Response from MON to SO for delete metric request."""
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message):
        """Metric update request from SO to MON."""
        self.publish(key, value=message, topic='metric_request')

    def update_metric_response(self, key, message):
        """Response from MON to SO for metric update."""
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message):
        """Send access credentials to MON from SO."""
        self.publish(key, value=message, topic='access_credentials')