Adds vdu_id to message bus models
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9
10 # http://www.apache.org/licenses/LICENSE-2.0
11
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
20 ##
21 """This is a common kafka producer app.
22
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
24 and AWS.
25 """
26
27 import logging
28 import os
29
30 from kafka import KafkaProducer as kaf
31 from kafka.errors import KafkaError
32
# Module metadata.
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Fully resolved path of this source file.
current_path = os.path.realpath(__file__)
# "models" entry reached by stepping two path components up from this file's
# own path (the file name itself counts as the first component removed).
json_path = os.path.abspath(
    os.path.join(current_path, os.pardir, os.pardir, "models")
)
38
39 # TODO(): validate all of the request and response messages against the
40 # json_schemas
41
42
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Wraps the kafka producer imported in this module as ``kaf`` and exposes
    one helper per message type. Every helper forwards its ``key`` and
    ``message`` to :meth:`publish` on a fixed topic.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        :param topic: default topic; :meth:`publish` uses it when the caller
            does not pass a topic explicitly.
        """
        self._topic = topic

        # If the broker URI is not set in the environment, localhost:9092 is
        # used; the original note says a local instance is expected to be
        # running there already.
        broker = os.getenv("BROKER_URI", "localhost:9092")

        # Keys and values are handed over as str and encoded to bytes by
        # the producer itself.
        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker,
            api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        :param key: message key (str, encoded by the producer).
        :param value: message payload (str, encoded by the producer).
        :param topic: destination topic; defaults to the topic given to
            the constructor when omitted.
        :raises Exception: whatever ``send``/``flush`` raise is logged and
            re-raised unchanged.

        A failure to confirm delivery (``KafkaError`` while waiting on the
        returned future) is logged as a warning and not raised: confirmation
        is best-effort.
        """
        if topic is None:
            topic = self._topic

        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Best-effort confirmation: report the problem instead of
            # silently dropping it, but do not fail the caller.
            logging.warning(
                "Could not confirm delivery to %s topic.", topic,
                exc_info=True)
            return

        # Placeholders are required here: logging %-formats msg with args.
        logging.debug("TOPIC: %s", record_metadata.topic)
        logging.debug("PARTITION: %s", record_metadata.partition)
        logging.debug("OFFSET: %s", record_metadata.offset)

    def create_alarm_request(self, key, message):
        """Create alarm request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='alarm_request')

    def create_alarm_response(self, key, message):
        """Response to a create alarm request from MON to SO."""
        # Internal to MON
        self.publish(key, value=message, topic='alarm_response')

    def acknowledge_alarm(self, key, message):
        """Alarm acknowledgement request from SO to MON."""
        # Internal to MON
        self.publish(key, value=message, topic='alarm_request')

    def list_alarm_request(self, key, message):
        """List alarms request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='alarm_request')

    def notify_alarm(self, key, message):
        """Notify of triggered alarm from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def list_alarm_response(self, key, message):
        """Response for list alarms request from MON to SO."""
        self.publish(key, value=message, topic='alarm_response')

    def update_alarm_request(self, key, message):
        """Update alarm request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='alarm_request')

    def update_alarm_response(self, key, message):
        """Response from update alarm request from MON to SO."""
        # Internal to MON
        self.publish(key, value=message, topic='alarm_response')

    def delete_alarm_request(self, key, message):
        """Delete alarm request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='alarm_request')

    def delete_alarm_response(self, key, message):
        """Response for a delete alarm request from MON to SO."""
        # Internal to MON
        self.publish(key, value=message, topic='alarm_response')

    def create_metrics_request(self, key, message):
        """Create metrics request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='metric_request')

    def create_metrics_resp(self, key, message):
        """Response for a create metric request from MON to SO."""
        # Internal to MON
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message):
        """Read metric data request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='metric_request')

    def read_metric_data_response(self, key, message):
        """Response from MON to SO for read metric data request."""
        # Internal to MON
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message):
        """List metric request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='metric_request')

    def list_metric_response(self, key, message):
        """Response from MON to SO for list metrics request."""
        # Internal to MON
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message):
        """Delete metric request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='metric_request')

    def delete_metric_response(self, key, message):
        """Response from MON to SO for delete metric request."""
        # Internal to MON
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message):
        """Metric update request from SO to MON."""
        # External to MON
        self.publish(key, value=message, topic='metric_request')

    def update_metric_response(self, key, message):
        """Response from MON to SO for metric update."""
        # Internal to MON
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message):
        """Send access credentials to MON from SO."""
        self.publish(key, value=message, topic='access_credentials')