f6feba16352ffd22f2df43301a39ad20cb50d106
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9
10 # http://www.apache.org/licenses/LICENSE-2.0
11
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
20 ##
21 """This is a common kafka producer app.
22
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
24 and AWS.
25 """
26
27 import logging
28 import os
29
30 from kafka import KafkaProducer as kaf
31 from kafka.errors import KafkaError
32
33 __author__ = "Prithiv Mohan"
34 __date__ = "06/Sep/2017"
35
# Symlink-resolved path of this module file.
current_path = os.path.realpath(__file__)

# Walk two path components up from this file's path and append 'models'.
# NOTE(review): presumably the directory holding the JSON schemas mentioned
# in the TODO below -- confirm against the repository layout.
json_path = os.path.abspath(
    os.path.join(current_path, os.pardir, os.pardir, 'models')
)
38
39 # TODO(): validate all of the request and response messages against the
40 # json_schemas
41
42
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Wraps a ``kafka.KafkaProducer`` (imported as ``kaf``) whose keys and
    values are serialized with ``str.encode`` and offers one convenience
    method per message type; each of them delegates to ``publish`` with a
    fixed topic name.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        Args:
            topic: Default topic; ``publish`` falls back to it when the
                caller does not pass a topic explicitly.
        """
        self._topic = topic

        # If the broker URI is not set in the environment, default to a
        # broker listening on localhost.
        broker = os.getenv("BROKER_URI", "localhost:9092")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker, api_version=(0, 10, 1))

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        Waits up to 10 seconds on the future returned by the send call.

        Args:
            key: Message key (serialized with ``str.encode``).
            value: Message payload (serialized with ``str.encode``).
            topic: Destination topic. When ``None``, the topic given to the
                constructor is used instead.

        Raises:
            Exception: Whatever the send or the wait raises is logged and
                re-raised unchanged.
        """
        # The constructor's topic was previously stored but never used, so
        # omitting ``topic`` sent ``None`` to the client.
        if topic is None:
            topic = self._topic

        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            future.get(timeout=10)
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            raise

    def publish_alarm_request(self, key, message):
        """Publish an alarm request on the 'alarm_request' topic."""
        # External to MON

        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def publish_alarm_response(self, key, message):
        """Publish an alarm response on the 'alarm_response' topic."""
        # Internal to MON

        self.publish(key,
                     value=message,
                     topic='alarm_response')

    def publish_metrics_request(self, key, message):
        """Create metrics request from SO to MON ('metric_request' topic)."""
        # External to Mon

        self.publish(key,
                     value=message,
                     topic='metric_request')

    def publish_metrics_response(self, key, message):
        """Response for a create metric request from MON to SO.

        Published on the 'metric_response' topic.
        """
        # Internal to Mon

        self.publish(key,
                     value=message,
                     topic='metric_response')

    def read_metric_data_request(self, key, message):
        """Read metric data request from SO to MON ('metric_request' topic)."""
        # External to Mon

        self.publish(key,
                     value=message,
                     topic='metric_request')

    def read_metric_data_response(self, key, message):
        """Response from MON to SO for read metric data request.

        Published on the 'metric_response' topic.
        """
        # Internal to Mon

        self.publish(key,
                     value=message,
                     topic='metric_response')

    def list_metric_request(self, key, message):
        """List metric request from SO to MON ('metric_request' topic)."""
        # External to MON

        self.publish(key,
                     value=message,
                     topic='metric_request')

    def list_metric_response(self, key, message):
        """Response from MON to SO for list metrics request.

        Published on the 'metric_response' topic.
        """
        # Internal to MON

        self.publish(key,
                     value=message,
                     topic='metric_response')

    def delete_metric_request(self, key, message):
        """Delete metric request from SO to MON ('metric_request' topic)."""
        # External to Mon

        self.publish(key,
                     value=message,
                     topic='metric_request')

    def delete_metric_response(self, key, message):
        """Response from MON to SO for delete metric request.

        Published on the 'metric_response' topic.
        """
        # Internal to Mon

        self.publish(key,
                     value=message,
                     topic='metric_response')

    def update_metric_request(self, key, message):
        """Metric update request from SO to MON ('metric_request' topic)."""
        # External to Mon

        self.publish(key,
                     value=message,
                     topic='metric_request')

    def update_metric_response(self, key, message):
        """Response from MON to SO for metric update.

        Published on the 'metric_response' topic.
        """
        # Internal to Mon

        self.publish(key,
                     value=message,
                     topic='metric_response')

    def access_credentials(self, key, message):
        """Send access credentials to MON from SO.

        Published on the 'access_credentials' topic.
        """

        self.publish(key,
                     value=message,
                     topic='access_credentials')