Added vROPs consumer, receiver that will consume messages & act on it. Aligned with...
[osm/MON.git] / core / message-bus / northbound_producer.py
1 from kafka import KafkaProducer
2 from kafka.errors import KafkaError
3 import logging
4 import json
5 import os
6
7
8 class KafkaProducer(object):
9
10 def __init__(self, topic, message):
11
12 self._topic= topic
13 self._message = message
14
15 if "ZOOKEEPER_URI" in os.environ:
16 broker = os.getenv("ZOOKEEPER_URI")
17 else:
18 broker = "SO-ub.lxd:2181"
19
20 '''
21 If the zookeeper broker URI is not set in the env, by default,
22 SO-ub.lxd container is taken as the host because an instance of
23 is already running.
24 '''
25
26 producer = KafkaProducer(key_serializer=str.encode,
27 value_serializer=lambda v: json.dumps(v).encode('ascii'),
28 bootstrap_servers=broker, api_version=(0,10))
29
30
31 def publish(self, key, message, topic=None):
32 try:
33 future = producer.send('alarms', key, payload)
34 producer.flush()
35 except Exception:
36 log.exception("Error publishing to {} topic." .format(topic))
37 raise
38 try:
39 record_metadata = future.get(timeout=10)
40 self._log.debug("TOPIC:", record_metadata.topic)
41 self._log.debug("PARTITION:", record_metadata.partition)
42 self._log.debug("OFFSET:", record_metadata.offset)
43 except KafkaError:
44 pass
45
46 def configure_alarm(self, key, message, topic):
47
48 payload_configure = {
49 "alarm_configuration":
50 {
51 "schema_version": 1.0,
52 "schema_type": "configure_alarm",
53 "alarm_configuration":
54 {
55 "metric_name": { "type": "string" },
56 "tenant_uuid": { "type": "string" },
57 "resource_uuid": { "type": "string" },
58 "description": { "type": "string" },
59 "severity": { "type": "string" },
60 "operation": { "type": "string" },
61 "threshold_value": { "type": "integer" },
62 "unit": { "type": "string" },
63 "statistic": { "type": "string" }
64 },
65 "required": [ "schema_version",
66 "schema_type",
67 "metric_name",
68 "resource_uuid",
69 "severity",
70 "operation",
71 "threshold_value",
72 "unit",
73 "statistic" ]
74 }
75 }
76
77 publish(key, value=json.dumps(payload_configure), topic='alarms')
78
79 def notify_alarm(self, key, message, topic):
80
81 payload_notify = {
82 "notify_alarm":
83 {
84 "schema_version": 1.0,
85 "schema_type": "notify_alarm",
86 "notify_details":
87 {
88 "alarm_uuid": { "type": "string" },
89 "resource_uuid": { "type": "string" },
90 "description": { "type": "string" },
91 "tenant_uuid": { "type": "string" },
92 "severity": { "type" : ["integer", "string"] },
93 "status": { "type": "string" },
94 "start_date": { "type": "date-time" },
95 "update_date": { "type": "date-time" },
96 "cancel_date": { "type": "date-time" }
97 },
98 "required": [ "schema_version",
99 "schema_type",
100 "alarm_uuid",
101 "resource_uuid",
102 "tenant_uuid",
103 "severity",
104 "status",
105 "start_date" ]
106 }
107 }
108
109 publish(key, value=json.dumps(payload_notify), topic='alarms')
110
111 def alarm_ack(self, key, message, topic):
112
113 payload_ack = {
114 "alarm_ack":
115 {
116 "schema_version": 1.0,
117 "schema_type": "alarm_ack",
118 "ack_details":
119 {
120 "alarm_uuid": { "type": "string" },
121 "tenant_uuid": { "type": "string" },
122 "resource_uuid": { "type": "string" }
123 },
124 "required": [ "schema_version",
125 "schema_type",
126 "alarm_uuid",
127 "tenant_uuid",
128 "resource_uuid" ]
129 }
130 }
131
132 publish(key, value.json.dumps(payload_ack), topic='alarms')
133
134 def configure_metrics(self, key, message, topic):
135
136 payload_configure_metrics = {
137 "configure_metrics":
138 {
139 "schema_version": 1.0,
140 "schema_type": "configure_metrics",
141 "tenant_uuid": { "type": "string" },
142 "metrics_configuration":
143 {
144 "metric_name": { "type": "string" },
145 "metric_unit": { "type": "string" },
146 "resource_uuid": { "type": "string" }
147 },
148 "required": [ "schema_version",
149 "schema_type",
150 "metric_name",
151 "metric_unit",
152 "resource_uuid" ]
153 }
154 }
155
156 publish(key, value.json.dumps(payload_configure_metrics), topic='metrics')
157
158 def metric_data_request(self, key, message, topic):
159
160 payload_metric_data_request = {
161 "metric_data_request":
162 {
163 "schema_version": 1.0,
164 "schema_type": "metric_data_request",
165 "metric_name": { "type": "string" },
166 "resource_uuid": { "type": "string" },
167 "tenant_uuid": { "type": "string" },
168 "collection_period": { "type": "string" }
169 },
170 "required": ["schema_version",
171 "schema_type",
172 "tenant_uuid",
173 "metric_name",
174 "collection_period",
175 "resource_uuid"]
176 }
177
178 publish(key, value.json.dumps(payload_metric_data_request), topic='metrics')
179
180 def metric_data_response(self, key, message, topic):
181
182 payload_metric_data_response = {
183 "metric_data_response":
184 {
185 "schema_version": 1.0,
186 "schema_type": "metric_data_response",
187 "metrics_name": { "type": "string" },
188 "resource_uuid": { "type": "string" },
189 "tenant_uuid": { "type": "string" },
190 "metrics_data":
191 {
192 "time_series": { "type": "array" },
193 "metrics_series": { "type": "array" }
194 }
195 },
196 "required": [ "schema_version",
197 "schema_type",
198 "metrics_name",
199 "resource_uuid",
200 "tenant_uuid",
201 "time_series",
202 "metrics_series" ]
203 }
204
205 publish(key, value.json.dumps(payload_metric_data_response), topic='metrics')
206
207 def access_credentials(self, key, message, topic):
208
209 payload_access_credentials = {
210 "access_credentials":
211 {
212 "schema_version": 1.0,
213 "schema_type": "vim_access_credentials",
214 "vim_type": { "type": "string" },
215 "required": [ "schema_version",
216 "schema_type",
217 "vim_type" ],
218 "access_config":
219 {
220 "if":
221 {
222 "vim_type": "openstack"
223 },
224 "then":
225 {
226 "openstack-site": { "type": "string" },
227 "user": { "type": "string" },
228 "password": { "type": "string",
229 "options": { "hidden": true }},
230 "vim_tenant_name": { "type": "string" }
231 },
232 "required": [ "openstack_site",
233 "user",
234 "password",
235 "vim_tenant_name" ],
236 "else":
237 {
238 "vim_type": "aws"
239 },
240 "then":
241 {
242 "aws_site": { "type": "string" },
243 "user": { "type": "string" },
244 "password": { "type": "string",
245 "options": { "hidden": true }},
246 "vim_tenant_name": { "type": "string" }
247 },
248 "required": [ "aws_site",
249 "user",
250 "password",
251 "vim_tenant_name" ],
252 "else":
253 {
254 "vim_type": "VMWare"
255 },
256 "then":
257 {
258 "vrops_site": { "type": "string" },
259 "vrops_user": { "type": "string" },
260 "vrops_password": { "type": "string",
261 "options": { "hidden": true }},
262 "vcloud_site": { "type": "string" },
263 "admin_username": { "type": "string" },
264 "admin_password": { "type": "string",
265 "options": { "hidden": true }},
266 "nsx_manager": { "type": "string" },
267 "nsx_user": { "type": "string" },
268 "nsx_password": { "type": "string",
269 "options": { "hidden": true }},
270 "vcenter_ip": { "type": "string" },
271 "vcenter_port": { "type": "string" },
272 "vcenter_user": { "type": "string" },
273 "vcenter_password": { "type": "string",
274 "options": { "hidden": true }},
275 "vim_tenant_name": { "type": "string" },
276 "org_name": { "type": "string" }
277 },
278 "required": [ "vrops_site",
279 "vrops_user",
280 "vrops_password",
281 "vcloud_site",
282 "admin_username",
283 "admin_password",
284 "vcenter_ip",
285 "vcenter_port",
286 "vcenter_user",
287 "vcenter_password",
288 "vim_tenant_name",
289 "orgname" ]
290 }
291 }
292 }
293
294 publish(key, value.json.dumps(payload_access_credentials), topic='access_credentials')