f04ecf820da89584582814bdbe3ad4e2c21f6a52
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9
10 # http://www.apache.org/licenses/LICENSE-2.0
11
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
20 ##
21 """This is a common kafka producer app.
22
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
24 and AWS.
25 """
26
27 import logging
28 import os
29
30 from kafka import KafkaProducer as kaf
31 from kafka.errors import KafkaError
32
33 __author__ = "Prithiv Mohan"
34 __date__ = "06/Sep/2017"
35
36 current_path = os.path.realpath(__file__)
37 json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models'))
38
39 # TODO(): validate all of the request and response messages against the
40 # json_schemas
41
42
43 class KafkaProducer(object):
44 """A common KafkaProducer for requests and responses."""
45
46 def __init__(self, topic):
47 """Initialize the common kafka producer."""
48 self._topic = topic
49
50 if "BROKER_URI" in os.environ:
51 broker = os.getenv("BROKER_URI")
52 else:
53 broker = "localhost:9092"
54
55 '''
56 If the broker URI is not set in the env by default,
57 localhost container is taken as the host because an instance of
58 is already running.
59 '''
60
61 self.producer = kaf(
62 key_serializer=str.encode,
63 value_serializer=str.encode,
64 bootstrap_servers=broker, api_version=(0, 10))
65
66 def publish(self, key, value, topic=None):
67 """Send the required message on the Kafka message bus."""
68 try:
69 future = self.producer.send(topic=topic, key=key, value=value)
70 record_metadata = future.get(timeout=10)
71 except Exception:
72 logging.exception("Error publishing to {} topic." .format(topic))
73 raise
74 try:
75 logging.debug("TOPIC:", record_metadata.topic)
76 logging.debug("PARTITION:", record_metadata.partition)
77 logging.debug("OFFSET:", record_metadata.offset)
78 except KafkaError:
79 pass
80
81 def publish_alarm_request(self, key, message):
82 """Publish an alarm request."""
83 # External to MON
84
85 self.publish(key,
86 value=message,
87 topic='alarm_request')
88
89 def publish_alarm_response(self, key, message):
90 """Publish an alarm response."""
91 # Internal to MON
92
93 self.publish(key,
94 value=message,
95 topic='alarm_response')
96
97 def publish_metrics_request(self, key, message):
98 """Create metrics request from SO to MON."""
99 # External to Mon
100
101 self.publish(key,
102 value=message,
103 topic='metric_request')
104
105 def publish_metrics_response(self, key, message):
106 """Response for a create metric request from MON to SO."""
107 # Internal to Mon
108
109 self.publish(key,
110 value=message,
111 topic='metric_response')
112
113 def read_metric_data_request(self, key, message):
114 """Read metric data request from SO to MON."""
115 # External to Mon
116
117 self.publish(key,
118 value=message,
119 topic='metric_request')
120
121 def read_metric_data_response(self, key, message):
122 """Response from MON to SO for read metric data request."""
123 # Internal to Mon
124
125 self.publish(key,
126 value=message,
127 topic='metric_response')
128
129 def list_metric_request(self, key, message):
130 """List metric request from SO to MON."""
131 # External to MON
132
133 self.publish(key,
134 value=message,
135 topic='metric_request')
136
137 def list_metric_response(self, key, message):
138 """Response from SO to MON for list metrics request."""
139 # Internal to MON
140
141 self.publish(key,
142 value=message,
143 topic='metric_response')
144
145 def delete_metric_request(self, key, message):
146 """Delete metric request from SO to MON."""
147 # External to Mon
148
149 self.publish(key,
150 value=message,
151 topic='metric_request')
152
153 def delete_metric_response(self, key, message):
154 """Response from MON to SO for delete metric request."""
155 # Internal to Mon
156
157 self.publish(key,
158 value=message,
159 topic='metric_response')
160
161 def update_metric_request(self, key, message):
162 """Metric update request from SO to MON."""
163 # External to Mon
164
165 self.publish(key,
166 value=message,
167 topic='metric_request')
168
169 def update_metric_response(self, key, message):
170 """Reponse from MON to SO for metric update."""
171 # Internal to Mon
172
173 self.publish(key,
174 value=message,
175 topic='metric_response')
176
177 def access_credentials(self, key, message):
178 """Send access credentials to MON from SO."""
179
180 self.publish(key,
181 value=message,
182 topic='access_credentials')